Mimir push reverse proxy in go
Jump to navigation
Jump to search
sqlite3 auth.db
CREATE TABLE users (
username TEXT PRIMARY KEY,
password TEXT NOT NULL,
org_id TEXT
);
INSERT INTO users (username, password, org_id) VALUES ('admin', 'password123', 'test');
package main
import (
"bytes"
// "context"
"database/sql"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
_ "github.com/mattn/go-sqlite3"
pb "github.com/prometheus/prometheus/prompb"
)
// Package-level state shared by the HTTP handler and its helpers. main
// assigns every one of these before the server starts listening.
var (
	// db is the handle to the SQLite database holding the users table.
	db *sql.DB

	// mimirURL is the parsed push endpoint that requests are forwarded to.
	mimirURL *url.URL

	// mimirUsername and mimirPassword are the basic-auth credentials sent
	// with every forwarded request.
	mimirUsername, mimirPassword string

	// httpClient is the single client used for all backend requests.
	httpClient *http.Client
)
// getOrgID returns the org_id stored for username in the users table.
//
// A NULL org_id is reported as an empty string with a nil error, so a user
// without an organisation is not treated as a lookup failure. Any other
// error, including sql.ErrNoRows when the user does not exist, is returned
// unchanged.
func getOrgID(username string) (string, error) {
	// Scanning NULL into a plain string fails, so scan into a NullString.
	var orgID sql.NullString
	err := db.QueryRow("SELECT org_id FROM users WHERE username = ?", username).Scan(&orgID)
	if err != nil {
		return "", err
	}
	// String is "" when Valid is false, i.e. when the column was NULL.
	return orgID.String, nil
}
// parseRawMetric converts a single metric line of the form
//
//	name{label="value",...} <value> <unix-seconds>
//
// into a WriteRequest containing one series with one sample.
//
// The metric name becomes the "__name__" label. After the client-supplied
// labels, a "username" label is always appended and, when orgID is non-empty,
// an "org_id" label as well. The timestamp is read as whole seconds and
// stored in milliseconds.
//
// An error is returned when the braces, metric name, value or timestamp are
// missing or malformed, when a label is not of the form key=value, when a
// label name is empty, or when a label name is repeated or collides with a
// label this function sets itself.
func parseRawMetric(raw string, username string, orgID string) (*pb.WriteRequest, error) {
	raw = strings.TrimSpace(raw)
	idxOpen := strings.Index(raw, "{")
	if idxOpen == -1 {
		return nil, fmt.Errorf("invalid format: missing '{'")
	}
	idxClose := strings.Index(raw, "}")
	if idxClose == -1 || idxClose < idxOpen {
		return nil, fmt.Errorf("invalid format: missing '}'")
	}

	metricName := strings.TrimSpace(raw[:idxOpen])
	if metricName == "" {
		return nil, fmt.Errorf("invalid format: missing metric name")
	}
	labelPart := strings.TrimSpace(raw[idxOpen+1 : idxClose])
	rest := strings.TrimSpace(raw[idxClose+1:])

	parts := strings.Fields(rest)
	if len(parts) < 2 {
		return nil, fmt.Errorf("invalid format: missing value or timestamp")
	}
	value, err := strconv.ParseFloat(parts[0], 64)
	if err != nil {
		return nil, fmt.Errorf("invalid metric value: %w", err)
	}
	tsSec, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid timestamp: %w", err)
	}
	// Input is in seconds; the sample timestamp is stored in milliseconds.
	timestamp := tsSec * 1000

	labels := []pb.Label{{Name: "__name__", Value: metricName}}

	// Label names already taken. It starts with the names this function
	// sets itself, so a client cannot introduce a second label with the
	// same name.
	seen := map[string]struct{}{
		"__name__": {},
		"username": {},
	}
	if orgID != "" {
		seen["org_id"] = struct{}{}
	}

	if labelPart != "" {
		for _, pair := range strings.Split(labelPart, ",") {
			pair = strings.TrimSpace(pair)
			if pair == "" {
				continue
			}
			kv := strings.SplitN(pair, "=", 2)
			if len(kv) != 2 {
				return nil, fmt.Errorf("invalid label format: %s", pair)
			}
			key := strings.TrimSpace(kv[0])
			if key == "" {
				return nil, fmt.Errorf("invalid label format: empty label name in %q", pair)
			}
			if _, dup := seen[key]; dup {
				return nil, fmt.Errorf("invalid label: %q is reserved or repeated", key)
			}
			seen[key] = struct{}{}
			val := strings.Trim(strings.TrimSpace(kv[1]), "\"")
			labels = append(labels, pb.Label{Name: key, Value: val})
		}
	}

	// Identity labels are appended by the proxy, never taken from the client.
	labels = append(labels, pb.Label{Name: "username", Value: username})
	if orgID != "" {
		labels = append(labels, pb.Label{Name: "org_id", Value: orgID})
	}

	ts := pb.TimeSeries{
		Labels:  labels,
		Samples: []pb.Sample{{Value: value, Timestamp: timestamp}},
	}
	return &pb.WriteRequest{Timeseries: []pb.TimeSeries{ts}}, nil
}
// maxPushBodyBytes caps how much of a push request body the proxy buffers.
// Each request carries a single metric line, so 1 MiB is far above any
// legitimate payload.
const maxPushBodyBytes = 1 << 20

