Mimir push test client
Jump to navigation
Jump to search
.env
export MIMIR_USER export MIMIR_PASS export MIMIR_URL
Prep Depends
#!/bin/bash set -eu apt install -y sudo sudo apt update && sudo apt install -y wget python3-requests python3-snappy python3-grpc-tools # maybe needed or not python3-grpc-tools python3-protobuf mkdir -p prometheus_proto/gogoproto cd prometheus_proto wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/remote.proto wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/types.proto wget -P gogoproto https://raw.githubusercontent.com/gogo/protobuf/master/gogoproto/gogo.proto sudo apt install -y protobuf-compiler protoc --proto_path=. --python_out=. gogoproto/gogo.proto protoc --proto_path=. --python_out=. types.proto protoc --proto_path=. --python_out=. remote.proto mv *.py ../ mv gogoproto ../ cd ..
client-send-metric.py
import os import time import requests import snappy import warnings # Suppress the InsecureRequestWarning when verify=False is used warnings.filterwarnings("ignore", message="Unverified HTTPS request") # Assuming these are available from previous setup or environment variables from remote_pb2 import WriteRequest from types_pb2 import TimeSeries from gogoproto.gogo_pb2 import * MIMIR_URL = os.getenv('MIMIR_URL') AUTH = (os.getenv('MIMIR_USER'), os.getenv('MIMIR_PASS')) TLS_VERIFY = False # "X-Scope-OrgID": "org1" def create_remote_write_payload(): """ Constructs a Remote Write Protobuf payload for Mimir. """ remote_write = WriteRequest() series = remote_write.timeseries.add() series.labels.add(name="__name__", value="example_metric") series.labels.add(name="job", value="example") ts = int(time.time() * 1000) sample = series.samples.add(value=42, timestamp=ts) return remote_write.SerializeToString() def send_protobuf(): """ Compresses and sends the Protobuf payload to Mimir. """ payload = create_remote_write_payload() compressed_payload = snappy.compress(payload) headers = { "Content-Type": "application/x-protobuf", "Content-Encoding": "snappy", } print(f"Sending data to {MIMIR_URL} with TLS verification skipped...") # Add verify=False here to skip TLS verification response = requests.post(MIMIR_URL, data=compressed_payload, headers=headers, auth=AUTH, verify=TLS_VERIFY) print(f"Protobuf Response: {response.status_code} - {response.text}") # Ensure environment variables are set before running # Example: # export MIMIR_URL="https://your-mimir-endpoint:8080/api/v1/push" # export MIMIR_USER="your_username" # export MIMIR_PASS="your_password" if __name__ == "__main__": if not MIMIR_URL: print("Error: MIMIR_URL environment variable is not set.") else: send_protobuf()