Difference between revisions of "Mimir push reverse proxy in go"

From UVOO Tech Wiki
Jump to navigation Jump to search
Line 17: Line 17:
  
 
import (
 
import (
    "database/sql"
+
        "bytes" // "context" "database/sql" "encoding/base64" "fmt" "io"
    "encoding/base64"
+
        "log" "net/http" "net/url" "os" "strconv" "strings" "time"
    "log"
 
    "net/http"
 
    "net/http/httputil"
 
    "net/url"
 
    "os"
 
    "strings"
 
  
    _ "github.com/mattn/go-sqlite3"
+
        "github.com/golang/protobuf/proto" "github.com/golang/snappy"
 +
        pb "github.com/prometheus/prometheus/prompb" _
 +
        "github.com/mattn/go-sqlite3"
 
)
 
)
  
// Database connection
+
var db *sql.DB var mimirURL *url.URL var mimirUsername, mimirPassword
var db *sql.DB
+
string var httpClient *http.Client
 +
 
 +
func parseRawMetric(raw string) (*pb.WriteRequest, error) {
 +
        raw = strings.TrimSpace(raw)
 +
 
 +
        idxOpen := strings.Index(raw, "{") if idxOpen == -1 {
 +
                return nil, fmt.Errorf("invalid format: missing '{'")
 +
        } idxClose := strings.Index(raw, "}") if idxClose == -1 ||
 +
        idxClose < idxOpen {
 +
                return nil, fmt.Errorf("invalid format: missing '}'")
 +
        }
 +
 
 +
        metricName := strings.TrimSpace(raw[:idxOpen])
 +
        labelPart := raw[idxOpen+1 : idxClose] rest :=
 +
        strings.TrimSpace(raw[idxClose+1:])
 +
 
 +
        parts := strings.Fields(rest) if len(parts) < 2 {
 +
                return nil, fmt.Errorf("invalid format: missing value
 +
                or timestamp")
 +
        }
 +
 
 +
        value, err := strconv.ParseFloat(parts[0], 64) if err != nil {
 +
                return nil, fmt.Errorf("invalid metric value: %v", err)
 +
        }
 +
 
 +
        tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil {
 +
                return nil, fmt.Errorf("invalid timestamp: %v", err)
 +
        } timestamp := tsSec * 1000
 +
 
 +
        labels := []pb.Label{{Name: "__name__", Value: metricName}}
 +
        labelPart = strings.TrimSpace(labelPart) if labelPart != "" {
 +
                labelPairs := strings.Split(labelPart, ",") for _,
 +
                pair := range labelPairs {
 +
                        pair = strings.TrimSpace(pair) if pair == "" {
 +
                                continue
 +
                        } kv := strings.SplitN(pair, "=", 2) if len(kv)
 +
                        != 2 {
 +
                                return nil, fmt.Errorf("invalid label
 +
                                format: %s", pair)
 +
                        } key := strings.TrimSpace(kv[0]) val :=
 +
                        strings.Trim(strings.TrimSpace(kv[1]), "\"")
 +
                        labels = append(labels, pb.Label{Name: key,
 +
                        Value: val})
 +
                }
 +
        }
 +
 
 +
        ts := pb.TimeSeries{
 +
                Labels:  labels, Samples: []pb.Sample{{Value: value,
 +
                Timestamp: timestamp}},
 +
        }
 +
 
 +
        writeReq := &pb.WriteRequest{
 +
                Timeseries: []pb.TimeSeries{ts},
 +
        } return writeReq, nil
 +
}
  
 
func main() {
 
func main() {
    var err error
+
        var err error db, err = sql.Open("sqlite3", "./auth.db") if err !=
    db, err = sql.Open("sqlite3", "./auth.db")
+
        nil {
    if err != nil {
+
                log.Fatalf("Failed to open database: %v", err)
        log.Fatalf("Failed to open database: %v", err)
+
        } defer db.Close()
    }
+
 
    defer db.Close()
+
        mimirURLStr := os.Getenv("MIMIR_URL") if mimirURLStr == "" {
 +
                mimirURLStr =
 +
                "https://examplemimir.example.com/api/v1/push"
 +
        } mimirURL, err = url.Parse(mimirURLStr) if err != nil {
 +
                log.Fatalf("Invalid MIMIR_URL: %v", err)
 +
        }
 +
 
 +
        mimirUsername = os.Getenv("MIMIR_USERNAME") if mimirUsername ==
 +
        "" {
 +
                mimirUsername = "your_username"
 +
        } mimirPassword = os.Getenv("MIMIR_PASSWORD") if mimirPassword ==
 +
        "" {
 +
                mimirPassword = "your_password"
 +
        }
 +
 
 +
        httpClient = &http.Client{
 +
                Timeout: 10 * time.Second,
 +
        }
 +
 
 +
        http.HandleFunc("/api/v1/push", func(w http.ResponseWriter,
 +
        r *http.Request) {
 +
                if !authenticate(r) {
 +
                        http.Error(w, "Unauthorized",
 +
                        http.StatusUnauthorized) return
 +
                }
 +
 
 +
                data, err := io.ReadAll(r.Body) if err != nil {
 +
                        http.Error(w, "Failed to read request body",
 +
                        http.StatusInternalServerError) return
 +
                } r.Body.Close()
 +
 
 +
                writeReq, err := parseRawMetric(string(data)) // writeReq
 +
                is now used if err != nil {
 +
                        http.Error(w, fmt.Sprintf("Failed to parse
 +
                        metric: %v", err), http.StatusBadRequest)
 +
                        log.Printf("Error parsing metric: %v, Raw data:
 +
                        %s", err, string(data)) return
 +
                }
  
    // Read environment variables for backend credentials and target URL
+
                serialized, err := proto.Marshal(writeReq) if err != nil {
    backendUsername := os.Getenv("BACKEND_USERNAME")
+
                        http.Error(w, "Failed to marshal protobuf
    if backendUsername == "" {
+
                        message", http.StatusInternalServerError)
        log.Fatal("Environment variable BACKEND_USERNAME is required")
+
                        log.Printf("Error marshaling protobuf: %v,
    }
+
                        WriteRequest: %+v", err, writeReq) return
 +
                }
  
    backendPassword := os.Getenv("BACKEND_PASSWORD")
+
                compressedData := snappy.Encode(nil, serialized)
    if backendPassword == "" {
 
        log.Fatal("Environment variable BACKEND_PASSWORD is required")
 
    }
 
  
    targetURL := os.Getenv("TARGET_URL")
+
                backendReq, err := http.NewRequestWithContext(r.Context(),
    if targetURL == "" {
+
                "POST", mimirURL.String(),
        log.Fatal("Environment variable TARGET_URL is required")
+
                bytes.NewReader(compressedData)) if err != nil {
    }
+
                        http.Error(w, "Failed to create backend request",
 +
                        http.StatusInternalServerError) log.Printf("Error
 +
                        creating backend request: %v", err) return
 +
                } backendReq.Header.Set("Content-Type",
 +
                "application/x-protobuf")
 +
                backendReq.Header.Set("Content-Encoding", "snappy")
 +
                backendReq.SetBasicAuth(mimirUsername, mimirPassword)
  
    target, err := url.Parse(targetURL)
+
                resp, err := httpClient.Do(backendReq) if err != nil {
    if err != nil {
+
                        http.Error(w, fmt.Sprintf("Failed to push to
        log.Fatalf("Invalid target URL: %v", err)
+
                        Mimir backend: %v", err), http.StatusBadGateway)
    }
+
                        log.Printf("Error pushing to Mimir: %v, Mimir URL:
 +
                        %s", err, mimirURL.String()) return
 +
                } defer resp.Body.Close()
  
    proxy := httputil.NewSingleHostReverseProxy(target)
+
                if resp.StatusCode >= 400 {
    // Modify the proxy's director to add backend Basic Auth credentials
+
                        body, _ := io.ReadAll(resp.Body) http.Error(w,
    originalDirector := proxy.Director
+
                        fmt.Sprintf("Mimir returned error: %s (Status
    proxy.Director = func(req *http.Request) {
+
                        Code: %d)", string(body), resp.StatusCode),
        originalDirector(req)
+
                        http.StatusBadGateway) log.Printf("Mimir returned
        req.SetBasicAuth(backendUsername, backendPassword)
+
                        error: %s (Status Code: %d), Mimir URL: %s",
    }
+
                        string(body), resp.StatusCode, mimirURL.String())
 +
                        return
 +
                }
  
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+
                w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body)
        if !authenticate(r) {
+
         })
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
 
            return
 
         }
 
        proxy.ServeHTTP(w, r)
 
    })
 
  
    log.Println("Reverse proxy running on :8080")
+
        log.Println("HTTP server running on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
+
        log.Fatal(http.ListenAndServe(":8080", nil))
 
}
 
}
  
// Authenticate using Basic Auth with SQLite
 
 
func authenticate(r *http.Request) bool {
 
func authenticate(r *http.Request) bool {
    authHeader := r.Header.Get("Authorization")
+
        authHeader := r.Header.Get("Authorization") if authHeader ==
    if authHeader == "" || !strings.HasPrefix(authHeader, "Basic ") {
+
        "" || !strings.HasPrefix(authHeader, "Basic ") {
        return false
+
                return false
    }
+
        }
  
    decoded, err := base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader, "Basic "))
+
        decoded, err :=
    if err != nil {
+
        base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader,
        return false
+
        "Basic ")) if err != nil {
    }
+
                return false
 +
        }
  
    parts := strings.SplitN(string(decoded), ":", 2)
+
        parts := strings.SplitN(string(decoded), ":", 2) if len(parts)
    if len(parts) != 2 {
+
        != 2 {
        return false
+
                return false
    }
+
        }
  
    username, password := parts[0], parts[1]
+
        username, password := parts[0], parts[1] return
    return validateUser(username, password)
+
        validateUser(username, password)
 
}
 
}
  
// Validate user from SQLite database
 
 
func validateUser(username, password string) bool {
 
func validateUser(username, password string) bool {
    var storedPassword string
+
        var storedPassword string err := db.QueryRow("SELECT password
    err := db.QueryRow("SELECT password FROM users WHERE username = ?", username).Scan(&storedPassword)
+
        FROM users WHERE username = ?", username).Scan(&storedPassword)
    if err != nil {
+
        if err != nil {
        return false
+
                return false
    }
+
        } return password == storedPassword
    return password == storedPassword
 
 
}
 
}
 
```
 
```
Line 123: Line 211:
 
```
 
```
 
set -a
 
set -a
TARGET_URL=https://myprompush.example.com
+
MIMIR_URL=https://mimir.uvoo.io/api/v1/push
BACKEND_USERNAME=foo
+
MIMIR_USERNAME=mybasicauthuser
BACKEND_PASSWORD=bar
+
MIMIR_PASSWORD=mybasicauthpass
 
```
 
```
  
  
 +
simple test
 +
```
 +
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
  
 
```
 
```
curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'
 
  
 +
simple test with data
 +
```
 +
curl -X POST -g \
 +
    -u admin:password123 \
 +
    --data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \
 +
    http://localhost:8080/api/v1/push?job=my_job&instance=my_instance
 
```
 
```

Revision as of 20:58, 23 February 2025

sqlite3 auth.db
CREATE TABLE users (
    username TEXT PRIMARY KEY,
    password TEXT NOT NULL
);

INSERT INTO users (username, password) VALUES ('admin', 'password123');

package main

import (
        "bytes" // "context" "database/sql" "encoding/base64" "fmt" "io"
        "log" "net/http" "net/url" "os" "strconv" "strings" "time"

        "github.com/golang/protobuf/proto" "github.com/golang/snappy"
        pb "github.com/prometheus/prometheus/prompb" _
        "github.com/mattn/go-sqlite3"
)

var db *sql.DB var mimirURL *url.URL var mimirUsername, mimirPassword
string var httpClient *http.Client

func parseRawMetric(raw string) (*pb.WriteRequest, error) {
        raw = strings.TrimSpace(raw)

        idxOpen := strings.Index(raw, "{") if idxOpen == -1 {
                return nil, fmt.Errorf("invalid format: missing '{'")
        } idxClose := strings.Index(raw, "}") if idxClose == -1 ||
        idxClose < idxOpen {
                return nil, fmt.Errorf("invalid format: missing '}'")
        }

        metricName := strings.TrimSpace(raw[:idxOpen])
        labelPart := raw[idxOpen+1 : idxClose] rest :=
        strings.TrimSpace(raw[idxClose+1:])

        parts := strings.Fields(rest) if len(parts) < 2 {
                return nil, fmt.Errorf("invalid format: missing value
                or timestamp")
        }

        value, err := strconv.ParseFloat(parts[0], 64) if err != nil {
                return nil, fmt.Errorf("invalid metric value: %v", err)
        }

        tsSec, err := strconv.ParseInt(parts[1], 10, 64) if err != nil {
                return nil, fmt.Errorf("invalid timestamp: %v", err)
        } timestamp := tsSec * 1000

        labels := []pb.Label{{Name: "__name__", Value: metricName}}
        labelPart = strings.TrimSpace(labelPart) if labelPart != "" {
                labelPairs := strings.Split(labelPart, ",") for _,
                pair := range labelPairs {
                        pair = strings.TrimSpace(pair) if pair == "" {
                                continue
                        } kv := strings.SplitN(pair, "=", 2) if len(kv)
                        != 2 {
                                return nil, fmt.Errorf("invalid label
                                format: %s", pair)
                        } key := strings.TrimSpace(kv[0]) val :=
                        strings.Trim(strings.TrimSpace(kv[1]), "\"")
                        labels = append(labels, pb.Label{Name: key,
                        Value: val})
                }
        }

        ts := pb.TimeSeries{
                Labels:  labels, Samples: []pb.Sample{{Value: value,
                Timestamp: timestamp}},
        }

        writeReq := &pb.WriteRequest{
                Timeseries: []pb.TimeSeries{ts},
        } return writeReq, nil
}

func main() {
        var err error db, err = sql.Open("sqlite3", "./auth.db") if err !=
        nil {
                log.Fatalf("Failed to open database: %v", err)
        } defer db.Close()

        mimirURLStr := os.Getenv("MIMIR_URL") if mimirURLStr == "" {
                mimirURLStr =
                "https://examplemimir.example.com/api/v1/push"
        } mimirURL, err = url.Parse(mimirURLStr) if err != nil {
                log.Fatalf("Invalid MIMIR_URL: %v", err)
        }

        mimirUsername = os.Getenv("MIMIR_USERNAME") if mimirUsername ==
        "" {
                mimirUsername = "your_username"
        } mimirPassword = os.Getenv("MIMIR_PASSWORD") if mimirPassword ==
        "" {
                mimirPassword = "your_password"
        }

        httpClient = &http.Client{
                Timeout: 10 * time.Second,
        }

        http.HandleFunc("/api/v1/push", func(w http.ResponseWriter,
        r *http.Request) {
                if !authenticate(r) {
                        http.Error(w, "Unauthorized",
                        http.StatusUnauthorized) return
                }

                data, err := io.ReadAll(r.Body) if err != nil {
                        http.Error(w, "Failed to read request body",
                        http.StatusInternalServerError) return
                } r.Body.Close()

                writeReq, err := parseRawMetric(string(data)) // writeReq
                is now used if err != nil {
                        http.Error(w, fmt.Sprintf("Failed to parse
                        metric: %v", err), http.StatusBadRequest)
                        log.Printf("Error parsing metric: %v, Raw data:
                        %s", err, string(data)) return
                }

                serialized, err := proto.Marshal(writeReq) if err != nil {
                        http.Error(w, "Failed to marshal protobuf
                        message", http.StatusInternalServerError)
                        log.Printf("Error marshaling protobuf: %v,
                        WriteRequest: %+v", err, writeReq) return
                }

                compressedData := snappy.Encode(nil, serialized)

                backendReq, err := http.NewRequestWithContext(r.Context(),
                "POST", mimirURL.String(),
                bytes.NewReader(compressedData)) if err != nil {
                        http.Error(w, "Failed to create backend request",
                        http.StatusInternalServerError) log.Printf("Error
                        creating backend request: %v", err) return
                } backendReq.Header.Set("Content-Type",
                "application/x-protobuf")
                backendReq.Header.Set("Content-Encoding", "snappy")
                backendReq.SetBasicAuth(mimirUsername, mimirPassword)

                resp, err := httpClient.Do(backendReq) if err != nil {
                        http.Error(w, fmt.Sprintf("Failed to push to
                        Mimir backend: %v", err), http.StatusBadGateway)
                        log.Printf("Error pushing to Mimir: %v, Mimir URL:
                        %s", err, mimirURL.String()) return
                } defer resp.Body.Close()

                if resp.StatusCode >= 400 {
                        body, _ := io.ReadAll(resp.Body) http.Error(w,
                        fmt.Sprintf("Mimir returned error: %s (Status
                        Code: %d)", string(body), resp.StatusCode),
                        http.StatusBadGateway) log.Printf("Mimir returned
                        error: %s (Status Code: %d), Mimir URL: %s",
                        string(body), resp.StatusCode, mimirURL.String())
                        return
                }

                w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body)
        })

        log.Println("HTTP server running on :8080")
        log.Fatal(http.ListenAndServe(":8080", nil))
}

func authenticate(r *http.Request) bool {
        authHeader := r.Header.Get("Authorization") if authHeader ==
        "" || !strings.HasPrefix(authHeader, "Basic ") {
                return false
        }

        decoded, err :=
        base64.StdEncoding.DecodeString(strings.TrimPrefix(authHeader,
        "Basic ")) if err != nil {
                return false
        }

        parts := strings.SplitN(string(decoded), ":", 2) if len(parts)
        != 2 {
                return false
        }

        username, password := parts[0], parts[1] return
        validateUser(username, password)
}

func validateUser(username, password string) bool {
        var storedPassword string err := db.QueryRow("SELECT password
        FROM users WHERE username = ?", username).Scan(&storedPassword)
        if err != nil {
                return false
        } return password == storedPassword
}
mkdir push-proxy
cd push-proxy
go mod init
go mod tidy
go build
set -a
MIMIR_URL=https://mimir.uvoo.io/api/v1/push
MIMIR_USERNAME=mybasicauthuser
MIMIR_PASSWORD=mybasicauthpass

simple test

curl -u admin:password123 http://localhost:8080/api/v1/push -d '{}'

simple test with data

curl -X POST -g \
    -u admin:password123 \
    --data "my_metric1{label1=\"value1\",label2=\"value2\"} 12.34 $(date +%s)" \
    http://localhost:8080/api/v1/push?job=my_job&instance=my_instance