// main opens the SQLite user database, reads the Mimir endpoint and
// credentials from MIMIR_URL, MIMIR_USERNAME and MIMIR_PASSWORD (falling back
// to placeholder defaults), registers the push handler and serves HTTP on
// :8080 until the server fails.
func main() {
	var err error
	db, err = sql.Open("sqlite3", "./auth.db")
	if err != nil {
		log.Fatalf("Failed to open database: %v", err)
	}
	defer db.Close()

	mimirURLStr := os.Getenv("MIMIR_URL")
	if mimirURLStr == "" {
		mimirURLStr = "https://examplemimir.example.com/api/v1/push"
	}
	mimirURL, err = url.Parse(mimirURLStr)
	if err != nil {
		log.Fatalf("Invalid MIMIR_URL: %v", err)
	}

	mimirUsername = os.Getenv("MIMIR_USERNAME")
	if mimirUsername == "" {
		mimirUsername = "your_username"
	}
	mimirPassword = os.Getenv("MIMIR_PASSWORD")
	if mimirPassword == "" {
		mimirPassword = "your_password"
	}

	httpClient = &http.Client{
		Timeout: 10 * time.Second,
	}

	http.HandleFunc("/api/v1/push", handlePush)

	// Explicit timeouts keep slow or stalled clients from holding
	// connections open indefinitely. A nil Handler means DefaultServeMux,
	// where handlePush was registered above.
	srv := &http.Server{
		Addr:              ":8080",
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       15 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       60 * time.Second,
	}
	log.Println("HTTP server running on :8080")
	log.Fatal(srv.ListenAndServe())
}

