Metrics & Telemetry System
Comprehensive monitoring and metrics collection for the AST server to track performance, detect issues, and optimize the system.
Overview
The metrics system provides real-time visibility into:
- Event processing performance (latency, throughput)
- UDP packet health (loss rate, gaps)
- WebSocket connectivity (clients, broadcasts)
- Cache effectiveness (hit rates)
- Error tracking (by type and frequency)
- System health (uptime, resource usage)

Key Benefits:
- 📊 Data-driven optimization: know what to optimize
- 🔍 Proactive monitoring: detect issues before users complain
- 📈 Performance tracking: track improvements over time
- 🚨 Alerting: set thresholds for critical metrics
Quick Start
Basic Usage
```python
import asyncio
import json
import time

from src.server.api import ASTServer


async def main():
    # Metrics are enabled by default
    server = ASTServer()
    # Process an event (process_live_event is a coroutine, so it must be awaited)
    await server.process_live_event("/live/track/renamed", [0, "Bass"], 1, time.time())

    # Get the metrics summary
    summary = server.get_metrics_summary()
    print(f"Event processing latency (p95): {summary['event_processing']['latency_ms']['p95']}ms")
    print(f"Total events processed: {summary['event_processing']['total_processed']}")

    # Get the full metrics
    metrics = server.get_metrics()
    print(json.dumps(metrics, indent=2))


asyncio.run(main())
```
Output Example
The summary returned by `get_metrics_summary()` looks like this:
```json
{
  "timestamp": "2025-11-28T10:30:00",
  "uptime_seconds": 3600.5,
  "event_processing": {
    "total_processed": 15420,
    "latency_ms": {
      "p50": 5.2,
      "p95": 18.7,
      "p99": 45.3
    }
  },
  "udp_listener": {
    "packets_received": 15500,
    "packets_dropped": 80
  },
  "websocket": {
    "clients_connected": 3,
    "broadcasts_sent": 15420
  },
  "cache": {
    "hit_rate_percent": 87.3
  },
  "errors": {
    "errors.event_processing": 2
  }
}
```
Architecture
```mermaid
graph TB
    Event[Event Occurs] --> Instrument[Instrumented Code]
    Instrument --> Metrics[MetricsCollector]
    Metrics --> Timings[(Timings<br/>p50/p95/p99)]
    Metrics --> Counters[(Counters<br/>Incrementing)]
    Metrics --> Gauges[(Gauges<br/>Point-in-time)]
    Timings --> Export[MetricsExporter]
    Counters --> Export
    Gauges --> Export
    Export --> JSON[JSON Export]
    Export --> Summary[Summary Export]
    JSON --> API[API Endpoint<br/>get_metrics]
    Summary --> API2[API Endpoint<br/>get_metrics_summary]
    style Metrics fill:#afa
    style Export fill:#ffa
```
Core Concepts
Metric Types
1. Timings - Duration Measurements
Tracks how long operations take and computes percentiles (p50/p95/p99) over the recorded durations.
```python
import time

# Manual timing. perf_counter() is monotonic, so it is safer than time.time()
# for measuring durations. process_event() stands in for your own code.
start = time.perf_counter()
process_event()
duration = time.perf_counter() - start
server.metrics.timing('event.processing', duration)  # duration in seconds

# Context manager (recommended)
with server.metrics.timer('event.processing'):
    process_event()

# Get statistics (values are in seconds)
stats = server.metrics.get_timing('event.processing')
print(f"Mean: {stats['mean']:.3f}s")
print(f"p50: {stats['p50']:.3f}s")
print(f"p95: {stats['p95']:.3f}s")
print(f"p99: {stats['p99']:.3f}s")
```
When to use: Any operation where latency matters (event processing, API calls, database queries)
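To make the p50/p95/p99 figures concrete, here is a minimal sketch of how a collector might keep timing samples and derive percentiles from them. It is not the AST server's implementation: the `TimingWindow` class, the bounded sample window (`max_samples`), and the nearest-rank percentile method are all assumptions chosen for illustration.

```python
import math
import time
from collections import deque
from contextlib import contextmanager


def percentile(samples, p):
    """Nearest-rank percentile (p in 0-100) of a non-empty sequence."""
    ordered = sorted(samples)
    # Multiply before dividing so the rank is exact for whole-number p
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


class TimingWindow:
    """Hypothetical timing store that keeps only the most recent samples."""

    def __init__(self, max_samples=1000):
        # deque(maxlen=...) drops the oldest sample once full, bounding memory
        self.samples = deque(maxlen=max_samples)

    def record(self, duration_seconds):
        self.samples.append(duration_seconds)

    @contextmanager
    def timer(self):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the timed block raises
            self.record(time.perf_counter() - start)

    def stats(self):
        if not self.samples:
            return None
        return {
            'count': len(self.samples),
            'mean': sum(self.samples) / len(self.samples),
            'p50': percentile(self.samples, 50),
            'p95': percentile(self.samples, 95),
            'p99': percentile(self.samples, 99),
        }


window = TimingWindow(max_samples=100)
for ms in range(1, 101):  # durations of 1 ms .. 100 ms
    window.record(ms / 1000)
stats = window.stats()
print(stats['p50'], stats['p95'], stats['p99'])  # 0.05 0.095 0.099
```

Nearest-rank always returns a value that was actually observed; interpolating methods (such as the default in Python's `statistics.quantiles`) can return values between samples.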
2. Counters - Incrementing Values
Tracks counts of events or occurrences.
```python
# Increment by 1
server.metrics.increment('events.processed')

# Increment by a custom amount
server.metrics.increment('events.dropped', amount=5)

# Decrement
server.metrics.decrement('websocket.clients')

# Get current value
stats = server.metrics.get_counter('events.processed')
print(f"Total events: {stats['value']}")
```
When to use: Counting events, errors, requests, connections
3. Gauges - Point-in-Time Values
Tracks a current value that can go up or down.
```python
# Set the current value
server.metrics.gauge('websocket.clients', 3.0)
server.metrics.gauge('cache.hit_rate', 0.87)
server.metrics.gauge('udp.current_sequence', 12345.0)

# Get statistics (current, min, max, mean)
stats = server.metrics.get_gauge('websocket.clients')
print(f"Current: {stats['current']}")
print(f"Peak: {stats['max']}")
print(f"Average: {stats['mean']}")
```
When to use: Current connections, queue depth, memory usage, percentages
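Because a gauge reports current, min, max, and mean, a collector does not need to store every value it was set to. A minimal sketch of that idea, using a hypothetical `GaugeStats` class and assuming the mean is taken over all values set so far (the real `get_gauge()` may compute it differently):

```python
class GaugeStats:
    """Hypothetical gauge that tracks current/min/max/mean in O(1) memory."""

    def __init__(self):
        self.current = None
        self.min = float('inf')
        self.max = float('-inf')
        self._total = 0.0
        self._count = 0

    def set(self, value):
        value = float(value)
        self.current = value
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        # Running sum and count are enough to report the mean later
        self._total += value
        self._count += 1

    def stats(self):
        if self._count == 0:
            return None
        return {
            'current': self.current,
            'min': self.min,
            'max': self.max,
            'mean': self._total / self._count,
        }


clients = GaugeStats()
for n in [1, 3, 2]:  # clients connect and disconnect
    clients.set(n)
print(clients.stats())
# {'current': 2.0, 'min': 1.0, 'max': 3.0, 'mean': 2.0}
```

Note that this mean is an average of the values set, not a time-weighted average: a value that is set once and then held for an hour counts the same as one that lasted a millisecond.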
Tags for Categorization
Use tags to categorize metrics by different dimensions:
```python
# `duration` is an elapsed time in seconds, measured as in the Timings example

# Event type categorization
server.metrics.timing(
    'event.processing.duration',
    duration,
    tags={'event_type': '/live/track/renamed'},
)
server.metrics.increment(
    'events.processed',
    tags={'event_type': '/live/scene/added'},
)

# Handler categorization
server.metrics.timing(
    'handler.duration',
    duration,
    tags={'handler': 'scene_handler'},
)

# Get metrics by tag
stats = server.metrics.get_timing(
    'event.processing.duration',
    tags={'event_type': '/live/track/renamed'},
)
```
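Tagged metrics show up in the exported data under keys such as `event.processing.duration[event_type=*]` (see the auto-tracked list below). A minimal sketch of how such a key could be built, assuming a `name[key=value,...]` format with tags sorted so the same tags always produce the same key; `metric_key` is a hypothetical helper, and the server's exact format for multiple tags may differ.

```python
def metric_key(name, tags=None):
    """Build a storage key like 'name[k1=v1,k2=v2]' (hypothetical format)."""
    if not tags:
        return name
    # Sort by tag name so {'a': 1, 'b': 2} and {'b': 2, 'a': 1} share one key
    parts = ','.join(f"{k}={v}" for k, v in sorted(tags.items()))
    return f"{name}[{parts}]"


print(metric_key('events.processed'))
# events.processed
print(metric_key('events.processed', {'event_type': '/live/scene/added'}))
# events.processed[event_type=/live/scene/added]
print(metric_key('handler.duration', {'handler': 'scene_handler', 'a': 1}))
# handler.duration[a=1,handler=scene_handler]
```

Sorting matters because Python dicts keep insertion order; without it, the same tags passed in a different order would split one series into two.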
Automatically Tracked Metrics
The AST server automatically tracks these metrics:
Event Processing
```python
# Every event processed
'events.received'                          # Counter: total events received
'events.received.by_type'                  # Counter: by event type (tagged)
'events.processed'                         # Counter: successfully processed
'events.processed.by_type'                 # Counter: by event type (tagged)
'events.ignored.no_ast'                    # Counter: ignored (no AST loaded)
'events.unhandled'                         # Counter: unhandled event types
'events.unhandled.by_type'                 # Counter: by event type (tagged)
'event.processing.duration'                # Timing: processing time
'event.processing.duration[event_type=*]'  # Timing: by event type (tagged)
```
Errors
```python
'errors.event_processing'                  # Counter: processing errors
'errors.event_processing.by_type'          # Counter: by event type (tagged)
```
Example: Viewing Auto-Tracked Metrics
```python
# Get all metrics
all_metrics = server.get_metrics()

# Event counters (read with .get(), since a counter may be absent until first incremented)
counters = all_metrics['metrics']['counters']
print(f"Total events received: {counters.get('events.received', {}).get('value', 0)}")
print(f"Total events processed: {counters.get('events.processed', {}).get('value', 0)}")
print(f"Total errors: {counters.get('errors.event_processing', {}).get('value', 0)}")

# Event timings (stored in seconds; multiplied by 1000 for ms)
timings = all_metrics['metrics']['timings']
for key, stats in timings.items():
    if 'event.processing.duration' in key:
        print(f"{key}:")
        print(f"  p50: {stats['p50']*1000:.2f}ms")
        print(f"  p95: {stats['p95']*1000:.2f}ms")
```
Usage Examples
Example 1: Basic Monitoring
```python
import asyncio
import time

from src.server.api import ASTServer


async def main():
    # Create server with metrics
    server = ASTServer(enable_metrics=True)

    # Simulate event processing
    for i in range(100):
        await server.process_live_event(
            "/live/track/renamed",
            [i % 10, f"Track {i}"],
            i,
            time.time(),
        )

    # Check performance
    summary = server.get_metrics_summary()
    print(f"Processed {summary['event_processing']['total_processed']} events")
    print(f"p95 latency: {summary['event_processing']['latency_ms']['p95']}ms")


asyncio.run(main())
```
Example 2: Custom Metrics in Handlers
```python
# _do_work() and self.queue are placeholders for your handler's own logic and state
class CustomHandler(BaseEventHandler):
    async def handle_custom_event(self, args, seq_num):
        # Time the operation
        with self.server.metrics.timer('custom.operation'):
            result = await self._do_work(args)

        # Track success/failure
        if result:
            self.server.metrics.increment('custom.success')
        else:
            self.server.metrics.increment('custom.failure')

        # Track a gauge (e.g., queue depth)
        self.server.metrics.gauge('custom.queue_depth', len(self.queue))
        return result
```
Example 3: Performance Profiling
```python
# Profile specific operations
def profile_ast_operations(server):
    # Time each operation separately
    with server.metrics.timer('operation.load_project'):
        server.load_project("test.als")
    with server.metrics.timer('operation.find_tracks'):
        server.find_nodes_by_type('track')
    with server.metrics.timer('operation.compute_diff'):
        server.diff_with_file("test2.als")

    # Print profiling results (timings are stored in seconds)
    operations = ['load_project', 'find_tracks', 'compute_diff']
    for op in operations:
        stats = server.metrics.get_timing(f'operation.{op}')
        if stats:
            print(f"{op}: {stats['mean']*1000:.2f}ms (p95: {stats['p95']*1000:.2f}ms)")
```
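Example 4: Monitoring Over Time
```python
import json
import time


async def monitor(server, duration_s=60, report_every_s=10):
    # Reset metrics to start fresh
    server.metrics.reset()

    # Run for duration_s seconds (60 by default)
    start_time = time.monotonic()
    next_report = report_every_s
    while (elapsed := time.monotonic() - start_time) < duration_s:
        # Process events... (process_events() is a placeholder for your own work)
        await process_events()

        # Report once per interval. Checking `int(elapsed) % 10 == 0` instead
        # would print on every loop iteration during that second.
        if elapsed >= next_report:
            summary = server.get_metrics_summary()
            print(f"[{int(elapsed)}s] "
                  f"Processed: {summary['event_processing']['total_processed']}, "
                  f"p95: {summary['event_processing']['latency_ms']['p95']}ms")
            next_report = (int(elapsed // report_every_s) + 1) * report_every_s

    # Final summary
    final_metrics = server.get_metrics_summary()
    print("\nFinal Statistics:")
    print(json.dumps(final_metrics, indent=2))
```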
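The overview lists throughput, but counters such as `total_processed` are cumulative totals. One way to turn them into a rate is to take two snapshots and divide the change by the elapsed time. A minimal sketch; `Snapshot` and `rate_per_second` are hypothetical helpers, and in practice the counts would come from, e.g., `summary['event_processing']['total_processed']` read alongside `time.monotonic()`.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    count: int        # cumulative counter value, e.g. total events processed
    timestamp: float  # monotonic clock reading (seconds) when the count was read


def rate_per_second(prev, cur):
    """Average rate of a cumulative counter between two snapshots."""
    elapsed = cur.timestamp - prev.timestamp
    if elapsed <= 0:
        return 0.0
    delta = cur.count - prev.count
    # A negative delta means the counter was reset in between (e.g. by
    # metrics.reset()); this sketch reports 0.0 for that interval
    if delta < 0:
        return 0.0
    return delta / elapsed


prev = Snapshot(count=1200, timestamp=100.0)
cur = Snapshot(count=1500, timestamp=110.0)
print(rate_per_second(prev, cur))  # 30.0 (events per second)
```

Using a monotonic clock for the timestamps keeps the rate correct even if the wall clock is adjusted between snapshots.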
Example 5: Error Tracking
```python
# Track different error types (process_event() stands in for your own code)
try:
    process_event()
except ValueError:
    server.metrics.increment('errors.validation',
                             tags={'error_type': 'ValueError'})
except KeyError:
    server.metrics.increment('errors.missing_key',
                             tags={'error_type': 'KeyError'})

# Check error counts
all_metrics = server.get_metrics()
for key, stats in all_metrics['metrics']['counters'].items():
    if 'error' in key:
        print(f"{key}: {stats['value']}")
```
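Example 6: UDP Packet Loss Tracking
```python
# A bare `last_seq_num` variable would be undefined on the first packet, and
# assigning it inside the callback would make it local, so keep it on an object.
class UdpSequenceTracker:
    def __init__(self, server):
        self.server = server
        self.last_seq_num = None

    # Use this as the UDP listener callback
    async def udp_event_callback(self, event_path, args, seq_num, timestamp):
        metrics = self.server.metrics
        metrics.increment('udp.packet.received')

        # Detect gaps: sequence numbers skipped since the highest one seen
        last = self.last_seq_num
        if last is not None and seq_num > last + 1:
            gap_size = seq_num - last - 1
            metrics.increment('udp.packet.dropped', amount=gap_size)
            metrics.increment('udp.sequence.gap')

        # Keep the highest sequence seen, so a late (reordered) packet does not
        # make the next in-order packet look like a gap
        if last is None or seq_num > last:
            self.last_seq_num = seq_num
            metrics.gauge('udp.current_sequence', float(seq_num))


# Monitor packet loss. With gap-based counting, dropped packets never arrived,
# so the loss rate is dropped / (received + dropped).
summary = server.get_metrics_summary()
if 'udp_listener' in summary:
    received = summary['udp_listener'].get('packets_received', 0)
    dropped = summary['udp_listener'].get('packets_dropped', 0)
    expected = received + dropped
    loss_rate = dropped / expected if expected > 0 else 0
    print(f"Packet loss rate: {loss_rate:.2%}")
```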
Example 7: WebSocket Health Monitoring
```python
import json


# Track WebSocket events (excerpt: assumes the class has `self.server`,
# `self.clients`, and a `_send_to_clients()` coroutine)
class ASTWebSocketServer:
    async def broadcast(self, message):
        metrics = self.server.metrics

        # Track the broadcast attempt and the number of connected clients
        metrics.increment('websocket.broadcast.attempt')
        metrics.gauge('websocket.clients', len(self.clients))

        # Track message size with a gauge: timing() expects a duration in
        # seconds, not a character count
        message_size = len(json.dumps(message))
        metrics.gauge('websocket.message.size', message_size)

        try:
            # Send to clients
            await self._send_to_clients(message)
            metrics.increment('websocket.broadcast.success')
        except Exception:
            # The error is counted and swallowed here; log or re-raise as needed
            metrics.increment('websocket.broadcast.failure')


# Monitor WebSocket health
summary = server.get_metrics_summary()
if 'websocket' in summary:
    print(f"Connected clients: {summary['websocket']['clients_connected']}")
    print(f"Broadcasts sent: {summary['websocket']['broadcasts_sent']}")
```
Configuration
Enable/Disable Metrics
```python
# Enable (default)
server = ASTServer(enable_metrics=True)

# Disable if the overhead or memory use is a concern
server = ASTServer(enable_metrics=False)

# Check whether metrics are enabled
if server.metrics.enabled:
    server.metrics.increment('custom.metric')
```
Reset Metrics
```python
# Reset all metrics
server.metrics.reset()

# Reset only counters (keep timings and gauges)
server.metrics.reset_counters()
```
API Reference
MetricsCollector Methods
```python
# Timing
server.metrics.timing(name, duration_seconds, tags=None)
server.metrics.timer(name, tags=None)  # Context manager

# Counter
server.metrics.increment(name, amount=1, tags=None)
server.metrics.decrement(name, amount=1, tags=None)

# Gauge
server.metrics.gauge(name, value, tags=None)

# Get a specific metric
server.metrics.get_timing(name, tags=None)
server.metrics.get_counter(name, tags=None)
server.metrics.get_gauge(name, tags=None)

# Get all metrics
server.metrics.get_all_metrics()

# Reset
server.metrics.reset()
server.metrics.reset_counters()
```
ASTServer Methods
```python
# Get full metrics (JSON format)
metrics = server.get_metrics()

# Get summary (high-level overview)
summary = server.get_metrics_summary()
```
Exporting Metrics
JSON Export
```python
import json

from src.server.utils import MetricsExporter

# Get all metrics
all_metrics = server.metrics.get_all_metrics()

# Export as JSON
json_export = MetricsExporter.to_json(all_metrics)

# Save to file
with open('metrics.json', 'w') as f:
    json.dump(json_export, f, indent=2)
```
Summary Export
```python
# Get high-level summary
summary = MetricsExporter.to_summary(all_metrics)

# Print key metrics
print(f"Uptime: {summary['uptime_seconds']}s")
print(f"Event processing p95: {summary['event_processing']['latency_ms']['p95']}ms")
print(f"Cache hit rate: {summary['cache']['hit_rate_percent']}%")
```
Custom Export Format
```python
import re


def export_to_prometheus(metrics):
    """Export metrics in a simplified Prometheus-style text format.

    Prometheus metric names may only contain [a-zA-Z0-9_:], so dots and tag
    suffixes such as "[event_type=...]" are replaced with underscores.
    """
    def sanitize(name):
        return re.sub(r'[^a-zA-Z0-9_:]', '_', name)

    lines = []
    # Counters
    for name, stats in metrics['counters'].items():
        lines.append(f"{sanitize(name)} {stats['value']}")
    # Gauges
    for name, stats in metrics['gauges'].items():
        lines.append(f"{sanitize(name)} {stats['current']}")
    # Timings (p95 only, in seconds)
    for name, stats in metrics['timings'].items():
        lines.append(f"{sanitize(name)}_p95 {stats['p95']}")
    return '\n'.join(lines)


# Use the custom export
prom_format = export_to_prometheus(server.get_metrics()['metrics'])
print(prom_format)
```
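The export above writes each key as a bare metric name. Prometheus' text exposition format can instead carry tags as labels, `name{key="value"}`, which keeps one metric family per name. A sketch, assuming tagged keys use the `name[key=value,...]` form shown in the auto-tracked metrics list (tag values containing `,` or `]` would need a stricter parser); `to_prometheus_line` is a hypothetical helper, not part of `MetricsExporter`.

```python
import re

# 'base.name' optionally followed by '[tags]'
_KEY_RE = re.compile(r'^(?P<name>[^\[]+)(?:\[(?P<tags>.*)\])?$')


def _sanitize_name(name):
    # Metric names must match [a-zA-Z_:][a-zA-Z0-9_:]*
    name = re.sub(r'[^a-zA-Z0-9_:]', '_', name)
    return name if re.match(r'[a-zA-Z_:]', name) else f"_{name}"


def _escape_label_value(value):
    # Backslash, double quote, and newline must be escaped in label values
    return value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')


def to_prometheus_line(key, value):
    """Turn 'name[k=v,...]' plus a value into 'name{k="v",...} value'."""
    match = _KEY_RE.match(key)
    name = _sanitize_name(match.group('name'))
    tags = match.group('tags')
    if not tags:
        return f"{name} {value}"
    labels = []
    for pair in tags.split(','):
        label, _, label_value = pair.partition('=')
        label = re.sub(r'[^a-zA-Z0-9_]', '_', label)
        labels.append(f'{label}="{_escape_label_value(label_value)}"')
    return f"{name}{{{','.join(labels)}}} {value}"


print(to_prometheus_line('events.processed', 42))
# events_processed 42
print(to_prometheus_line('events.processed[event_type=/live/scene/added]', 7))
# events_processed{event_type="/live/scene/added"} 7
```

With labels, a query can aggregate across all event types under one metric name instead of matching many flattened names.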
Monitoring Dashboard
Real-Time Dashboard Example
```python
import os
import time


def clear_screen():
    os.system('clear' if os.name == 'posix' else 'cls')


def display_dashboard(server):
    # Blocking loop (time.sleep): run it in its own thread or process rather
    # than on the asyncio event loop that processes events. Stop with Ctrl+C.
    while True:
        clear_screen()
        summary = server.get_metrics_summary()
        print("=" * 60)
        print(" AST Server Health Dashboard")
        print("=" * 60)

        # Uptime
        uptime = summary.get('uptime_seconds', 0)
        print(f"\n⏱️ Uptime: {uptime:.0f}s ({uptime/60:.1f}min)")

        # Event Processing
        if 'event_processing' in summary:
            ep = summary['event_processing']
            print("\n📊 Event Processing:")
            print(f"  Total processed: {ep.get('total_processed', 0)}")
            if 'latency_ms' in ep:
                lat = ep['latency_ms']
                print(f"  Latency p50: {lat.get('p50', 0)}ms")
                print(f"  Latency p95: {lat.get('p95', 0)}ms")
                print(f"  Latency p99: {lat.get('p99', 0)}ms")

        # UDP Listener (assuming packets_dropped counts sequence gaps, as in
        # Example 6, dropped packets are not included in packets_received)
        if 'udp_listener' in summary:
            udp = summary['udp_listener']
            received = udp.get('packets_received', 0)
            dropped = udp.get('packets_dropped', 0)
            expected = received + dropped
            loss_rate = (dropped / expected * 100) if expected > 0 else 0
            print("\n📡 UDP Listener:")
            print(f"  Packets received: {received}")
            print(f"  Packets dropped: {dropped}")
            print(f"  Loss rate: {loss_rate:.2f}%")

        # WebSocket
        if 'websocket' in summary:
            ws = summary['websocket']
            print("\n🌐 WebSocket:")
            print(f"  Clients: {ws.get('clients_connected', 0)}")
            print(f"  Broadcasts: {ws.get('broadcasts_sent', 0)}")

        # Cache
        if 'cache' in summary:
            cache = summary['cache']
            print("\n💾 Cache:")
            print(f"  Hit rate: {cache.get('hit_rate_percent', 0):.1f}%")

        # Errors
        if 'errors' in summary:
            total_errors = sum(summary['errors'].values())
            print(f"\n⚠️ Errors: {total_errors}")

        print("\n" + "=" * 60)
        time.sleep(1)  # Update every second


# Run the dashboard
display_dashboard(server)
```
Performance Characteristics
Overhead
- Enabled metrics: < 0.1ms per operation
- Disabled metrics: < 0.001ms per operation (essentially free)
- Memory: ~1-2 MB for a typical workload (10k operations)
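These figures depend on hardware and Python version. A minimal sketch for checking per-call overhead on your own machine with the standard `timeit` module; `per_call_us` is a hypothetical helper, and the dictionary-based counter is only a stand-in so the block runs on its own. To measure the real collector, pass something like `lambda: server.metrics.increment('bench.counter')` instead.

```python
import timeit
from collections import defaultdict


def per_call_us(fn, number=100_000, repeat=5):
    """Best-of-`repeat` average cost of one call to fn, in microseconds."""
    # The minimum of the repeats is the least noisy estimate, as the timeit
    # documentation recommends
    best = min(timeit.repeat(fn, number=number, repeat=repeat))
    return best / number * 1e6


# Stand-in for a metrics counter (NOT the AST server's MetricsCollector)
counters = defaultdict(int)


def stand_in_increment():
    counters['bench.counter'] += 1


def noop():
    pass


baseline = per_call_us(noop)
cost = per_call_us(stand_in_increment)
print(f"call overhead: {baseline:.3f} us, increment: {cost:.3f} us, "
      f"net: {cost - baseline:.3f} us")
```

Subtracting the no-op baseline removes the cost of the function call itself, which otherwise dominates for operations this small. For comparison, 0.1ms is 100 µs.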
Recommendations
- Development: keep enabled for visibility.
- Production (high performance): keep enabled; the overhead is minimal.
- Production (memory constrained): disable if needed.
Best Practices
1. Use Descriptive Names
```python
# `duration` is an elapsed time in seconds

# Good: clear, hierarchical names
server.metrics.timing('event.processing.duration', duration)
server.metrics.timing('ast.diff.computation', duration)
server.metrics.increment('websocket.broadcast.success')

# Bad: unclear names
server.metrics.timing('duration', duration)
server.metrics.increment('count')
```
2. Use Tags for Categorization
```python
# Good: one metric with tags
server.metrics.timing('event.processing', duration,
                      tags={'event_type': event_path})

# Bad: many separate metrics
server.metrics.timing('event.processing.track_renamed', duration)
server.metrics.timing('event.processing.scene_added', duration)
# ... (this creates too many separate metrics)
```
3. Use Appropriate Metric Types
```python
# Timing for durations
with server.metrics.timer('operation'):
    do_work()

# Counter for counts
server.metrics.increment('events.processed')

# Gauge for current state
server.metrics.gauge('queue.depth', len(queue))
```
4. Monitor Critical Paths
```python
# Always instrument critical paths
with server.metrics.timer('critical.event_processing'):
    process_event()

# Optional: instrument less critical paths only when metrics are enabled,
# but still run the work itself either way
if server.metrics.enabled:
    with server.metrics.timer('optional.logging'):
        log_debug_info()
else:
    log_debug_info()
```
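5. Set Up Alerting
```python
# Check metrics periodically. send_alert() is a placeholder for your own
# notification hook. Sections can be missing from the summary (e.g., no UDP
# listener running), so read them with .get().
summary = server.get_metrics_summary()

# Alert on high latency (p95 above 100 ms)
p95_ms = summary.get('event_processing', {}).get('latency_ms', {}).get('p95', 0)
if p95_ms > 100:
    send_alert("High event processing latency!")

# Alert on packet loss
if summary.get('udp_listener', {}).get('packets_dropped', 0) > 100:
    send_alert("High UDP packet loss!")

# Alert on errors
if summary.get('errors', {}).get('errors.event_processing', 0) > 10:
    send_alert("Many event processing errors!")
```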
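As the number of checks grows, it can help to keep thresholds in data rather than in a chain of `if` statements. A minimal sketch, assuming the summary layout shown in the output example at the top of this page; `ALERT_RULES`, `lookup`, and `check_alerts` are hypothetical names, the thresholds are the same illustrative values used above, and the `example` dict is made-up sample input.

```python
# Each rule: (path into the summary dict, threshold, message)
ALERT_RULES = [
    (('event_processing', 'latency_ms', 'p95'), 100, "High event processing latency!"),
    (('udp_listener', 'packets_dropped'), 100, "High UDP packet loss!"),
    (('errors', 'errors.event_processing'), 10, "Many event processing errors!"),
]


def lookup(summary, path):
    """Follow `path` through nested dicts; return None if any key is missing."""
    value = summary
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def check_alerts(summary, rules=ALERT_RULES):
    """Return the message of every rule whose value exceeds its threshold."""
    alerts = []
    for path, threshold, message in rules:
        value = lookup(summary, path)
        if value is not None and value > threshold:
            alerts.append(message)
    return alerts


# Made-up sample input in the summary layout
example = {
    'event_processing': {'latency_ms': {'p50': 5.2, 'p95': 118.7}},
    'udp_listener': {'packets_received': 15500, 'packets_dropped': 80},
    'errors': {'errors.event_processing': 12},
}
print(check_alerts(example))
# ['High event processing latency!', 'Many event processing errors!']
```

In a periodic check this becomes `for message in check_alerts(server.get_metrics_summary()): send_alert(message)`, with `send_alert()` still standing in for your own notification hook. Missing sections are skipped rather than raising `KeyError`.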
Troubleshooting
High Latency
```python
# Identify slow operations, slowest p95 first (timings are stored in seconds)
timings = server.get_metrics()['metrics']['timings']
for name, stats in sorted(timings.items(), key=lambda x: x[1]['p95'], reverse=True):
    print(f"{name}: p95={stats['p95']*1000:.2f}ms")
```
Memory Growth
```python
# Check metric counts. Each distinct tag combination appears as its own key,
# so tags with many distinct values (e.g., unique IDs) grow these numbers.
all_metrics = server.get_metrics()
print(f"Timing metrics: {len(all_metrics['metrics']['timings'])}")
print(f"Counter metrics: {len(all_metrics['metrics']['counters'])}")
print(f"Gauge metrics: {len(all_metrics['metrics']['gauges'])}")

# If there are too many, reset periodically
if len(all_metrics['metrics']['timings']) > 1000:
    server.metrics.reset()
```
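When these counts keep climbing, the usual culprit is a tag with many distinct values. A sketch that groups metric keys by base name and counts their variants, assuming tagged keys take the `name[key=value,...]` form from the auto-tracked metrics list; `tag_cardinality` is a hypothetical helper.

```python
from collections import Counter


def tag_cardinality(keys):
    """Count how many distinct keys (tag combinations) share each base name."""
    # 'event.processing.duration[event_type=/live/track/renamed]'
    #   -> 'event.processing.duration'
    return Counter(key.split('[', 1)[0] for key in keys)


keys = [
    'event.processing.duration',
    'event.processing.duration[event_type=/live/track/renamed]',
    'event.processing.duration[event_type=/live/scene/added]',
    'handler.duration[handler=scene_handler]',
]
for name, count in tag_cardinality(keys).most_common():
    print(f"{name}: {count}")
# event.processing.duration: 3
# handler.duration: 1
```

Because iterating a dict yields its keys, `tag_cardinality(all_metrics['metrics']['timings'])` works directly on the exported timings; the names at the top of `most_common()` are the first candidates for coarser tags.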
Missing Metrics
```python
# Verify metrics are enabled
if not server.metrics.enabled:
    print("⚠️ Metrics are disabled!")

# Verify metrics are being recorded
server.metrics.increment('test.counter')
stats = server.metrics.get_counter('test.counter')
print(f"Test counter: {stats}")
```
See Also
- Caching System - Performance optimization via caching
- WebSocket AST - AST server architecture
- Performance Tuning - General performance tips