Mimir push test client

From UVOO Tech Wiki
Jump to navigation Jump to search

.env

# Credentials and push endpoint read by client-send-metric.py.
# Replace the placeholder values, then load them with: source .env
export MIMIR_USER="your_username"
export MIMIR_PASS="your_password"
export MIMIR_URL="https://your-mimir-endpoint:8080/api/v1/push"

Prepare dependencies

#!/usr/bin/env bash
#
# Installs the required packages and generates the Python protobuf modules
# imported by client-send-metric.py (remote_pb2, types_pb2,
# gogoproto.gogo_pb2) into the current directory.
# Run as root or as a user with sudo rights; requires network access.
set -euo pipefail

# Bootstrap sudo only when it is missing (e.g. a minimal container running as
# root). The package index is refreshed first because a fresh image has no
# package lists, which makes a bare install fail.
if ! command -v sudo >/dev/null 2>&1; then
  apt-get update
  apt-get install -y sudo
fi

sudo apt-get update
sudo apt-get install -y \
  wget \
  protobuf-compiler \
  python3-requests \
  python3-snappy \
  python3-grpc-tools
# python3-grpc-tools and python3-protobuf may or may not be needed.

readonly PROM_RAW='https://raw.githubusercontent.com/prometheus/prometheus/main/prompb'
readonly GOGO_RAW='https://raw.githubusercontent.com/gogo/protobuf/master/gogoproto'

mkdir -p prometheus_proto/gogoproto
cd prometheus_proto

# -O overwrites on re-runs; plain wget would save remote.proto.1 and so on,
# leaving protoc to compile the stale first download.
wget -O remote.proto "${PROM_RAW}/remote.proto"
wget -O types.proto "${PROM_RAW}/types.proto"
wget -O gogoproto/gogo.proto "${GOGO_RAW}/gogo.proto"

# gogo.proto is compiled first because the Prometheus protos import it.
for proto in gogoproto/gogo.proto types.proto remote.proto; do
  protoc --proto_path=. --python_out=. "${proto}"
done

# Move the generated modules next to the client script. The gogoproto
# directory is merged file by file so a re-run does not fail on an existing
# ../gogoproto.
mv -f -- ./*.py ../
mkdir -p ../gogoproto
mv -f -- gogoproto/* ../gogoproto/
rmdir gogoproto
cd ..

client-send-metric.py

import os
import time
import requests
import snappy
import warnings

# Suppress the InsecureRequestWarning when verify=False is used
warnings.filterwarnings("ignore", message="Unverified HTTPS request")

# Assuming these are available from previous setup or environment variables
from remote_pb2 import WriteRequest
from types_pb2 import TimeSeries
from gogoproto.gogo_pb2 import *

# Remote-write push endpoint; None when the variable is not set.
MIMIR_URL = os.getenv('MIMIR_URL')

# HTTP basic-auth credentials. Fall back to None (no auth argument) unless
# both variables are set: a (None, None) tuple is not a valid credential pair.
AUTH = (os.getenv('MIMIR_USER'), os.getenv('MIMIR_PASS'))
if None in AUTH:
    AUTH = None

# Certificate verification is disabled, which suits test endpoints with
# self-signed certificates only. Do not use this against production.
TLS_VERIFY = False

# Multi-tenant Mimir setups typically also need a tenant header on the push
# request, e.g. "X-Scope-OrgID": "org1".

def create_remote_write_payload(metric_name="example_metric", value=42, labels=None, timestamp_ms=None):
    """
    Constructs a Remote Write Protobuf payload for Mimir.

    Builds a WriteRequest holding one time series with a single sample.
    With no arguments the series is example_metric{job="example"} with
    value 42 at the current time.

    Args:
        metric_name: Value of the ``__name__`` label.
        value: Sample value.
        labels: Optional dict mapping extra label names to string values;
            defaults to ``{"job": "example"}``.
        timestamp_ms: Sample timestamp in milliseconds since the Unix epoch;
            defaults to the current time.

    Returns:
        The serialized, not yet compressed, WriteRequest.
    """
    if labels is None:
        labels = {"job": "example"}
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)

    # Copy so the caller's dict is never mutated.
    all_labels = dict(labels)
    all_labels["__name__"] = metric_name

    remote_write = WriteRequest()
    series = remote_write.timeseries.add()
    # The remote-write specification requires label names in lexicographic order.
    for label_name in sorted(all_labels):
        series.labels.add(name=label_name, value=all_labels[label_name])
    series.samples.add(value=value, timestamp=timestamp_ms)
    return remote_write.SerializeToString()

def send_protobuf(timeout=10):
    """
    Compresses and sends the Protobuf payload to Mimir.

    The payload from create_remote_write_payload() is snappy-compressed and
    POSTed to MIMIR_URL using the module-level AUTH and TLS_VERIFY settings.
    The response status and body are printed.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout requests can block indefinitely on an unresponsive host.

    Returns:
        The response object returned by requests.post.

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    compressed_payload = snappy.compress(create_remote_write_payload())

    headers = {
        "Content-Type": "application/x-protobuf",
        "Content-Encoding": "snappy",
        # The remote-write 1.0 specification requires senders to set this.
        "X-Prometheus-Remote-Write-Version": "0.1.0",
    }

    print(f"Sending data to {MIMIR_URL} with TLS verification skipped...")
    # verify follows the module-level TLS_VERIFY flag.
    response = requests.post(
        MIMIR_URL,
        data=compressed_payload,
        headers=headers,
        auth=AUTH,
        verify=TLS_VERIFY,
        timeout=timeout,
    )

    print(f"Protobuf Response: {response.status_code} - {response.text}")
    return response

# The environment variables must be set before running, for example:
# export MIMIR_URL="https://your-mimir-endpoint:8080/api/v1/push"
# export MIMIR_USER="your_username"
# export MIMIR_PASS="your_password"

if __name__ == "__main__":
    if not MIMIR_URL:
        # SystemExit with a string prints it to stderr and exits with status 1,
        # so shells and CI jobs can detect the misconfiguration.
        raise SystemExit("Error: MIMIR_URL environment variable is not set.")
    send_protobuf()