// handlePush authenticates the caller, parses the request body as one raw
// metric line, re-encodes it as a snappy-compressed protobuf write request
// and forwards it to Mimir, relaying Mimir's response on success.
func handlePush(w http.ResponseWriter, r *http.Request) {
	if !authenticate(r) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}
	username, _, ok := r.BasicAuth()
	if !ok {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	// The org_id comes from the database, not from a client-supplied header.
	orgID, err := getOrgID(username)
	if err != nil {
		http.Error(w, "Failed to retrieve org_id", http.StatusInternalServerError)
		return
	}

	defer r.Body.Close()
	// Read one byte past the cap so an oversized body can be told apart
	// from one that is exactly at the limit.
	data, err := io.ReadAll(io.LimitReader(r.Body, maxPushBodyBytes+1))
	if err != nil {
		http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		return
	}
	if len(data) > maxPushBodyBytes {
		http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
		return
	}

	writeReq, err := parseRawMetric(string(data), username, orgID)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to parse metric: %v", err), http.StatusBadRequest)
		log.Printf("Error parsing metric: %v, Raw data: %s", err, string(data))
		return
	}

	serialized, err := proto.Marshal(writeReq)
	if err != nil {
		http.Error(w, "Failed to marshal protobuf message", http.StatusInternalServerError)
		log.Printf("Error marshaling protobuf: %v, WriteRequest: %+v", err, writeReq)
		return
	}
	compressedData := snappy.Encode(nil, serialized)

	// Tie the backend call to the incoming request so it is cancelled if
	// the client goes away.
	backendReq, err := http.NewRequestWithContext(r.Context(), "POST", mimirURL.String(), bytes.NewReader(compressedData))
	if err != nil {
		http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
		log.Printf("Error creating backend request: %v", err)
		return
	}
	backendReq.Header.Set("Content-Type", "application/x-protobuf")
	backendReq.Header.Set("Content-Encoding", "snappy")
	backendReq.SetBasicAuth(mimirUsername, mimirPassword)
	if orgID != "" {
		backendReq.Header.Set("X-Scope-OrgID", orgID)
	}

	resp, err := httpClient.Do(backendReq)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to push to Mimir backend: %v", err), http.StatusBadGateway)
		log.Printf("Error pushing to Mimir: %v, Mimir URL: %s", err, mimirURL.String())
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		// Best effort: if the error body cannot be read, report the status
		// with whatever was received.
		body, _ := io.ReadAll(resp.Body)
		http.Error(w, fmt.Sprintf("Mimir returned error: %s (Status Code: %d)", string(body), resp.StatusCode), http.StatusBadGateway)
		log.Printf("Mimir returned error: %s (Status Code: %d), Mimir URL: %s", string(body), resp.StatusCode, mimirURL.String())
		return
	}

	w.WriteHeader(resp.StatusCode)
	if _, err := io.Copy(w, resp.Body); err != nil {
		// The status line is already sent, so the failure can only be logged.
		log.Printf("Error relaying Mimir response: %v", err)
	}
}
// authenticate reports whether the request carries an Authorization header
// of the form "Basic <base64(username:password)>" whose credentials are
// accepted by validateUser. A missing header, a different scheme, invalid
// base64, or a payload without a colon all yield false.
func authenticate(r *http.Request) bool {
	const scheme = "Basic "

	header := r.Header.Get("Authorization")
	// An empty header has no prefix either, so one check covers both cases.
	if !strings.HasPrefix(header, scheme) {
		return false
	}

	raw, err := base64.StdEncoding.DecodeString(header[len(scheme):])
	if err != nil {
		return false
	}

	// Split on the first colon only; the password may itself contain colons.
	creds := string(raw)
	sep := strings.IndexByte(creds, ':')
	if sep < 0 {
		return false
	}
	return validateUser(creds[:sep], creds[sep+1:])
}
// validateUser reports whether password equals the password stored for
// username in the users table. Any query or scan error, including an unknown
// username, yields false.
//
// NOTE(review): the stored value is compared to the supplied password
// directly with ==, so passwords are kept in plain text and the comparison
// is not constant-time. Consider storing hashes and comparing in constant
// time.
func validateUser(username, password string) bool {
	var stored string
	row := db.QueryRow("SELECT password FROM users WHERE username = ?", username)
	if scanErr := row.Scan(&stored); scanErr != nil {
		return false
	}
	return stored == password
}
mkdir push-proxy
cd push-proxy
go mod init push-proxy
go mod tidy
go build
set -a
MIMIR_URL=https://mimir-push-url.example.com/api/v1/push
MIMIR_USERNAME=mybasicauthuser
MIMIR_PASSWORD=mybasicauthpass
simple test
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
simple test with data
curl -X POST -g \
-u admin:password123 \
--data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \
"http://localhost:8080/api/v1/push?job=my_job&instance=my_instance"
or
current_time_seconds=$(date +%s)
ts=$((current_time_seconds - 0))
rn=$(( ( RANDOM % 20 ) + 10 ))
curl -X POST -g \
-u admin:password123 \
--data "my_metric2{app=\"test\",job=\"test\",instance=\"test\"} $rn ${ts}" \
http://localhost:8080/api/v1/push
Grafana DataSource add
X-Scope-OrgID: test
If no X-Scope-OrgID header is passed, you have access to org_id=anonymous.