Difference between revisions of "Mimir push reverse proxy in go"
Jump to navigation
Jump to search
| Line 28: | Line 28: | ||
string var httpClient *http.Client | string var httpClient *http.Client | ||
| − | func parseRawMetric(raw string) (*pb.WriteRequest, error) { | + | func parseRawMetric(raw string, username string) (*pb.WriteRequest, |
| + | error) { | ||
| + | raw = strings.TrimSpace(raw) | ||
| + | |||
| + | idxOpen := strings.Index(raw, "{") if idxOpen == -1 { | ||
| + | return nil, fmt.Errorf("invalid format: missing '{'") | ||
| + | } idxClose := strings.Index(raw, "}") if idxClose == -1 || | ||
| + | idxClose < idxOpen { | ||
| + | return nil, fmt.Errorf("invalid format: missing '}'") | ||
| + | } | ||
| + | |||
| + | metricName := strings.TrimSpace(raw[:idxOpen]) | ||
| + | labelPart := raw[idxOpen+1 : idxClose] rest := | ||
| + | strings.TrimSpace(raw[idxClose+1:]) | ||
| + | |||
| + | parts := strings.Fields(rest) if len(parts) < 2 { | ||
| + | return nil, fmt.Errorf("invalid format: missing value | ||
| + | or timestamp") | ||
| + | } | ||
| + | |||
| + | value, err := strconv.ParseFloat(parts[0], 64) if err != nil { | ||
| + | return nil, fmt.Errorf("invalid metric value: %v", err) | ||
| + | } | ||
| + | |||
| + | tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { | ||
| + | return nil, fmt.Errorf("invalid timestamp: %v", err) | ||
| + | } timestamp := tsSec * 1000 | ||
| + | |||
| + | labels := []pb.Label{{Name: "__name__", Value: metricName}} | ||
| + | labelPart = strings.TrimSpace(labelPart) if labelPart != "" { | ||
| + | labelPairs := strings.Split(labelPart, ",") for _, | ||
| + | pair := range labelPairs { | ||
| + | pair = strings.TrimSpace(pair) if pair == "" { | ||
| + | continue | ||
| + | } kv := strings.SplitN(pair, "=", 2) if len(kv) | ||
| + | != 2 { | ||
| + | return nil, fmt.Errorf("invalid label | ||
| + | format: %s", pair) | ||
| + | } key := strings.TrimSpace(kv[0]) val := | ||
| + | strings.Trim(strings.TrimSpace(kv[1]), "\"") | ||
| + | labels = append(labels, pb.Label{Name: key, | ||
| + | Value: val}) | ||
| + | } | ||
| + | } | ||
| + | |||
| + | // Add the username label HERE: labels = append(labels, | ||
| + | pb.Label{Name: "username", Value: username}) | ||
| + | |||
| + | ts := pb.TimeSeries{ | ||
| + | Labels: labels, Samples: []pb.Sample{{Value: value, | ||
| + | Timestamp: timestamp}}, | ||
| + | } | ||
| + | |||
| + | writeReq := &pb.WriteRequest{ | ||
| + | Timeseries: []pb.TimeSeries{ts}, | ||
| + | } return writeReq, nil | ||
| + | } | ||
| + | |||
| + | func xxparseRawMetric(raw string) (*pb.WriteRequest, error) { | ||
raw = strings.TrimSpace(raw) | raw = strings.TrimSpace(raw) | ||
| Line 112: | Line 170: | ||
http.Error(w, "Unauthorized", | http.Error(w, "Unauthorized", | ||
http.StatusUnauthorized) return | http.StatusUnauthorized) return | ||
| + | } | ||
| + | |||
| + | username, _, ok := r.BasicAuth() // Extract username if | ||
| + | !ok { | ||
| + | http.Error(w, "Unauthorized", | ||
| + | http.StatusUnauthorized) // Should not happen | ||
| + | if authenticate passed, but good to check return | ||
} | } | ||
| Line 119: | Line 184: | ||
} r.Body.Close() | } r.Body.Close() | ||
| − | writeReq, err := parseRawMetric(string(data)) // | + | // writeReq, err := parseRawMetric(string(data)) // |
| − | is now used if err != nil { | + | writeReq is now used |
| + | writeReq, err := | ||
| + | parseRawMetric(string(data), username) | ||
| + | // Pass username to parseRawMetric | ||
| + | if err != nil { | ||
http.Error(w, fmt.Sprintf("Failed to parse | http.Error(w, fmt.Sprintf("Failed to parse | ||
metric: %v", err), http.StatusBadRequest) | metric: %v", err), http.StatusBadRequest) | ||
Revision as of 23:15, 23 February 2025
sqlite3 auth.db
CREATE TABLE users (
username TEXT PRIMARY KEY,
password TEXT NOT NULL
);
INSERT INTO users (username, password) VALUES ('admin', 'password123');
package main
import (
"bytes" // "context" "database/sql" "encoding/base64" "fmt" "io"
"log" "net/http" "net/url" "os" "strconv" "strings" "time"
"github.com/golang/protobuf/proto" "github.com/golang/snappy"
pb "github.com/prometheus/prometheus/prompb" _
"github.com/mattn/go-sqlite3"
)
var db *sql.DB var mimirURL *url.URL var mimirUsername, mimirPassword
string var httpClient *http.Client
func parseRawMetric(raw string, username string) (*pb.WriteRequest,
error) {
raw = strings.TrimSpace(raw)
idxOpen := strings.Index(raw, "{") if idxOpen == -1 {
return nil, fmt.Errorf("invalid format: missing '{'")
} idxClose := strings.Index(raw, "}") if idxClose == -1 ||
idxClose < idxOpen {
return nil, fmt.Errorf("invalid format: missing '}'")
}
metricName := strings.TrimSpace(raw[:idxOpen])
labelPart := raw[idxOpen+1 : idxClose] rest :=
strings.TrimSpace(raw[idxClose+1:])
parts := strings.Fields(rest) if len(parts) < 2 {
return nil, fmt.Errorf("invalid format: missing value
or timestamp")
}
value, err := strconv.ParseFloat(parts[0], 64) if err != nil {
return nil, fmt.Errorf("invalid metric value: %v", err)
}
tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil {
return nil, fmt.Errorf("invalid timestamp: %v", err)
} timestamp := tsSec * 1000
labels := []pb.Label{{Name: "__name__", Value: metricName}}
labelPart = strings.TrimSpace(labelPart) if labelPart != "" {
labelPairs := strings.Split(labelPart, ",") for _,
pair := range labelPairs {
pair = strings.TrimSpace(pair) if pair == "" {
continue
} kv := strings.SplitN(pair, "=", 2) if len(kv)
!= 2 {
return nil, fmt.Errorf("invalid label
format: %s", pair)
} key := strings.TrimSpace(kv[0]) val :=
strings.Trim(strings.TrimSpace(kv[1]), "\"")
labels = append(labels, pb.Label{Name: key,
Value: val})
}
}
// Add the username label HERE: labels = append(labels,
pb.Label{Name: "username", Value: username})
ts := pb.TimeSeries{
Labels: labels, Samples: []pb.Sample{{Value: value,
Timestamp: timestamp}},
}
writeReq := &pb.WriteRequest{
Timeseries: []pb.TimeSeries{ts},
} return writeReq, nil
}
func xxparseRawMetric(raw string) (*pb.WriteRequest, error) {
raw = strings.TrimSpace(raw)
idxOpen := strings.Index(raw, "{") if idxOpen == -1 {
return nil, fmt.Errorf("invalid format: missing '{'")
} idxClose := strings.Index(raw, "}") if idxClose == -1 ||
idxClose < idxOpen {
return nil, fmt.Errorf("invalid format: missing '}'")
}
metricName := strings.TrimSpace(raw[:idxOpen])
labelPart := raw[idxOpen+1 : idxClose] rest :=
strings.TrimSpace(raw[idxClose+1:])
parts := strings.Fields(rest) if len(parts) < 2 {
return nil, fmt.Errorf("invalid format: missing value
or timestamp")
}
value, err := strconv.ParseFloat(parts[0], 64) if err != nil {
return nil, fmt.Errorf("invalid metric value: %v", err)
}
tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil {
return nil, fmt.Errorf("invalid timestamp: %v", err)
} timestamp := tsSec * 1000
labels := []pb.Label{{Name: "__name__", Value: metricName}}
labelPart = strings.TrimSpace(labelPart) if labelPart != "" {
labelPairs := strings.Split(labelPart, ",") for _,
pair := range labelPairs {
pair = strings.TrimSpace(pair) if pair == "" {
continue
} kv := strings.SplitN(pair, "=", 2) if len(kv)
!= 2 {
return nil, fmt.Errorf("invalid label
format: %s", pair)
} key := strings.TrimSpace(kv[0]) val :=
strings.Trim(strings.TrimSpace(kv[1]), "\"")
labels = append(labels, pb.Label{Name: key,
Value: val})
}
}
ts := pb.TimeSeries{
Labels: labels, Samples: []pb.Sample{{Value: value,
Timestamp: timestamp}},
}
writeReq := &pb.WriteRequest{
Timeseries: []pb.TimeSeries{ts},
} return writeReq, nil
}
func main() {
var err error db, err = sql.Open("sqlite3", "./auth.db") if err !=
nil {
log.Fatalf("Failed to open database: %v", err)
} defer db.Close()
mimirURLStr := os.Getenv("MIMIR_URL") if mimirURLStr == "" {
mimirURLStr =
"https://examplemimir.example.com/api/v1/push"
} mimirURL, err = url.Parse(mimirURLStr) if err != nil {
log.Fatalf("Invalid MIMIR_URL: %v", err)
}
mimirUsername = os.Getenv("MIMIR_USERNAME") if mimirUsername ==
"" {
mimirUsername = "your_username"
} mimirPassword = os.Getenv("MIMIR_PASSWORD") if mimirPassword ==
"" {
mimirPassword = "your_password"
}
httpClient = &http.Client{
Timeout: 10 * time.Second,
}
http.HandleFunc("/api/v1/push", func(w http.ResponseWriter,
r *http.Request) {
if !authenticate(r) {
http.Error(w, "Unauthorized",
http.StatusUnauthorized) return
}
username, _, ok := r.BasicAuth() // Extract username if
!ok {
http.Error(w, "Unauthorized",
http.StatusUnauthorized) // Should not happen
if authenticate passed, but good to check return
}
data, err := io.ReadAll(r.Body) if err != nil {
http.Error(w, "Failed to read request body",
http.StatusInternalServerError) return
} r.Body.Close()
// writeReq, err := parseRawMetric(string(data)) //
writeReq is now used
writeReq, err :=
parseRawMetric(string(data), username)
// Pass username to parseRawMetric
if err != nil {
http.Error(w, fmt.Sprintf("Failed to parse
metric: %v", err), http.StatusBadRequest)
log.Printf("Error parsing metric: %v, Raw data:
%s", err, string(data)) return
}
serialized, err := proto.Marshal(writeReq) if err != nil {
http.Error(w, "Failed to marshal protobuf
message", http.StatusInternalServerError)
log.Printf("Error marshaling protobuf: %v,
WriteRequest: %+v", err, writeReq) return
}
compressedData := snappy.Encode(nil, serialized)
backendReq, err := http.NewRequestWithContext(r.Context(),
"POST", mimirURL.String(),
bytes.NewReader(compressedData)) if err != nil {
http.Error(w, "Failed to create backend request",
http.StatusInternalServerError) log.Printf("Error
creating backend request: %v", err) return
} backendReq.Header.Set("Content-Type",
"application/x-protobuf")
backendReq.Header.Set("Content-Encoding", "snappy")
backendReq.SetBasicAuth(mimirUsername, mimirPassword)
resp, err := httpClient.Do(backendReq) if err != nil {
http.Error(w, fmt.Sprintf("Failed to push to
Mimir backend: %v", err), http.StatusBadGateway)
log.Printf("Error pushing to Mimir: %v, Mimir URL:
%s", err, mimirURL.String()) return
} defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body) http.Error(w,
fmt.Sprintf("Mimir returned error: %s (Status
Code: %d)", string(body), resp.StatusCode),
http.StatusBadGateway) log.Printf("Mimir returned
error: %s (Status Code: %d), Mimir URL: %s",
string(body), resp.StatusCode, mimirURL.String())
return
}
w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body)
})
log.Println("HTTP server running on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func authenticate(r *http.Request) bool {
authHeader := r.Header.Get("Authorization") if authHeader ==
"" || !strings.HasPrefix(authHeader, "Basic ") {
return false
}
decoded, err :=
base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader,
"Basic ")) if err != nil {
return false
}
parts := strings.SplitN(string(decoded), ":", 2) if len(parts)
!= 2 {
return false
}
username, password := parts[0], parts[1] return
validateUser(username, password)
}
func validateUser(username, password string) bool {
var storedPassword string err := db.QueryRow("SELECT password
FROM users WHERE username = ?", username).Scan(&storedPassword)
if err != nil {
return false
} return password == storedPassword
}
mkdir push-proxy cd push-proxy go mod init go mod tidy go build
set -a MIMIR_URL=https://mimir-push-url.example.com/api/v1/push MIMIR_USERNAME=mybasicauthuser MIMIR_PASSWORD=mybasicauthpass
simple test
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
simple test with data
curl -X POST -g \
-u admin:password123 \
--data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \
http://localhost:8080/api/v1/push?job=my_job&instance=my_instance