Difference between revisions of "Mimir push reverse proxy in go"
Jump to navigation
Jump to search
| Line 6: | Line 6: | ||
CREATE TABLE users ( | CREATE TABLE users ( | ||
username TEXT PRIMARY KEY, | username TEXT PRIMARY KEY, | ||
| − | password TEXT NOT NULL | + | password TEXT NOT NULL, |
| + | org_id TEXT | ||
); | ); | ||
| Line 17: | Line 18: | ||
import ( | import ( | ||
| − | "bytes" // "context" "database/sql" "encoding/base64" "fmt" "io" | + | "bytes" |
| − | "log" "net/http" "net/url" "os" "strconv" "strings" "time" | + | // "context" |
| + | "database/sql" | ||
| + | "encoding/base64" | ||
| + | "fmt" | ||
| + | "io" | ||
| + | "log" | ||
| + | "net/http" | ||
| + | "net/url" | ||
| + | "os" | ||
| + | "strconv" | ||
| + | "strings" | ||
| + | "time" | ||
| − | "github.com/golang/protobuf/proto" "github.com/golang/snappy" | + | "github.com/golang/protobuf/proto" |
| − | + | "github.com/golang/snappy" | |
| − | "github.com/ | + | _ "github.com/mattn/go-sqlite3" |
| + | pb "github.com/prometheus/prometheus/prompb" | ||
) | ) | ||
| − | var db *sql.DB var mimirURL *url.URL var mimirUsername, mimirPassword | + | var db *sql.DB |
| − | + | var mimirURL *url.URL | |
| + | var mimirUsername, mimirPassword string | ||
| + | var httpClient *http.Client | ||
| − | func parseRawMetric(raw string, username string) (*pb.WriteRequest, | + | func getOrgID(username string) (string, error) { |
| − | error) { | + | var orgID string |
| + | err := db.QueryRow("SELECT org_id FROM users WHERE username = ?", username).Scan(&orgID) | ||
| + | if err != nil { | ||
| + | return "", err | ||
| + | } | ||
| + | return orgID, nil | ||
| + | } | ||
| + | |||
| + | // func parseRawMetric(raw string, username string) (*pb.WriteRequest, error) { | ||
| + | func parseRawMetric(raw string, username string, orgID string) (*pb.WriteRequest, error) { // Add orgID parameter | ||
raw = strings.TrimSpace(raw) | raw = strings.TrimSpace(raw) | ||
| − | idxOpen := strings.Index(raw, "{") if idxOpen == -1 { | + | idxOpen := strings.Index(raw, "{") |
| + | if idxOpen == -1 { | ||
return nil, fmt.Errorf("invalid format: missing '{'") | return nil, fmt.Errorf("invalid format: missing '{'") | ||
| − | } idxClose := strings.Index(raw, "}") if idxClose == -1 || | + | } |
| − | + | idxClose := strings.Index(raw, "}") | |
| + | if idxClose == -1 || idxClose < idxOpen { | ||
return nil, fmt.Errorf("invalid format: missing '}'") | return nil, fmt.Errorf("invalid format: missing '}'") | ||
} | } | ||
metricName := strings.TrimSpace(raw[:idxOpen]) | metricName := strings.TrimSpace(raw[:idxOpen]) | ||
| − | labelPart := raw[idxOpen+1 : idxClose] rest := | + | labelPart := raw[idxOpen+1 : idxClose] |
| − | + | rest := strings.TrimSpace(raw[idxClose+1:]) | |
| − | parts := strings.Fields(rest) if len(parts) < 2 { | + | parts := strings.Fields(rest) |
| − | return nil, fmt.Errorf("invalid format: missing value | + | if len(parts) < 2 { |
| − | + | return nil, fmt.Errorf("invalid format: missing value or timestamp") | |
} | } | ||
| − | value, err := strconv.ParseFloat(parts[0], 64) if err != nil { | + | value, err := strconv.ParseFloat(parts[0], 64) |
| + | if err != nil { | ||
return nil, fmt.Errorf("invalid metric value: %v", err) | return nil, fmt.Errorf("invalid metric value: %v", err) | ||
} | } | ||
| − | tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { | + | tsSec, err := strconv.ParseInt(parts[1], 10, 64) |
| + | if err != nil { | ||
return nil, fmt.Errorf("invalid timestamp: %v", err) | return nil, fmt.Errorf("invalid timestamp: %v", err) | ||
| − | } timestamp := tsSec * 1000 | + | } |
| + | timestamp := tsSec * 1000 | ||
labels := []pb.Label{{Name: "__name__", Value: metricName}} | labels := []pb.Label{{Name: "__name__", Value: metricName}} | ||
| − | labelPart = strings.TrimSpace(labelPart) if labelPart != "" { | + | labelPart = strings.TrimSpace(labelPart) |
| − | labelPairs := strings.Split(labelPart, ",") for _, | + | if labelPart != "" { |
| − | + | labelPairs := strings.Split(labelPart, ",") | |
| − | pair = strings.TrimSpace(pair) if pair == "" { | + | for _, pair := range labelPairs { |
| + | pair = strings.TrimSpace(pair) | ||
| + | if pair == "" { | ||
continue | continue | ||
| − | } kv := strings.SplitN(pair, "=", 2) if len(kv) | + | } |
| − | + | kv := strings.SplitN(pair, "=", 2) | |
| − | return nil, fmt.Errorf("invalid label | + | if len(kv) != 2 { |
| − | + | return nil, fmt.Errorf("invalid label format: %s", pair) | |
| − | } key := strings.TrimSpace(kv[0]) val := | + | } |
| − | + | key := strings.TrimSpace(kv[0]) | |
| − | labels = append(labels, pb.Label{Name: key, | + | val := strings.Trim(strings.TrimSpace(kv[1]), "\"") |
| − | + | labels = append(labels, pb.Label{Name: key, Value: val}) | |
} | } | ||
} | } | ||
| − | // Add the username label HERE: labels = append(labels, | + | // Add the username and org_id label HERE: |
| − | pb.Label{Name: " | + | labels = append(labels, pb.Label{Name: "username", Value: username}) |
| + | if orgID != "" { | ||
| + | labels = append(labels, pb.Label{Name: "org_id", Value: orgID}) // Use org_id label name | ||
| + | } | ||
| + | // | ||
ts := pb.TimeSeries{ | ts := pb.TimeSeries{ | ||
| − | Labels: labels, Samples: []pb.Sample{{Value: value, | + | Labels: labels, |
| − | + | Samples: []pb.Sample{{Value: value, Timestamp: timestamp}}, | |
} | } | ||
writeReq := &pb.WriteRequest{ | writeReq := &pb.WriteRequest{ | ||
Timeseries: []pb.TimeSeries{ts}, | Timeseries: []pb.TimeSeries{ts}, | ||
| − | } return writeReq, nil | + | } |
| + | return writeReq, nil | ||
} | } | ||
func main() { | func main() { | ||
| − | var err error db, err = sql.Open("sqlite3", "./auth.db") if err != | + | var err error |
| − | + | db, err = sql.Open("sqlite3", "./auth.db") | |
| + | if err != nil { | ||
log.Fatalf("Failed to open database: %v", err) | log.Fatalf("Failed to open database: %v", err) | ||
| − | } defer db.Close() | + | } |
| + | defer db.Close() | ||
| − | mimirURLStr := os.Getenv("MIMIR_URL") if mimirURLStr == "" { | + | mimirURLStr := os.Getenv("MIMIR_URL") |
| − | mimirURLStr = | + | if mimirURLStr == "" { |
| − | + | mimirURLStr = "https://examplemimir.example.com/api/v1/push" | |
| − | } mimirURL, err = url.Parse(mimirURLStr) if err != nil { | + | } |
| + | mimirURL, err = url.Parse(mimirURLStr) | ||
| + | if err != nil { | ||
log.Fatalf("Invalid MIMIR_URL: %v", err) | log.Fatalf("Invalid MIMIR_URL: %v", err) | ||
} | } | ||
| − | mimirUsername = os.Getenv("MIMIR_USERNAME") if mimirUsername == | + | mimirUsername = os.Getenv("MIMIR_USERNAME") |
| − | + | if mimirUsername == "" { | |
mimirUsername = "your_username" | mimirUsername = "your_username" | ||
| − | } mimirPassword = os.Getenv("MIMIR_PASSWORD") if mimirPassword == | + | } |
| − | + | mimirPassword = os.Getenv("MIMIR_PASSWORD") | |
| + | if mimirPassword == "" { | ||
mimirPassword = "your_password" | mimirPassword = "your_password" | ||
} | } | ||
| Line 111: | Line 152: | ||
} | } | ||
| − | http.HandleFunc("/api/v1/push", func(w http.ResponseWriter, | + | http.HandleFunc("/api/v1/push", func(w http.ResponseWriter, r *http.Request) { |
| − | |||
if !authenticate(r) { | if !authenticate(r) { | ||
| − | http.Error(w, "Unauthorized", | + | http.Error(w, "Unauthorized", http.StatusUnauthorized) |
| − | http.StatusUnauthorized) return | + | return |
| + | } | ||
| + | |||
| + | username, _, ok := r.BasicAuth() | ||
| + | if !ok { | ||
| + | http.Error(w, "Unauthorized", http.StatusUnauthorized) | ||
| + | return | ||
} | } | ||
| − | + | // Get org_id from the database instead of from the header | |
| − | ! | + | orgID, err := getOrgID(username) |
| − | http.Error(w, " | + | if err != nil { |
| − | + | http.Error(w, "Failed to retrieve org_id", http.StatusInternalServerError) | |
| − | + | return | |
} | } | ||
| − | data, err := io.ReadAll(r.Body) if err != nil { | + | data, err := io.ReadAll(r.Body) |
| − | http.Error(w, "Failed to read request body", | + | if err != nil { |
| − | + | http.Error(w, "Failed to read request body", http.StatusInternalServerError) | |
| − | } r.Body.Close() | + | return |
| + | } | ||
| + | r.Body.Close() | ||
| − | + | writeReq, err := parseRawMetric(string(data), username, orgID) | |
| − | |||
| − | |||
| − | |||
| − | |||
if err != nil { | if err != nil { | ||
| − | http.Error(w, fmt.Sprintf("Failed to parse | + | http.Error(w, fmt.Sprintf("Failed to parse metric: %v", err), http.StatusBadRequest) |
| − | + | log.Printf("Error parsing metric: %v, Raw data: %s", err, string(data)) | |
| − | log.Printf("Error parsing metric: %v, Raw data: | + | return |
| − | |||
} | } | ||
| − | serialized, err := proto.Marshal(writeReq) if err != nil { | + | serialized, err := proto.Marshal(writeReq) |
| − | http.Error(w, "Failed to marshal protobuf | + | if err != nil { |
| − | + | http.Error(w, "Failed to marshal protobuf message", http.StatusInternalServerError) | |
| − | log.Printf("Error marshaling protobuf: %v, | + | log.Printf("Error marshaling protobuf: %v, WriteRequest: %+v", err, writeReq) |
| − | + | return | |
} | } | ||
compressedData := snappy.Encode(nil, serialized) | compressedData := snappy.Encode(nil, serialized) | ||
| − | backendReq, err := http.NewRequestWithContext(r.Context(), | + | backendReq, err := http.NewRequestWithContext(r.Context(), "POST", mimirURL.String(), bytes.NewReader(compressedData)) |
| − | + | if err != nil { | |
| − | + | http.Error(w, "Failed to create backend request", http.StatusInternalServerError) | |
| − | http.Error(w, "Failed to create backend request", | + | log.Printf("Error creating backend request: %v", err) |
| − | + | return | |
| − | + | } | |
| − | } backendReq.Header.Set("Content-Type", | + | backendReq.Header.Set("Content-Type", "application/x-protobuf") |
| − | |||
backendReq.Header.Set("Content-Encoding", "snappy") | backendReq.Header.Set("Content-Encoding", "snappy") | ||
backendReq.SetBasicAuth(mimirUsername, mimirPassword) | backendReq.SetBasicAuth(mimirUsername, mimirPassword) | ||
| − | resp, err := httpClient.Do(backendReq) if err != nil { | + | // Use the org_id from the database |
| − | http.Error(w, fmt.Sprintf("Failed to push to | + | if orgID != "" { |
| − | + | backendReq.Header.Set("X-Scope-OrgID", orgID) | |
| − | log.Printf("Error pushing to Mimir: %v, Mimir URL: | + | } |
| − | + | ||
| − | } defer resp.Body.Close() | + | resp, err := httpClient.Do(backendReq) |
| + | if err != nil { | ||
| + | http.Error(w, fmt.Sprintf("Failed to push to Mimir backend: %v", err), http.StatusBadGateway) | ||
| + | log.Printf("Error pushing to Mimir: %v, Mimir URL: %s", err, mimirURL.String()) | ||
| + | return | ||
| + | } | ||
| + | defer resp.Body.Close() | ||
if resp.StatusCode >= 400 { | if resp.StatusCode >= 400 { | ||
| − | body, _ := io.ReadAll(resp.Body) http.Error(w, | + | body, _ := io.ReadAll(resp.Body) |
| − | + | http.Error(w, fmt.Sprintf("Mimir returned error: %s (Status Code: %d)", string(body), resp.StatusCode), http.StatusBadGateway) | |
| − | + | log.Printf("Mimir returned error: %s (Status Code: %d), Mimir URL: %s", string(body), resp.StatusCode, mimirURL.String()) | |
| − | |||
| − | |||
| − | |||
return | return | ||
} | } | ||
| − | w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body) | + | w.WriteHeader(resp.StatusCode) |
| + | io.Copy(w, resp.Body) | ||
}) | }) | ||
| Line 187: | Line 233: | ||
func authenticate(r *http.Request) bool { | func authenticate(r *http.Request) bool { | ||
| − | authHeader := r.Header.Get("Authorization") if authHeader == | + | authHeader := r.Header.Get("Authorization") |
| − | + | if authHeader == "" || !strings.HasPrefix(authHeader, "Basic ") { | |
return false | return false | ||
} | } | ||
| − | decoded, err := | + | decoded, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader, "Basic ")) |
| − | + | if err != nil { | |
| − | |||
return false | return false | ||
} | } | ||
| − | parts := strings.SplitN(string(decoded), ":", 2) if len(parts) | + | parts := strings.SplitN(string(decoded), ":", 2) |
| − | + | if len(parts) != 2 { | |
return false | return false | ||
} | } | ||
| − | username, password := parts[0], parts[1] | + | username, password := parts[0], parts[1] |
| − | validateUser(username, password) | + | return validateUser(username, password) |
} | } | ||
func validateUser(username, password string) bool { | func validateUser(username, password string) bool { | ||
| − | var storedPassword string err := db.QueryRow("SELECT password | + | var storedPassword string |
| − | + | err := db.QueryRow("SELECT password FROM users WHERE username = ?", username).Scan(&storedPassword) | |
if err != nil { | if err != nil { | ||
return false | return false | ||
| − | } return password == storedPassword | + | } |
| + | return password == storedPassword | ||
} | } | ||
``` | ``` | ||
| Line 253: | Line 299: | ||
curl -X POST -g \ | curl -X POST -g \ | ||
-u admin:password123 \ | -u admin:password123 \ | ||
| − | --data "my_metric2{app=\" | + | --data "my_metric2{app=\"test",job=\"test\",instance=\"test\"} $rn ${ts}" \ |
| − | http://localhost:8080/api/v1/push | + | http://localhost:8080/api/v1/push |
``` | ``` | ||
Revision as of 03:38, 24 February 2025
sqlite3 auth.db
CREATE TABLE users (
username TEXT PRIMARY KEY,
password TEXT NOT NULL,
org_id TEXT
);
INSERT INTO users (username, password) VALUES ('admin', 'password123');
package main
import (
"bytes"
// "context"
"database/sql"
"encoding/base64"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
_ "github.com/mattn/go-sqlite3"
pb "github.com/prometheus/prometheus/prompb"
)
var db *sql.DB
var mimirURL *url.URL
var mimirUsername, mimirPassword string
var httpClient *http.Client
func getOrgID(username string) (string, error) {
var orgID string
err := db.QueryRow("SELECT org_id FROM users WHERE username = ?", username).Scan(&orgID)
if err != nil {
return "", err
}
return orgID, nil
}
// func parseRawMetric(raw string, username string) (*pb.WriteRequest, error) {
func parseRawMetric(raw string, username string, orgID string) (*pb.WriteRequest, error) { // Add orgID parameter
raw = strings.TrimSpace(raw)
idxOpen := strings.Index(raw, "{")
if idxOpen == -1 {
return nil, fmt.Errorf("invalid format: missing '{'")
}
idxClose := strings.Index(raw, "}")
if idxClose == -1 || idxClose < idxOpen {
return nil, fmt.Errorf("invalid format: missing '}'")
}
metricName := strings.TrimSpace(raw[:idxOpen])
labelPart := raw[idxOpen+1 : idxClose]
rest := strings.TrimSpace(raw[idxClose+1:])
parts := strings.Fields(rest)
if len(parts) < 2 {
return nil, fmt.Errorf("invalid format: missing value or timestamp")
}
value, err := strconv.ParseFloat(parts[0], 64)
if err != nil {
return nil, fmt.Errorf("invalid metric value: %v", err)
}
tsSec, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid timestamp: %v", err)
}
timestamp := tsSec * 1000
labels := []pb.Label{{Name: "__name__", Value: metricName}}
labelPart = strings.TrimSpace(labelPart)
if labelPart != "" {
labelPairs := strings.Split(labelPart, ",")
for _, pair := range labelPairs {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid label format: %s", pair)
}
key := strings.TrimSpace(kv[0])
val := strings.Trim(strings.TrimSpace(kv[1]), "\"")
labels = append(labels, pb.Label{Name: key, Value: val})
}
}
// Add the username and org_id label HERE:
labels = append(labels, pb.Label{Name: "username", Value: username})
if orgID != "" {
labels = append(labels, pb.Label{Name: "org_id", Value: orgID}) // Use org_id label name
}
//
ts := pb.TimeSeries{
Labels: labels,
Samples: []pb.Sample{{Value: value, Timestamp: timestamp}},
}
writeReq := &pb.WriteRequest{
Timeseries: []pb.TimeSeries{ts},
}
return writeReq, nil
}
func main() {
var err error
db, err = sql.Open("sqlite3", "./auth.db")
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
defer db.Close()
mimirURLStr := os.Getenv("MIMIR_URL")
if mimirURLStr == "" {
mimirURLStr = "https://examplemimir.example.com/api/v1/push"
}
mimirURL, err = url.Parse(mimirURLStr)
if err != nil {
log.Fatalf("Invalid MIMIR_URL: %v", err)
}
mimirUsername = os.Getenv("MIMIR_USERNAME")
if mimirUsername == "" {
mimirUsername = "your_username"
}
mimirPassword = os.Getenv("MIMIR_PASSWORD")
if mimirPassword == "" {
mimirPassword = "your_password"
}
httpClient = &http.Client{
Timeout: 10 * time.Second,
}
http.HandleFunc("/api/v1/push", func(w http.ResponseWriter, r *http.Request) {
if !authenticate(r) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
username, _, ok := r.BasicAuth()
if !ok {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// Get org_id from the database instead of from the header
orgID, err := getOrgID(username)
if err != nil {
http.Error(w, "Failed to retrieve org_id", http.StatusInternalServerError)
return
}
data, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusInternalServerError)
return
}
r.Body.Close()
writeReq, err := parseRawMetric(string(data), username, orgID)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to parse metric: %v", err), http.StatusBadRequest)
log.Printf("Error parsing metric: %v, Raw data: %s", err, string(data))
return
}
serialized, err := proto.Marshal(writeReq)
if err != nil {
http.Error(w, "Failed to marshal protobuf message", http.StatusInternalServerError)
log.Printf("Error marshaling protobuf: %v, WriteRequest: %+v", err, writeReq)
return
}
compressedData := snappy.Encode(nil, serialized)
backendReq, err := http.NewRequestWithContext(r.Context(), "POST", mimirURL.String(), bytes.NewReader(compressedData))
if err != nil {
http.Error(w, "Failed to create backend request", http.StatusInternalServerError)
log.Printf("Error creating backend request: %v", err)
return
}
backendReq.Header.Set("Content-Type", "application/x-protobuf")
backendReq.Header.Set("Content-Encoding", "snappy")
backendReq.SetBasicAuth(mimirUsername, mimirPassword)
// Use the org_id from the database
if orgID != "" {
backendReq.Header.Set("X-Scope-OrgID", orgID)
}
resp, err := httpClient.Do(backendReq)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to push to Mimir backend: %v", err), http.StatusBadGateway)
log.Printf("Error pushing to Mimir: %v, Mimir URL: %s", err, mimirURL.String())
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
http.Error(w, fmt.Sprintf("Mimir returned error: %s (Status Code: %d)", string(body), resp.StatusCode), http.StatusBadGateway)
log.Printf("Mimir returned error: %s (Status Code: %d), Mimir URL: %s", string(body), resp.StatusCode, mimirURL.String())
return
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
})
log.Println("HTTP server running on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func authenticate(r *http.Request) bool {
authHeader := r.Header.Get("Authorization")
if authHeader == "" || !strings.HasPrefix(authHeader, "Basic ") {
return false
}
decoded, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader, "Basic "))
if err != nil {
return false
}
parts := strings.SplitN(string(decoded), ":", 2)
if len(parts) != 2 {
return false
}
username, password := parts[0], parts[1]
return validateUser(username, password)
}
func validateUser(username, password string) bool {
var storedPassword string
err := db.QueryRow("SELECT password FROM users WHERE username = ?", username).Scan(&storedPassword)
if err != nil {
return false
}
return password == storedPassword
}
mkdir push-proxy cd push-proxy go mod init go mod tidy go build
set -a MIMIR_URL=https://mimir-push-url.example.com/api/v1/push MIMIR_USERNAME=mybasicauthuser MIMIR_PASSWORD=mybasicauthpass
simple test
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
simple test with data
curl -X POST -g \
-u admin:password123 \
--data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \
http://localhost:8080/api/v1/push?job=my_job&instance=my_instance
or
current_time_seconds=$(date +%s)
ts=$((current_time_seconds - 0))
rn=$(( ( RANDOM % 20 ) + 10 ))
curl -X POST -g \
-u admin:password123 \
--data "my_metric2{app=\"test",job=\"test\",instance=\"test\"} $rn ${ts}" \
http://localhost:8080/api/v1/push