Mimir push test client
Jump to navigation
Jump to search
.env
export MIMIR_USER export MIMIR_PASS export MIMIR_URL
Prep Depends
#!/bin/bash set -eu apt install -y sudo sudo apt update && sudo apt install -y wget python3-requests python3-snappy python3-grpc-tools # maybe needed or not python3-grpc-tools python3-protobuf mkdir -p prometheus_proto/gogoproto cd prometheus_proto wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/remote.proto wget https://raw.githubusercontent.com/prometheus/prometheus/main/prompb/types.proto wget -P gogoproto https://raw.githubusercontent.com/gogo/protobuf/master/gogoproto/gogo.proto sudo apt install -y protobuf-compiler protoc --proto_path=. --python_out=. gogoproto/gogo.proto protoc --proto_path=. --python_out=. types.proto protoc --proto_path=. --python_out=. remote.proto mv *.py ../ mv gogoproto ../ cd ..
client-send-metric.py
import os
import time
import requests
import snappy
import warnings
# Suppress the InsecureRequestWarning when verify=False is used
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
# Assuming these are available from previous setup or environment variables
from remote_pb2 import WriteRequest
from types_pb2 import TimeSeries
from gogoproto.gogo_pb2 import *
MIMIR_URL = os.getenv('MIMIR_URL')
AUTH = (os.getenv('MIMIR_USER'), os.getenv('MIMIR_PASS'))
TLS_VERIFY = False
# "X-Scope-OrgID": "org1"
def create_remote_write_payload():
"""
Constructs a Remote Write Protobuf payload for Mimir.
"""
remote_write = WriteRequest()
series = remote_write.timeseries.add()
series.labels.add(name="__name__", value="example_metric")
series.labels.add(name="job", value="example")
ts = int(time.time() * 1000)
sample = series.samples.add(value=42, timestamp=ts)
return remote_write.SerializeToString()
def send_protobuf():
"""
Compresses and sends the Protobuf payload to Mimir.
"""
payload = create_remote_write_payload()
compressed_payload = snappy.compress(payload)
headers = {
"Content-Type": "application/x-protobuf",
"Content-Encoding": "snappy",
}
print(f"Sending data to {MIMIR_URL} with TLS verification skipped...")
# Add verify=False here to skip TLS verification
response = requests.post(MIMIR_URL, data=compressed_payload, headers=headers, auth=AUTH, verify=TLS_VERIFY)
print(f"Protobuf Response: {response.status_code} - {response.text}")
# Ensure environment variables are set before running
# Example:
# export MIMIR_URL="https://your-mimir-endpoint:8080/api/v1/push"
# export MIMIR_USER="your_username"
# export MIMIR_PASS="your_password"
if __name__ == "__main__":
if not MIMIR_URL:
print("Error: MIMIR_URL environment variable is not set.")
else:
send_protobuf()