Loki scripts
Scan the Azure Blob Storage container that holds Loki chunks for undersized blobs and delete them after confirmation:

```
import os

from azure.storage.blob import ContainerClient

# Set environment variables or hardcode for testing
ACCOUNT_URL = os.environ["AZURE_ACCOUNT_URL"]   # e.g., "https://myaccount.blob.core.windows.net"
CONTAINER = os.environ["AZURE_CONTAINER_NAME"]  # e.g., "loki-chunks"
ACCOUNT_KEY = os.environ["AZURE_ACCOUNT_KEY"]
SIZE_THRESHOLD = 100 * 1024  # e.g., 100 KB

# Connect to the container
container_client = ContainerClient(
    account_url=ACCOUNT_URL,
    container_name=CONTAINER,
    credential=ACCOUNT_KEY
)

print(f"Scanning container '{CONTAINER}' for small chunks...")

# Collect the names of blobs below the size threshold
to_delete = []
for blob in container_client.list_blobs(name_starts_with=""):
    if blob.size < SIZE_THRESHOLD:
        print(f"- {blob.name} ({blob.size} bytes)")
        to_delete.append(blob.name)

# Confirm and delete
if to_delete:
    confirm = input(f"Delete {len(to_delete)} small blobs? (y/n): ")
    if confirm.lower() == 'y':
        for name in to_delete:
            container_client.delete_blob(name)
        print("Deleted.")
else:
    print("No small blobs found.")
```

Merge locally downloaded chunk files (gzipped, newline-delimited JSON) into a single file sorted by timestamp:

```
import gzip
import json
import os
from glob import glob

def load_chunk(filepath):
    # Read one gzipped, newline-delimited JSON file into a list of entries
    with gzip.open(filepath, 'rt') as f:
        return [json.loads(line) for line in f]

def save_chunk(entries, output_path):
    entries.sort(key=lambda x: x['ts'])  # Sort by timestamp
    os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)  # Ensure the output directory exists
    with gzip.open(output_path, 'wt') as f:
        for e in entries:
            json.dump(e, f)
            f.write('\n')

def merge_chunks(input_dir, output_file):
    all_entries = []
    for file in glob(os.path.join(input_dir, '*.json.gz')):
        print(f"Reading {file}")
        try:
            entries = load_chunk(file)
            all_entries.extend(entries)
        except Exception as e:
            print(f"⚠️ Failed to read {file}: {e}")

    print(f"Merging {len(all_entries)} entries")
    save_chunk(all_entries, output_file)
    print(f"✅ Merged chunk saved to {output_file}")

# Usage
merge_chunks("downloaded_chunks", "merged_output/stream-A.json.gz")
```
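
To sanity-check the merged output, you can reopen it and confirm that the entries parse and are in timestamp order. A minimal sketch, assuming each entry carries the 'ts' key used by the merge script:

```
import gzip
import json

def check_merged(path):
    # Re-read the merged file and verify ordering
    with gzip.open(path, 'rt') as f:
        entries = [json.loads(line) for line in f]
    timestamps = [e['ts'] for e in entries]
    assert timestamps == sorted(timestamps), "entries are not sorted by timestamp"
    print(f"{len(entries)} entries, from {timestamps[0]} to {timestamps[-1]}")

check_merged("merged_output/stream-A.json.gz")
```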