Difference between revisions of "Mimir push reverse proxy in go"
Jump to navigation
Jump to search
(8 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
+ | ``` | ||
+ | sqlite3 auth.db | ||
+ | ``` | ||
+ | |||
``` | ``` | ||
CREATE TABLE users ( | CREATE TABLE users ( | ||
username TEXT PRIMARY KEY, | username TEXT PRIMARY KEY, | ||
− | password TEXT NOT NULL | + | password TEXT NOT NULL, |
+ | org_id TEXT | ||
); | ); | ||
− | INSERT INTO users (username, password) VALUES ('admin', 'password123'); | + | INSERT INTO users (username, password, org_id) VALUES ('admin', 'password123', test); |
``` | ``` | ||
Line 13: | Line 18: | ||
import ( | import ( | ||
− | + | "bytes" | |
− | + | // "context" | |
− | + | "database/sql" | |
− | + | "encoding/base64" | |
− | + | "fmt" | |
− | + | "io" | |
− | + | "log" | |
− | + | "net/http" | |
− | + | "net/url" | |
+ | "os" | ||
+ | "strconv" | ||
+ | "strings" | ||
+ | "time" | ||
− | + | "github.com/golang/protobuf/proto" | |
+ | "github.com/golang/snappy" | ||
+ | _ "github.com/mattn/go-sqlite3" | ||
+ | pb "github.com/prometheus/prometheus/prompb" | ||
) | ) | ||
− | |||
var db *sql.DB | var db *sql.DB | ||
+ | var mimirURL *url.URL | ||
+ | var mimirUsername, mimirPassword string | ||
+ | var httpClient *http.Client | ||
+ | |||
+ | func getOrgID(username string) (string, error) { | ||
+ | var orgID string | ||
+ | err := db.QueryRow("SELECT org_id FROM users WHERE username = ?", username).Scan(&orgID) | ||
+ | if err != nil { | ||
+ | return "", err | ||
+ | } | ||
+ | return orgID, nil | ||
+ | } | ||
+ | |||
+ | // func parseRawMetric(raw string, username string) (*pb.WriteRequest, error) { | ||
+ | func parseRawMetric(raw string, username string, orgID string) (*pb.WriteRequest, error) { // Add orgID parameter | ||
+ | raw = strings.TrimSpace(raw) | ||
+ | |||
+ | idxOpen := strings.Index(raw, "{") | ||
+ | if idxOpen == -1 { | ||
+ | return nil, fmt.Errorf("invalid format: missing '{'") | ||
+ | } | ||
+ | idxClose := strings.Index(raw, "}") | ||
+ | if idxClose == -1 || idxClose < idxOpen { | ||
+ | return nil, fmt.Errorf("invalid format: missing '}'") | ||
+ | } | ||
+ | |||
+ | metricName := strings.TrimSpace(raw[:idxOpen]) | ||
+ | labelPart := raw[idxOpen+1 : idxClose] | ||
+ | rest := strings.TrimSpace(raw[idxClose+1:]) | ||
+ | |||
+ | parts := strings.Fields(rest) | ||
+ | if len(parts) < 2 { | ||
+ | return nil, fmt.Errorf("invalid format: missing value or timestamp") | ||
+ | } | ||
+ | |||
+ | value, err := strconv.ParseFloat(parts[0], 64) | ||
+ | if err != nil { | ||
+ | return nil, fmt.Errorf("invalid metric value: %v", err) | ||
+ | } | ||
+ | |||
+ | tsSec, err := strconv.ParseInt(parts[1], 10, 64) | ||
+ | if err != nil { | ||
+ | return nil, fmt.Errorf("invalid timestamp: %v", err) | ||
+ | } | ||
+ | timestamp := tsSec * 1000 | ||
+ | |||
+ | labels := []pb.Label{{Name: "__name__", Value: metricName}} | ||
+ | labelPart = strings.TrimSpace(labelPart) | ||
+ | if labelPart != "" { | ||
+ | labelPairs := strings.Split(labelPart, ",") | ||
+ | for _, pair := range labelPairs { | ||
+ | pair = strings.TrimSpace(pair) | ||
+ | if pair == "" { | ||
+ | continue | ||
+ | } | ||
+ | kv := strings.SplitN(pair, "=", 2) | ||
+ | if len(kv) != 2 { | ||
+ | return nil, fmt.Errorf("invalid label format: %s", pair) | ||
+ | } | ||
+ | key := strings.TrimSpace(kv[0]) | ||
+ | val := strings.Trim(strings.TrimSpace(kv[1]), "\"") | ||
+ | labels = append(labels, pb.Label{Name: key, Value: val}) | ||
+ | } | ||
+ | } | ||
+ | |||
+ | // Add the username and org_id label HERE: | ||
+ | labels = append(labels, pb.Label{Name: "username", Value: username}) | ||
+ | if orgID != "" { | ||
+ | labels = append(labels, pb.Label{Name: "org_id", Value: orgID}) // Use org_id label name | ||
+ | } | ||
+ | // | ||
+ | |||
+ | ts := pb.TimeSeries{ | ||
+ | Labels: labels, | ||
+ | Samples: []pb.Sample{{Value: value, Timestamp: timestamp}}, | ||
+ | } | ||
+ | |||
+ | writeReq := &pb.WriteRequest{ | ||
+ | Timeseries: []pb.TimeSeries{ts}, | ||
+ | } | ||
+ | return writeReq, nil | ||
+ | } | ||
func main() { | func main() { | ||
− | + | var err error | |
− | + | db, err = sql.Open("sqlite3", "./auth.db") | |
− | + | if err != nil { | |
− | + | log.Fatalf("Failed to open database: %v", err) | |
− | + | } | |
− | + | defer db.Close() | |
+ | |||
+ | mimirURLStr := os.Getenv("MIMIR_URL") | ||
+ | if mimirURLStr == "" { | ||
+ | mimirURLStr = "https://examplemimir.example.com/api/v1/push" | ||
+ | } | ||
+ | mimirURL, err = url.Parse(mimirURLStr) | ||
+ | if err != nil { | ||
+ | log.Fatalf("Invalid MIMIR_URL: %v", err) | ||
+ | } | ||
+ | |||
+ | mimirUsername = os.Getenv("MIMIR_USERNAME") | ||
+ | if mimirUsername == "" { | ||
+ | mimirUsername = "your_username" | ||
+ | } | ||
+ | mimirPassword = os.Getenv("MIMIR_PASSWORD") | ||
+ | if mimirPassword == "" { | ||
+ | mimirPassword = "your_password" | ||
+ | } | ||
+ | |||
+ | httpClient = &http.Client{ | ||
+ | Timeout: 10 * time.Second, | ||
+ | } | ||
+ | |||
+ | http.HandleFunc("/api/v1/push", func(w http.ResponseWriter, r *http.Request) { | ||
+ | if !authenticate(r) { | ||
+ | http.Error(w, "Unauthorized", http.StatusUnauthorized) | ||
+ | return | ||
+ | } | ||
+ | |||
+ | username, _, ok := r.BasicAuth() | ||
+ | if !ok { | ||
+ | http.Error(w, "Unauthorized", http.StatusUnauthorized) | ||
+ | return | ||
+ | } | ||
+ | |||
+ | // Get org_id from the database instead of from the header | ||
+ | orgID, err := getOrgID(username) | ||
+ | if err != nil { | ||
+ | http.Error(w, "Failed to retrieve org_id", http.StatusInternalServerError) | ||
+ | return | ||
+ | } | ||
+ | |||
+ | data, err := io.ReadAll(r.Body) | ||
+ | if err != nil { | ||
+ | http.Error(w, "Failed to read request body", http.StatusInternalServerError) | ||
+ | return | ||
+ | } | ||
+ | r.Body.Close() | ||
+ | |||
+ | writeReq, err := parseRawMetric(string(data), username, orgID) | ||
+ | if err != nil { | ||
+ | http.Error(w, fmt.Sprintf("Failed to parse metric: %v", err), http.StatusBadRequest) | ||
+ | log.Printf("Error parsing metric: %v, Raw data: %s", err, string(data)) | ||
+ | return | ||
+ | } | ||
+ | |||
+ | serialized, err := proto.Marshal(writeReq) | ||
+ | if err != nil { | ||
+ | http.Error(w, "Failed to marshal protobuf message", http.StatusInternalServerError) | ||
+ | log.Printf("Error marshaling protobuf: %v, WriteRequest: %+v", err, writeReq) | ||
+ | return | ||
+ | } | ||
+ | |||
+ | compressedData := snappy.Encode(nil, serialized) | ||
+ | |||
+ | backendReq, err := http.NewRequestWithContext(r.Context(), "POST", mimirURL.String(), bytes.NewReader(compressedData)) | ||
+ | if err != nil { | ||
+ | http.Error(w, "Failed to create backend request", http.StatusInternalServerError) | ||
+ | log.Printf("Error creating backend request: %v", err) | ||
+ | return | ||
+ | } | ||
+ | backendReq.Header.Set("Content-Type", "application/x-protobuf") | ||
+ | backendReq.Header.Set("Content-Encoding", "snappy") | ||
+ | backendReq.SetBasicAuth(mimirUsername, mimirPassword) | ||
+ | |||
+ | // Use the org_id from the database | ||
+ | if orgID != "" { | ||
+ | backendReq.Header.Set("X-Scope-OrgID", orgID) | ||
+ | } | ||
+ | |||
+ | resp, err := httpClient.Do(backendReq) | ||
+ | if err != nil { | ||
+ | http.Error(w, fmt.Sprintf("Failed to push to Mimir backend: %v", err), http.StatusBadGateway) | ||
+ | log.Printf("Error pushing to Mimir: %v, Mimir URL: %s", err, mimirURL.String()) | ||
+ | return | ||
+ | } | ||
+ | defer resp.Body.Close() | ||
− | + | if resp.StatusCode >= 400 { | |
− | + | body, _ := io.ReadAll(resp.Body) | |
− | + | http.Error(w, fmt.Sprintf("Mimir returned error: %s (Status Code: %d)", string(body), resp.StatusCode), http.StatusBadGateway) | |
− | + | log.Printf("Mimir returned error: %s (Status Code: %d), Mimir URL: %s", string(body), resp.StatusCode, mimirURL.String()) | |
− | + | return | |
+ | } | ||
− | + | w.WriteHeader(resp.StatusCode) | |
− | + | io.Copy(w, resp.Body) | |
− | + | }) | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | log.Println("HTTP server running on :8080") | |
− | + | log.Fatal(http.ListenAndServe(":8080", nil)) | |
} | } | ||
− | |||
func authenticate(r *http.Request) bool { | func authenticate(r *http.Request) bool { | ||
− | + | authHeader := r.Header.Get("Authorization") | |
− | + | if authHeader == "" || !strings.HasPrefix(authHeader, "Basic ") { | |
− | + | return false | |
− | + | } | |
− | + | decoded, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader, "Basic ")) | |
− | + | if err != nil { | |
− | + | return false | |
− | + | } | |
− | + | parts := strings.SplitN(string(decoded), ":", 2) | |
− | + | if len(parts) != 2 { | |
− | + | return false | |
− | + | } | |
− | + | username, password := parts[0], parts[1] | |
− | + | return validateUser(username, password) | |
} | } | ||
− | |||
func validateUser(username, password string) bool { | func validateUser(username, password string) bool { | ||
− | + | var storedPassword string | |
− | + | err := db.QueryRow("SELECT password FROM users WHERE username = ?", username).Scan(&storedPassword) | |
− | + | if err != nil { | |
− | + | return false | |
− | + | } | |
− | + | return password == storedPassword | |
} | } | ||
``` | ``` | ||
+ | ``` | ||
+ | mkdir push-proxy | ||
+ | cd push-proxy | ||
+ | go mod init | ||
+ | go mod tidy | ||
+ | go build | ||
+ | ``` | ||
+ | |||
+ | ``` | ||
+ | set -a | ||
+ | MIMIR_URL=https://mimir-push-url.example.com/api/v1/push | ||
+ | MIMIR_USERNAME=mybasicauthuser | ||
+ | MIMIR_PASSWORD=mybasicauthpass | ||
+ | ``` | ||
+ | |||
+ | |||
+ | simple test | ||
``` | ``` | ||
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}' | curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}' | ||
``` | ``` | ||
+ | |||
+ | simple test with data | ||
+ | ``` | ||
+ | curl -X POST -g \ | ||
+ | -u admin:password123 \ | ||
+ | --data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \ | ||
+ | http://localhost:8080/api/v1/push?job=my_job&instance=my_instance | ||
+ | ``` | ||
+ | or | ||
+ | ``` | ||
+ | current_time_seconds=$(date +%s) | ||
+ | ts=$((current_time_seconds - 0)) | ||
+ | rn=$(( ( RANDOM % 20 ) + 10 )) | ||
+ | |||
+ | curl -X POST -g \ | ||
+ | -u admin:password123 \ | ||
+ | --data "my_metric2{app=\"test",job=\"test\",instance=\"test\"} $rn ${ts}" \ | ||
+ | http://localhost:8080/api/v1/push | ||
+ | ``` | ||
+ | |||
+ | |||
+ | # Grafana DataSource add | ||
+ | |||
+ | X-Scope-OrgID: test | ||
+ | |||
+ | If nothing is passed you have access to org_id=anonymous |
Latest revision as of 03:41, 24 February 2025
sqlite3 auth.db
CREATE TABLE users ( username TEXT PRIMARY KEY, password TEXT NOT NULL, org_id TEXT ); INSERT INTO users (username, password, org_id) VALUES ('admin', 'password123', test);
package main import ( "bytes" // "context" "database/sql" "encoding/base64" "fmt" "io" "log" "net/http" "net/url" "os" "strconv" "strings" "time" "github.com/golang/protobuf/proto" "github.com/golang/snappy" _ "github.com/mattn/go-sqlite3" pb "github.com/prometheus/prometheus/prompb" ) var db *sql.DB var mimirURL *url.URL var mimirUsername, mimirPassword string var httpClient *http.Client func getOrgID(username string) (string, error) { var orgID string err := db.QueryRow("SELECT org_id FROM users WHERE username = ?", username).Scan(&orgID) if err != nil { return "", err } return orgID, nil } // func parseRawMetric(raw string, username string) (*pb.WriteRequest, error) { func parseRawMetric(raw string, username string, orgID string) (*pb.WriteRequest, error) { // Add orgID parameter raw = strings.TrimSpace(raw) idxOpen := strings.Index(raw, "{") if idxOpen == -1 { return nil, fmt.Errorf("invalid format: missing '{'") } idxClose := strings.Index(raw, "}") if idxClose == -1 || idxClose < idxOpen { return nil, fmt.Errorf("invalid format: missing '}'") } metricName := strings.TrimSpace(raw[:idxOpen]) labelPart := raw[idxOpen+1 : idxClose] rest := strings.TrimSpace(raw[idxClose+1:]) parts := strings.Fields(rest) if len(parts) < 2 { return nil, fmt.Errorf("invalid format: missing value or timestamp") } value, err := strconv.ParseFloat(parts[0], 64) if err != nil { return nil, fmt.Errorf("invalid metric value: %v", err) } tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { return nil, fmt.Errorf("invalid timestamp: %v", err) } timestamp := tsSec * 1000 labels := []pb.Label{{Name: "__name__", Value: metricName}} labelPart = strings.TrimSpace(labelPart) if labelPart != "" { labelPairs := strings.Split(labelPart, ",") for _, pair := range labelPairs { pair = strings.TrimSpace(pair) if pair == "" { continue } kv := strings.SplitN(pair, "=", 2) if len(kv) != 2 { return nil, fmt.Errorf("invalid label format: %s", pair) } key := strings.TrimSpace(kv[0]) val := strings.Trim(strings.TrimSpace(kv[1]), "\"") labels = append(labels, pb.Label{Name: key, Value: val}) } } // Add the username and org_id label HERE: labels = append(labels, pb.Label{Name: "username", Value: username}) if orgID != "" { labels = append(labels, pb.Label{Name: "org_id", Value: orgID}) // Use org_id label name } // ts := pb.TimeSeries{ Labels: labels, Samples: []pb.Sample{{Value: value, Timestamp: timestamp}}, } writeReq := &pb.WriteRequest{ Timeseries: []pb.TimeSeries{ts}, } return writeReq, nil } func main() { var err error db, err = sql.Open("sqlite3", "./auth.db") if err != nil { log.Fatalf("Failed to open database: %v", err) } defer db.Close() mimirURLStr := os.Getenv("MIMIR_URL") if mimirURLStr == "" { mimirURLStr = "https://examplemimir.example.com/api/v1/push" } mimirURL, err = url.Parse(mimirURLStr) if err != nil { log.Fatalf("Invalid MIMIR_URL: %v", err) } mimirUsername = os.Getenv("MIMIR_USERNAME") if mimirUsername == "" { mimirUsername = "your_username" } mimirPassword = os.Getenv("MIMIR_PASSWORD") if mimirPassword == "" { mimirPassword = "your_password" } httpClient = &http.Client{ Timeout: 10 * time.Second, } http.HandleFunc("/api/v1/push", func(w http.ResponseWriter, r *http.Request) { if !authenticate(r) { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } username, _, ok := r.BasicAuth() if !ok { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } // Get org_id from the database instead of from the header orgID, err := getOrgID(username) if err != nil { http.Error(w, "Failed to retrieve org_id", http.StatusInternalServerError) return } data, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "Failed to read request body", http.StatusInternalServerError) return } r.Body.Close() writeReq, err := parseRawMetric(string(data), username, orgID) if err != nil { http.Error(w, fmt.Sprintf("Failed to parse metric: %v", err), http.StatusBadRequest) log.Printf("Error parsing metric: %v, Raw data: %s", err, string(data)) return } serialized, err := proto.Marshal(writeReq) if err != nil { http.Error(w, "Failed to marshal protobuf message", http.StatusInternalServerError) log.Printf("Error marshaling protobuf: %v, WriteRequest: %+v", err, writeReq) return } compressedData := snappy.Encode(nil, serialized) backendReq, err := http.NewRequestWithContext(r.Context(), "POST", mimirURL.String(), bytes.NewReader(compressedData)) if err != nil { http.Error(w, "Failed to create backend request", http.StatusInternalServerError) log.Printf("Error creating backend request: %v", err) return } backendReq.Header.Set("Content-Type", "application/x-protobuf") backendReq.Header.Set("Content-Encoding", "snappy") backendReq.SetBasicAuth(mimirUsername, mimirPassword) // Use the org_id from the database if orgID != "" { backendReq.Header.Set("X-Scope-OrgID", orgID) } resp, err := httpClient.Do(backendReq) if err != nil { http.Error(w, fmt.Sprintf("Failed to push to Mimir backend: %v", err), http.StatusBadGateway) log.Printf("Error pushing to Mimir: %v, Mimir URL: %s", err, mimirURL.String()) return } defer resp.Body.Close() if resp.StatusCode >= 400 { body, _ := io.ReadAll(resp.Body) http.Error(w, fmt.Sprintf("Mimir returned error: %s (Status Code: %d)", string(body), resp.StatusCode), http.StatusBadGateway) log.Printf("Mimir returned error: %s (Status Code: %d), Mimir URL: %s", string(body), resp.StatusCode, mimirURL.String()) return } w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body) }) log.Println("HTTP server running on :8080") log.Fatal(http.ListenAndServe(":8080", nil)) } func authenticate(r *http.Request) bool { authHeader := r.Header.Get("Authorization") if authHeader == "" || !strings.HasPrefix(authHeader, "Basic ") { return false } decoded, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader, "Basic ")) if err != nil { return false } parts := strings.SplitN(string(decoded), ":", 2) if len(parts) != 2 { return false } username, password := parts[0], parts[1] return validateUser(username, password) } func validateUser(username, password string) bool { var storedPassword string err := db.QueryRow("SELECT password FROM users WHERE username = ?", username).Scan(&storedPassword) if err != nil { return false } return password == storedPassword }
mkdir push-proxy cd push-proxy go mod init go mod tidy go build
set -a MIMIR_URL=https://mimir-push-url.example.com/api/v1/push MIMIR_USERNAME=mybasicauthuser MIMIR_PASSWORD=mybasicauthpass
simple test
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
simple test with data
curl -X POST -g \ -u admin:password123 \ --data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \ http://localhost:8080/api/v1/push?job=my_job&instance=my_instance
or
current_time_seconds=$(date +%s) ts=$((current_time_seconds - 0)) rn=$(( ( RANDOM % 20 ) + 10 )) curl -X POST -g \ -u admin:password123 \ --data "my_metric2{app=\"test",job=\"test\",instance=\"test\"} $rn ${ts}" \ http://localhost:8080/api/v1/push
Grafana DataSource add
X-Scope-OrgID: test
If nothing is passed you have access to org_id=anonymous