Difference between revisions of "Loki scripts"

From UVOO Tech Wiki
Jump to navigation Jump to search
(Created page with "``` import os from azure.storage.blob import ContainerClient # Set environment variables or hardcode for testing ACCOUNT_URL = os.environ["AZURE_ACCOUNT_URL"] # e.g.,...")
 
 
Line 33: Line 33:
 
else:
 
else:
 
     print("No small blobs found.")
 
     print("No small blobs found.")
 +
```
 +
 +
 +
```
 +
import gzip
 +
import json
 +
import os
 +
from glob import glob
 +
from datetime import datetime
 +
 +
def load_chunk(filepath):
 +
    with gzip.open(filepath, 'rt') as f:
 +
        return [json.loads(line) for line in f]
 +
 +
def save_chunk(entries, output_path):
 +
    entries.sort(key=lambda x: x['ts'])  # Sort by timestamp
 +
    with gzip.open(output_path, 'wt') as f:
 +
        for e in entries:
 +
            json.dump(e, f)
 +
            f.write('\n')
 +
 +
def merge_chunks(input_dir, output_file):
 +
    all_entries = []
 +
    for file in glob(os.path.join(input_dir, '*.json.gz')):
 +
        print(f"Reading {file}")
 +
        try:
 +
            entries = load_chunk(file)
 +
            all_entries.extend(entries)
 +
        except Exception as e:
 +
            print(f"⚠️ Failed to read {file}: {e}")
 +
 +
    print(f"Merging {len(all_entries)} entries")
 +
    save_chunk(all_entries, output_file)
 +
    print(f"✅ Merged chunk saved to {output_file}")
 +
 +
# Usage
 +
merge_chunks("downloaded_chunks", "merged_output/stream-A.json.gz")
 
```
 
```

Latest revision as of 13:03, 20 June 2025

import os
from azure.storage.blob import ContainerClient

# Interactive cleanup script: lists all blobs in an Azure Storage container
# that are smaller than SIZE_THRESHOLD and, after confirmation, deletes them.
# Intended for pruning tiny/incomplete Loki chunk objects.

# Set environment variables or hardcode for testing
# All three must be set; os.environ[...] raises KeyError naming the missing
# variable otherwise.
ACCOUNT_URL = os.environ["AZURE_ACCOUNT_URL"]        # e.g., "https://myaccount.blob.core.windows.net"
CONTAINER = os.environ["AZURE_CONTAINER_NAME"]       # e.g., "loki-chunks"
ACCOUNT_KEY = os.environ["AZURE_ACCOUNT_KEY"]
SIZE_THRESHOLD = 100 * 1024                          # e.g., 100KB

# Connect to container
# Authenticates with the storage account key (shared-key credential).
container_client = ContainerClient(
    account_url=ACCOUNT_URL,
    container_name=CONTAINER,
    credential=ACCOUNT_KEY
)

print(f"Scanning container '{CONTAINER}' for small chunks...")

# First pass: collect names of every blob under the size threshold.
# name_starts_with="" matches all blobs in the container.
to_delete = []
for blob in container_client.list_blobs(name_starts_with=""):
    if blob.size < SIZE_THRESHOLD:
        print(f"- {blob.name} ({blob.size} bytes)")
        to_delete.append(blob.name)

# Confirm and delete
# Second pass: interactive confirmation; any answer other than 'y'/'Y'
# leaves the blobs in place (the script then exits without output).
if to_delete:
    confirm = input(f"Delete {len(to_delete)} small blobs? (y/n): ")
    if confirm.lower() == 'y':
        for name in to_delete:
            container_client.delete_blob(name)
        print("Deleted.")
else:
    print("No small blobs found.")
import gzip
import json
import os
from glob import glob
from datetime import datetime

def load_chunk(filepath):
    """Read a gzipped, newline-delimited JSON file into a list of entries.

    Args:
        filepath: path to a ``.json.gz`` file with one JSON object per line.

    Returns:
        A list of decoded entries, in file order.
    """
    with gzip.open(filepath, 'rt') as handle:
        return list(map(json.loads, handle))

def save_chunk(entries, output_path):
    """Write log entries to *output_path* as gzipped, newline-delimited JSON.

    Entries are written in ascending order of their ``'ts'`` key.

    Args:
        entries: iterable of dicts, each carrying a sortable ``'ts'`` key.
        output_path: destination path for the ``.json.gz`` file.

    Raises:
        KeyError: if any entry lacks a ``'ts'`` key.
    """
    # Sort a copy instead of calling entries.sort(): the original code
    # reordered the caller's list in place as a hidden side effect.
    ordered = sorted(entries, key=lambda e: e['ts'])
    with gzip.open(output_path, 'wt') as f:
        for entry in ordered:
            json.dump(entry, f)
            f.write('\n')

def merge_chunks(input_dir, output_file):
    """Merge every ``*.json.gz`` chunk in *input_dir* into one sorted chunk.

    Chunks that cannot be read are skipped with a warning (best-effort)
    rather than aborting the whole merge.

    Args:
        input_dir: directory containing ``*.json.gz`` chunk files.
        output_file: path of the merged ``.json.gz`` file to write.
    """
    all_entries = []
    for file in glob(os.path.join(input_dir, '*.json.gz')):
        print(f"Reading {file}")
        try:
            entries = load_chunk(file)
            all_entries.extend(entries)
        except Exception as e:
            # Deliberate best-effort: a corrupt/unreadable chunk should not
            # lose the rest of the merge.
            print(f"⚠️ Failed to read {file}: {e}")

    # Create the destination directory if needed — the documented usage
    # writes to "merged_output/...", which fails if the dir is absent.
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    print(f"Merging {len(all_entries)} entries")
    save_chunk(all_entries, output_file)
    print(f"✅ Merged chunk saved to {output_file}")

# Usage: run this file as a script to merge downloaded chunks.
# Guarded so that importing this module does not trigger the merge as a
# side effect (the original call ran unconditionally at import time).
if __name__ == "__main__":
    merge_chunks("downloaded_chunks", "merged_output/stream-A.json.gz")