HLD: Distributed Logging / Metrics Collection
Understanding the Problem
What is Distributed Logging and Metrics?
A platform that collects logs and metrics from tens of thousands of production services emitting hundreds of millions of events per second, stores them cost-efficiently, makes them queryable in near real time, and survives backpressure from hot paths so a production incident doesn't take down the observability system along with everything else. At Uber this is M3 (metrics), Jaeger (tracing), and log aggregation built on Kafka + Pinot/Elastic tiers. Interviewers reach for this problem when they want to see infrastructure maturity: tail-based sampling, cardinality management, tiered storage, and multi-tenant isolation.
Functional Requirements
Core (above the line):
- Accept log lines and metric samples from every service.
- Store with retention (logs: 7–30 days hot, cold archive 1 year; metrics: 13 months rolled-up).
- Query: "logs for service X in last hour containing error Y"; "p99 latency for endpoint Z over last 7 days".
- Alert on rule-based conditions with < 1 min delay.
Below the line (out of scope):
- Business analytics → separate data warehouse.
- Distributed tracing → separate Jaeger stack (shares ingestion patterns).
- Long-term ML training data → separate pipeline.
Non-Functional Requirements
Core:
- Scale: 500M events/sec globally at peak.
- Latency: ingestion p99 < 1s; query p99 < 2s for last-hour range; 10s for last-7-days.
- Durability: lossy is acceptable for logs (≤ 0.1%), not for metrics tied to SLOs.
- Cost: storage dominates; must tier aggressively.
Below the line:
- Sub-second query on cold archive: we accept minutes of thaw.
Capacity Estimation
Back-of-envelope on the whiteboard:
- Ingress: 500M events/sec × 500 B avg = 250 GB/s raw; ~60 GB/s after agent-side compression.
- Daily volume: 21.6 PB/day raw. With 4× compression, ~5 PB/day stored hot. Cold archive to S3-class storage with another 3× cold compression → ~1.6 PB/day cold.
- Unique time series: 50M active series per region at peak; cardinality explosions must be bounded via budget enforcement at ingest.
- Hot Elasticsearch: 5 PB/day × 7 days = 35 PB in the hot tier; roughly 3,500 nodes × 10 TB SSD each.
- Cold Parquet archive: 1.6 PB/day × 30 days ≈ 50 PB; object storage at ~$0.02/GB-month → ~$1M/month at cost before discounts.
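If it helps to sanity-check these figures before quoting them, the whole estimate fits in a few lines of throwaway code; the constants below are the assumptions from this section, nothing measured.

#include <cstdio>

int main() {
    // Assumed inputs from the estimate above (not measurements).
    const double events_per_sec   = 500e6;  // peak global ingest
    const double bytes_per_event  = 500;    // average encoded event size
    const double hot_compression  = 4.0;    // agent-side + hot-store compression
    const double cold_compression = 3.0;    // extra ratio for the Parquet cold tier

    const double raw_gb_per_sec  = events_per_sec * bytes_per_event / 1e9;  // ~250 GB/s
    const double raw_pb_per_day  = raw_gb_per_sec * 86400 / 1e6;            // ~21.6 PB/day
    const double hot_pb_per_day  = raw_pb_per_day / hot_compression;        // ~5.4 PB/day (rounded to 5 above)
    const double cold_pb_per_day = hot_pb_per_day / cold_compression;       // ~1.8 PB/day (~1.6 with the rounded hot figure)

    // 7 days hot => ~35 PB on SSD (the 3,500-node figure); 30 days cold => ~50 PB in object storage.
    std::printf("raw %.0f GB/s | %.1f PB/day raw | %.1f PB/day hot | %.1f PB/day cold\n",
                raw_gb_per_sec, raw_pb_per_day, hot_pb_per_day, cold_pb_per_day);
    return 0;
}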
The Set Up
Core Entities
| Entity | Description |
|---|---|
| LogEvent | service, host, level, ts, message, trace_id, attrs{} |
| MetricSample | metric, tags, value, ts |
| QueryRequest | text/filter DSL, time range, service scope |
API Design
Per-host agents push batches over gRPC. Query is REST on top of a DSL similar to LogQL / PromQL.
// Ingest (internal, per-host agent pushes)
POST /v1/logs/batch
Body: { events: LogEvent[] }
POST /v1/metrics/batch
Body: { samples: MetricSample[] }
// Query
POST /v1/query/logs
Body: { query: string; from: ts; to: ts; service?: string; limit: number }
Response: { events: LogEvent[]; nextCursor? }
POST /v1/query/metric
Body: { promql: string; from: ts; to: ts; step: string }
Response: { series: [{ tags, points: [[ts, value]] }] }

service LogIngest {
rpc StreamEvents(stream LogBatch) returns (IngestAck);
}
service Query {
rpc QueryLogs(LogQuery) returns (stream LogEvent);
rpc QueryMetric(MetricQuery) returns (MetricResult);
}
message MetricSample {
string metric = 1;
map<string, string> tags = 2;
double value = 3;
int64 ts_ms = 4;
}

// Per-host buffering + batching on the agent
while (true) {
  // Drain up to 64 KB or wait at most 500 ms for a batch to form.
  auto batch = ringBuffer.drain(/*maxBytes=*/64 * 1024,
                                /*deadline=*/std::chrono::milliseconds(500));
  if (batch.empty()) continue;
  grpc::ClientContext ctx;
  LogBatch req;
  IngestAck ack;
  for (const auto& e : batch) req.add_events()->CopyFrom(e);
  // Backpressure: if the ack fails or times out, spill the batch to the disk buffer.
  grpc::Status status = ingest->SendBatch(&ctx, req, &ack);
  if (!status.ok()) diskBuffer.spill(batch);
}

High-Level Design
App process -> stdout
|
v
+---------------+
| Host agent | (per node, buffers + samples)
| (Fluentd/ |
| Vector) |
+---------------+
|
v
+---------------+
| Ingest tier |
| (gRPC, auth) |
+---------------+
|
v
+---------------+
| Kafka | (sharded by service)
+---------------+
/ \
v v
+--------+ +-----------+
| Logs | | Metrics |
| indexer| | TSDB (M3) |
| (Lucene| +-----------+
| / |
| Elastic|
| /Loki) |
+--------+
|
v
Cold archive (S3/HDFS, Parquet, 1yr)

End-to-End Flow
- Each app writes structured logs to stdout and metrics via an SDK. A per-host agent (Fluentd, Vector, or Uber's internal agent) buffers and samples events.
- Agent pushes batches over gRPC to the Ingest tier at the nearest region.
- Ingest performs auth and writes to Kafka, sharded by service for partition affinity.
- Log Indexer consumes, writes to hot storage (Elasticsearch / Loki). Metrics consumer writes to M3 TSDB with downsampling rules.
- After 7 days, a tiering job rolls hot data into Parquet in object storage; after 30 days, into Glacier-class cold tier.
- Query service federates across hot/warm/cold tiers based on the query's time range.
Why These Components
- Agent buffering absorbs upstream outages without blocking apps.
- Kafka is the durable fan-out backbone; multiple consumer tiers (logs, metrics, tracing) stay isolated.
- M3 handles metrics with native downsampling and was built for Uber's scale.
- Parquet on object storage is the cheapest way to keep a long tail searchable.
Data Model Detail
- Hot logs (Elasticsearch / Loki): Indexed by service, level, trace_id; full-text inverted index on message. Sharded daily; 4 shards per day per region at the hot tier.
- Warm logs (Parquet on S3): Partitioned by (dt=YYYY-MM-DD, service). Columns: ts, host, level, message, trace_id, attrs (map).
- Metrics (M3): Stores time series keyed by (metric, tags...) with downsampled rollups: raw 10s, 1m, 10m, 1h rollups automatically generated (see the rollup sketch after this list).
- Kafka topics: logs.{service} and metrics.{team}, RF=3, retention 24h (logs) / 7d (metrics).
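To make "downsampled rollups" concrete, here is a minimal sketch of one rollup pass, assuming raw ~10s samples aggregated into fixed-width buckets with sum/count/max kept per bucket; M3's actual rollup engine is rule-driven, so this is illustrative only.

#include <algorithm>
#include <cstdint>
#include <map>
#include <vector>

// A raw metric sample, e.g. at 10s resolution.
struct Sample { int64_t ts_ms; double value; };

// Per-bucket aggregate kept for a rollup; sum/count serve avg queries, max serves max queries.
struct Rollup { double sum = 0; double max = 0; uint64_t count = 0; };

// Aggregate raw samples into fixed-width buckets, e.g. bucket_ms = 60'000 for the 1m rollup.
std::map<int64_t, Rollup> downsample(const std::vector<Sample>& raw, int64_t bucket_ms) {
    std::map<int64_t, Rollup> out;
    for (const auto& s : raw) {
        const int64_t bucket = s.ts_ms - (s.ts_ms % bucket_ms);  // align to bucket start
        Rollup& r = out[bucket];
        r.max = (r.count == 0) ? s.value : std::max(r.max, s.value);
        r.sum += s.value;
        r.count += 1;
    }
    return out;
}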
Capacity Walkthrough
- 500M events/sec × 500 B = 250 GB/s raw, ~60 GB/s after 4× compression.
- Kafka cluster: 300 brokers at 200 MB/s each (durable writes with RF=3) → 60 GB/s. Dedicated hardware with NVMe storage.
- Hot Elasticsearch: 5 PB/day × 7 days retention = 35 PB hot; roughly 3,500 nodes × 10 TB SSD each.
- Cold Parquet archive: ~1.6 PB/day after cold compression × 30 days ≈ 50 PB; Glacier-class for anything older at ~$0.004/GB-month.
- Query: p99 < 2s for last-hour via hot tier; 10s for last-7-days via warm tier.
Potential Deep Dives
1) How do we handle ingest backpressure without blocking production?
An observability system that takes down production during an incident is worse than useless.
Bad Solution: Synchronous push
- Approach: Agents push synchronously; when the ingest tier is slow, agents block the app.
- Challenges: Catastrophic during incidents exactly when logs matter most.
Good Solution: Per-host disk buffer with async push
- Approach: Ring buffer in memory, spill to disk on ingest lag.
- Challenges: Survives short outages but not long ones. Disk fills up.
Great Solution: Tail-based sampling with importance weighting
- Approach:
- Error logs, slow requests, and traced requests always pass.
- For steady-state noise (INFO/DEBUG), apply head-based sampling at the agent (1/100) that can be dynamically adjusted from the control plane.
- During incidents, the rate of important signals is preserved while total bytes drop.
- The backpressure signal flows back via gRPC flow control; the agent downgrades its sampling rate rather than blocking.
- Challenges: Sampling configuration drift across agents → use a central control plane with versioned configs and staged rollouts.
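A minimal sketch of the agent-side decision described in this solution; the field names, the 500 ms slow threshold, and the control-plane setter are assumptions, while the always-keep rules mirror the bullets above (errors, slow requests, traced requests).

#include <cstdint>
#include <random>
#include <string>

struct LogEvent {
    std::string level;       // "DEBUG" | "INFO" | "WARN" | "ERROR"
    int64_t     latency_ms;  // request latency, if this line describes a request
    bool        traced;      // request was selected by the tracer
};

class Sampler {
public:
    // Keep ratio for steady-state noise, e.g. 0.01 (1/100); the control plane
    // lowers it further when the ingest tier signals backpressure.
    void setNoiseKeepRatio(double r) { noise_keep_ratio_ = r; }

    bool shouldKeep(const LogEvent& e) {
        // Importance rules: errors, slow requests, and traced requests always pass.
        if (e.level == "ERROR" || e.level == "WARN") return true;
        if (e.latency_ms > slow_threshold_ms_) return true;
        if (e.traced) return true;
        // Everything else (INFO/DEBUG noise) is sampled probabilistically.
        return dist_(rng_) < noise_keep_ratio_;
    }

private:
    double  noise_keep_ratio_ = 0.01;
    int64_t slow_threshold_ms_ = 500;
    std::mt19937_64 rng_{std::random_device{}()};
    std::uniform_real_distribution<double> dist_{0.0, 1.0};
};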
2) How do we query across PB-scale tiers?
No single engine can hold a year of logs cost-effectively.
Bad Solution: Everything in Elasticsearch
- Approach: A single Elasticsearch cluster holds all data.
- Challenges: Works for ~1 day of data but fails cost-wise at PB scale; the cluster becomes a money pit.
Good Solution: Last 24h hot, else fail
- Approach: Only index the last 24 hours.
- Challenges: Retroactive debugging and incident postmortems older than 24h become impossible.
Great Solution: Tiered storage with transparent query routing
- Approach:
- 0–24h: Elasticsearch / OpenSearch / Loki on SSD. Full-text search, sub-second.
- 1–7d: Warm tier, same engines on HDD with frozen indices (still queryable, slower).
- 7–30d: Parquet files in object storage, queryable via Presto/Trino with column pruning; query latency 10–60s.
- 30d–1yr: Glacier-class, requires thaw (hours).
- Query router inspects time range and federates across tiers; merges results with dedup on event id.
- Challenges: Federated query latency spikes when a single query spans multiple tiers. Cap cross-tier queries to last-7-days by default; require explicit flag for deeper scans.
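A minimal sketch of the router's tier selection and merge: query only the tiers whose age window overlaps the requested range, then dedup on event id while merging. The TierClient interface and field names are assumptions; the tier boundaries correspond to the list above.

#include <chrono>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

struct LogEvent { std::string event_id; int64_t ts_ms; std::string message; };

// Assumed interface over a per-tier backend (hot Elasticsearch, frozen indices, Trino on Parquet, ...).
struct TierClient {
    virtual ~TierClient() = default;
    virtual std::vector<LogEvent> query(const std::string& q, int64_t from_ms, int64_t to_ms) = 0;
};

// Each tier covers events whose age falls in [min_age_ms, max_age_ms), ordered hot -> cold.
struct Tier { int64_t min_age_ms; int64_t max_age_ms; TierClient* client; };

std::vector<LogEvent> federatedQuery(const std::vector<Tier>& tiers, const std::string& q,
                                     int64_t from_ms, int64_t to_ms) {
    const int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
    const int64_t newest_age = now_ms - to_ms;    // age of the newest requested event
    const int64_t oldest_age = now_ms - from_ms;  // age of the oldest requested event

    std::vector<LogEvent> merged;
    std::unordered_set<std::string> seen;         // at-least-once delivery => dedup on event id
    for (const auto& tier : tiers) {
        // Skip tiers whose coverage does not overlap the requested age window.
        if (oldest_age < tier.min_age_ms || newest_age >= tier.max_age_ms) continue;
        for (auto& e : tier.client->query(q, from_ms, to_ms)) {
            if (seen.insert(e.event_id).second) merged.push_back(std::move(e));
        }
    }
    return merged;
}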
2.5) How do we handle log compression and parsing cost?
250 GB/s of raw logs requires smart compression end-to-end.
Good Solution: GZIP on agent, decompress at ingest
- Approach: Agent compresses batches; ingest tier decompresses and indexes.
- Challenges: CPU-heavy; tail latency during ingest spikes.
Great Solution: Zstandard + columnar batching
- Approach:
- Agent batches logs into columnar layout (e.g., per-field arrays) and compresses with Zstandard (compression ratio ~5–6× vs text).
- Ingest tier passes columnar batches through to Kafka without re-encoding; downstream consumers parse lazily.
- For Parquet archives, direct conversion from the columnar batch keeps CPU low.
- Challenges: Columnar logs are not grep-friendly at the agent; if on-host debugging matters, keep a rotating plain-text file too.
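A minimal sketch of the agent-side step, assuming libzstd (ZSTD_compressBound / ZSTD_compress); the "columnar" layout here is plain per-field concatenation, a stand-in for a real columnar encoding, and the field set is illustrative.

#include <cstdint>
#include <string>
#include <vector>
#include <zstd.h>  // libzstd

struct LogEvent { int64_t ts_ms; std::string level; std::string message; };

// Rough columnar layout: concatenate each field across the batch so similar bytes
// sit next to each other, which compresses better than interleaved text lines.
std::string toColumnarBuffer(const std::vector<LogEvent>& batch) {
    std::string buf;
    for (const auto& e : batch) buf.append(reinterpret_cast<const char*>(&e.ts_ms), sizeof(e.ts_ms));
    for (const auto& e : batch) { buf += e.level;   buf += '\n'; }
    for (const auto& e : batch) { buf += e.message; buf += '\n'; }
    return buf;
}

std::vector<char> compressBatch(const std::vector<LogEvent>& batch, int level = 3) {
    const std::string columnar = toColumnarBuffer(batch);
    std::vector<char> out(ZSTD_compressBound(columnar.size()));
    const size_t n = ZSTD_compress(out.data(), out.size(), columnar.data(), columnar.size(), level);
    if (ZSTD_isError(n)) return {};  // real code would surface the error and fall back to plain text
    out.resize(n);
    return out;
}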
3) How do we control metric cardinality?
High-cardinality tags (user_id, request_id) explode the TSDB.
Bad Solution: Allow any tag
- Approach: Teams can attach any tags.
- Challenges: High-cardinality tags blow up memory; queries stall; the TSDB falls over.
Good Solution: Documentation + discipline
- Approach: Guidelines for cardinality limits.
- Challenges: Guidelines are ignored under deadline pressure.
Great Solution: Enforced cardinality budgets at ingest + pre-aggregation
- Approach:
- Per-metric tag set is registered; ingest layer rejects unregistered tags.
- A cardinality watchdog computes unique series/min and alerts at 80% budget.
- Pre-aggregate common rollups (p50/p95/p99 by service, by region) at ingest so queries read rolled-up series rather than raw samples.
- M3 (Uber's TSDB) has native downsampling; mention it.
- Challenges: New tags need a registration flow → reviewers must approve before rollout. Automate this via a self-service portal with guard rails.
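A minimal sketch of the ingest-side budget check: track unique series per metric, reject new series once the budget is exhausted, and flag crossing 80% of it. A production watchdog would use an approximate counter (e.g. HyperLogLog) plus the tag registry described above; the names here are assumptions.

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Decision for a single incoming sample.
enum class Admit { kAccept, kAlert, kReject };

class CardinalityBudget {
public:
    explicit CardinalityBudget(uint64_t budget) : budget_(budget) {}

    // series_key is the canonical "metric{tag=value,...}" string for the sample.
    Admit admit(const std::string& metric, const std::string& series_key) {
        auto& seen = series_per_metric_[metric];
        if (seen.count(series_key)) return Admit::kAccept;   // existing series: always fine
        if (seen.size() >= budget_) return Admit::kReject;   // over budget: drop the new series
        seen.insert(series_key);
        // Alert the owning team once a metric crosses 80% of its budget.
        return seen.size() >= budget_ * 8 / 10 ? Admit::kAlert : Admit::kAccept;
    }

private:
    uint64_t budget_;
    std::unordered_map<std::string, std::unordered_set<std::string>> series_per_metric_;
};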
4) How do we correlate logs across services for a single user request?
Distributed systems span many services; a single user-visible error may be traced to a buried service 5 hops away.
Good Solution: Request IDs in log context
- Approach: Every log line includes a request_id / trace_id set at the edge.
- Challenges: Works once the convention is enforced; relies on every service respecting it.
Great Solution: OpenTelemetry trace context propagation + log/trace correlation
- Approach:
- All services propagate W3C trace context headers.
- Jaeger collects traces; each log line carries trace_id and span_id.
- UI joins logs to traces on trace_id for one-click drill-down.
- Challenges: Must instrument every SDK; rolling out to legacy services is a long tail. Sampling strategy must be coordinated (tail-based on traces mirrors log sampling).
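To make the correlation mechanics concrete, a minimal sketch of extracting trace_id and span_id from an incoming W3C traceparent header so every log line can carry them; real services would use the OpenTelemetry SDK's propagators rather than hand-parsing, so treat this as illustrative.

#include <optional>
#include <string>

struct TraceContext { std::string trace_id; std::string span_id; };

// W3C traceparent: "<version>-<trace-id>-<parent-id>-<flags>",
// e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".
std::optional<TraceContext> parseTraceparent(const std::string& header) {
    // version(2) '-' trace-id(32) '-' parent-id(16) '-' flags(2) => at least 55 chars
    if (header.size() < 55 || header[2] != '-' || header[35] != '-' || header[52] != '-')
        return std::nullopt;
    TraceContext ctx;
    ctx.trace_id = header.substr(3, 32);
    ctx.span_id  = header.substr(36, 16);
    return ctx;
}
// Every log line emitted while handling the request then carries trace_id / span_id,
// which is what lets the UI join logs to the corresponding Jaeger trace.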
4.5) How do we structure logs to make them useful?
Unstructured free-text logs scale badly. Structure scales.
Good Solution: Key-value pairs in log lines
- Approach: level=ERROR service=payments txn=abc msg="declined".
- Challenges: Parsing regex gets hairy; schema drift.
Great Solution: Structured logs with enforced schemas + type registry
- Approach:
- SDKs emit JSON (or protobuf) logs with well-typed fields.
- Each service registers its log schema with a central registry; ingest rejects logs violating the schema (after a warning period).
- Queries use schema to autocomplete and validate.
- Challenges: Legacy services need migration. Provide compatibility shims for unstructured lines to be labeled service_unknown.
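A minimal sketch of the ingest-side schema check, assuming the registry hands back a map of required field names to expected types; real registries version their schemas and honor the warning period mentioned above.

#include <cstdint>
#include <map>
#include <string>
#include <variant>

using FieldValue = std::variant<std::string, int64_t, double, bool>;
using LogRecord  = std::map<std::string, FieldValue>;

enum class FieldType { kString, kInt, kDouble, kBool };
using Schema = std::map<std::string, FieldType>;  // required field name -> expected type

bool matchesType(const FieldValue& v, FieldType t) {
    switch (t) {
        case FieldType::kString: return std::holds_alternative<std::string>(v);
        case FieldType::kInt:    return std::holds_alternative<int64_t>(v);
        case FieldType::kDouble: return std::holds_alternative<double>(v);
        case FieldType::kBool:   return std::holds_alternative<bool>(v);
    }
    return false;
}

// Reject (or, during the warning period, just count) records that miss a
// required field or carry the wrong type for it.
bool validate(const LogRecord& record, const Schema& schema) {
    for (const auto& [field, type] : schema) {
        auto it = record.find(field);
        if (it == record.end() || !matchesType(it->second, type)) return false;
    }
    return true;
}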
5) How do we support multi-tenancy fairly?
One noisy team should not bury another team's logs.
Good Solution: Per-team Kafka quotas
- Approach: Apply producer quotas per team at Kafka.
- Challenges: Kafka quotas are coarse-grained; once a team exhausts quota, their logs drop without notification.
Great Solution: Per-tenant sharding + dynamic quotas with noisy-neighbor detection
- Approach:
- Dedicated Kafka topics per top-10 producers; shared pool for the long tail.
- Ingest tier tracks bytes/sec per team; exceeding budgets triggers automatic sampling.
- Slack alerts to team owners when their throughput trends toward the budget.
- Challenges: Rebalancing shard placement as teams grow → use Cadence workflows for "migrate team X from shared to dedicated pool".
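A minimal sketch of the per-team accounting at the ingest tier: a token bucket sized from the team's byte budget, where going over triggers degraded sampling and a notification instead of the silent drops a coarse Kafka quota gives you. The rates and names are assumptions.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <string>
#include <unordered_map>

class TenantQuota {
public:
    TenantQuota(double bytes_per_sec, double burst_bytes)
        : rate_(bytes_per_sec), burst_(burst_bytes), tokens_(burst_bytes),
          last_(std::chrono::steady_clock::now()) {}

    // Returns true if the batch fits the budget; false means "over budget":
    // the ingest tier then applies aggressive sampling and alerts the team owner.
    bool admit(uint64_t batch_bytes) {
        const auto now = std::chrono::steady_clock::now();
        const double elapsed = std::chrono::duration<double>(now - last_).count();
        last_ = now;
        tokens_ = std::min(burst_, tokens_ + elapsed * rate_);  // refill
        if (tokens_ < batch_bytes) return false;
        tokens_ -= batch_bytes;
        return true;
    }

private:
    double rate_, burst_, tokens_;
    std::chrono::steady_clock::time_point last_;
};

// One bucket per team, keyed by the tenant id carried on every batch.
std::unordered_map<std::string, TenantQuota> quotas;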
Rapid-Fire Q&A Anticipations
- "What's the primary driver of cost?" Storage dominates (hot SSD), followed by egress bandwidth for cross-region replication.
- "How do you handle team onboarding?" Self-service: register service, get Kafka topics, dashboards auto-generated from metric naming conventions.
- "What's the default retention?" 7 days hot, 30 days warm, 1 year cold. Teams can request longer retention with a cost charge-back.
- "Cross-region replication for logs?" Typically no: each region keeps its own. For incident investigation, federated query crosses regions on demand.
- "How do we alert on high error rate for a newly deployed service?" Alert rules parameterized by service tag; new services inherit default rules until team writes custom ones.
Alternatives Considered
- ELK stack vs Loki vs custom: ELK is the default; Loki is cheaper because it indexes only labels, not full text. Uber mixes approaches by tier.
- Prometheus vs M3: Prometheus is a single-binary TSDB; M3 is horizontally scalable and Uber-origin. For Uber scale, M3.
- Push vs pull metrics: Prometheus pulls; M3 accepts both. Push works better for ephemeral workloads (serverless, batch).
- ClickHouse vs Elasticsearch vs Pinot: ClickHouse is great for analytical log queries; Elasticsearch dominates full-text search; Pinot is real-time OLAP. Uber uses all three at different layers.
- OpenTelemetry vs proprietary SDKs: OTel is the vendor-neutral standard; adopt it for new services, migrate legacy.
Frequently Asked Follow-ups
- "How do you detect a cardinality runaway before it kills the TSDB?" → Watchdog on unique series per metric; alert at 80% of budget; auto-drop tags if sustained.
- "What's the cost per GB?" → Hot SSD: ~$0.10/GB-month at cloud prices. Warm HDD: ~$0.03. Cold S3: ~$0.02. Glacier: ~$0.004. Tiered retention reduces 1-year TCO by ~20×.
- "How do you search cold logs?" → Presto/Trino on Parquet in S3 with column pruning and partition filtering. Typical 30–60s query latency for multi-day scans.
- "What about logs with PII?" → Redact at agent time (regex rules on known PII fields); enforce at ingest (reject records with structured PII). A redaction sketch follows this list.
- "What happens in a multi-region incident?" → Each region is autonomous; queries fall back to local data. Cross-region queries run via a federated query layer with a degraded SLA.
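Following up on the PII answer above, a minimal sketch of agent-side regex redaction; the two patterns are illustrative stand-ins, and real rule sets live in centrally managed config.

#include <regex>
#include <string>
#include <vector>

struct RedactionRule { std::regex pattern; std::string replacement; };

// Illustrative rules only; production rules are pushed from a central config.
std::vector<RedactionRule> defaultRules() {
    return {
        {std::regex(R"([\w.+-]+@[\w-]+\.[\w.]+)"), "<email>"},  // email addresses
        {std::regex(R"(\b\d{3}-\d{2}-\d{4}\b)"),   "<ssn>"},    // US SSN-like patterns
    };
}

// Applied to the message field on the agent before the event leaves the host;
// the ingest tier re-checks structured fields and rejects records that still carry PII.
std::string redact(std::string message, const std::vector<RedactionRule>& rules) {
    for (const auto& r : rules) message = std::regex_replace(message, r.pattern, r.replacement);
    return message;
}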
Visual Aids to Draw
- Ingestion pipeline: app → agent → gRPC → Kafka → indexers/TSDB → tiered storage.
- Tiered storage funnel: hot SSD → warm HDD → Parquet in S3 → Glacier; retention arrows and cost tags.
- Cardinality explosion chart: unique series growing exponentially when a high-cardinality tag is added.
- Query federation flow: query router dispatches subqueries to hot/warm/cold tiers and merges.
- Alerting flow: M3 → rule evaluator → routing service → PagerDuty, with dedup for flapping alerts.
What's Expected at Each Level
Mid-level (L4)
- Agent → Kafka → storage. Tiered retention. Basic sampling.
- Understands the scale but may miss cardinality subtleties.
Senior (L5 / L5A)
- Backpressure with importance weighting.
- Metric cardinality management.
- Tiered query plan with concrete latency expectations.
- Back-of-envelope on storage costs across tiers.
Staff+ (L6)
- Cost curves per tier and projected 3-year TCO.
- SLOs for ingest and query separately.
- Multi-tenancy isolation: one noisy team shouldn't bury another's logs.
- Cross-region replication for DR; query federation when a region is degraded.
Common Pitfalls
- Synchronous agent push. Catastrophic during incidents.
- Ignoring cardinality. The #1 way to kill a TSDB.
- One-tier storage. PB-scale retention needs tiered hot/warm/cold.
- No multi-tenancy isolation. One bad producer can bury everyone's signals.
- Pretending exactly-once. Agents + Kafka give at-least-once; dedup at query time with event IDs.
Walkthrough: Interview Dialogue Example
Interviewer: "I'm debugging a p99 latency spike in service Z. Walk me through what your system provides."
You should answer:
- I'd start with the p99 time series for service Z in M3, scoped to the affected region. Query: histogram_quantile(0.99, rate(request_duration_bucket{service="Z", region="us-east"}[5m])). Served from the hot tier, returns in ~500ms.
- Overlay error rate and saturation (CPU, memory). The rule-based alert that paged me already shows these on the same dashboard.
- Identify the affected time window and drill into traces: sample Jaeger traces where service=Z AND duration > 500ms. Traces join with logs via trace_id.
- In the trace, I see service Z calls service Y downstream. Go to service Y's logs for that trace_id in the same window. Hot-tier log query in Elasticsearch returns in ~1s.
- If I need to check a historical baseline (last week's p99 for comparison), that's in the warm Parquet tier: query via Presto, 10s latency acceptable for a one-off.
- If I suspect a recently deployed change, diff the last two weeks of error rate from M3 downsampled series. All within 30 seconds of starting to debug.
What If They Pivot Mid-Interview?
- "Add distributed tracing." → Same ingestion pattern; agents propagate trace context; Jaeger collectors consume from Kafka; traces join logs on trace_id.
- "Design alert rule evaluation." → Scheduled Flink job runs PromQL on M3 every 30s; matches go to the routing service (PagerDuty). Rules live in a git-managed config with versioned deployment. (A sketch of the evaluation loop follows this list.)
- "What if a team wants to replay logs after a bug was fixed?" → Provide a "reingest from cold" tool that pulls a time range from Parquet back into hot Elasticsearch for search; bill the requesting team.
- "Security: who can see production logs?" → RBAC on query; team-owned namespaces; audit trail of queries. PII fields masked based on caller role.
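For the alert-rule-evaluation pivot, a minimal sketch of the evaluation loop (the pivot answer runs this inside a scheduled Flink job); MetricsClient and AlertRouter are assumed stand-ins for M3's query API and the routing service.

#include <chrono>
#include <string>
#include <thread>
#include <vector>

struct AlertRule { std::string name; std::string promql; double threshold; };

// Assumed stand-ins for M3's query API and the alert routing service (PagerDuty / OpsGenie).
struct MetricsClient { double evaluate(const std::string& promql) { (void)promql; return 0.0; } };
struct AlertRouter   { void fire(const std::string& rule, double value) { (void)rule; (void)value; } };

void evaluateForever(MetricsClient& m3, AlertRouter& router, const std::vector<AlertRule>& rules) {
    while (true) {
        for (const auto& rule : rules) {
            const double value = m3.evaluate(rule.promql);
            // Matches go to the routing service; dedup of flapping alerts happens downstream.
            if (value > rule.threshold) router.fire(rule.name, value);
        }
        std::this_thread::sleep_for(std::chrono::seconds(30));  // every 30s, per the pivot answer
    }
}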
Reliability and Observability
- SLO: 99.95% ingest availability; < 0.1% loss for non-critical logs; metrics tied to SLOs are durability-first.
- Failure modes:
- Ingest tier overloaded → agents spill to disk, sample DEBUG events, push backpressure into gRPC flow control.
- Kafka broker failure → replication factor 3 covers it; consumer groups rebalance automatically.
- Query engine overload → tiered query routing with per-tier concurrency caps; long-range queries queue behind short ones.
- Deployment: Canary agent versions at 1% of hosts; roll out to regions one at a time with a 24h bake.
- Monitoring: ingest_lag_seconds, events_dropped_total, cardinality_current, query_p99_by_tier. Alert on > 1% loss or > 60s ingest lag.
- Runbook: On cardinality explosion, freeze the offending metric at the ingest tier and page the owning team. On query engine overload, reject long-range queries with a retry-after hint.
Uber-Specific Notes
- M3 is Uber's open-source TSDB; name-drop it and mention Prometheus wire compatibility.
- Jaeger for tracing is Uber-origin; traces are a separate pipeline but share ingestion patterns.
- Uber's internal analytics uses a mix of Kafka, Hive, Spark, and Pinot (Uber co-created Pinot for real-time OLAP). Pinot is the right mention for real-time queryable aggregates.
- Kraken is Uber's P2P Docker registry; not directly relevant but shows you know the infra surface.
- For alerting, the pattern is: rule evaluator queries M3 on a schedule, emits alerts to a routing service (PagerDuty / OpsGenie). Rules live in a git-backed config repo.
- Interviewers sometimes ask about log-based alerting: a Kafka consumer runs saved searches continuously and emits an alert when matches exceed a threshold; useful for errors that are not yet captured as metrics.
- Close with: "SLO is 99.95% ingest availability with < 0.1% loss for non-critical logs; metrics tied to SLOs are durability-first. During observability-system incidents, production apps never block on logging: the agent spills to disk or drops DEBUG-level events."
Scaling Milestones
- Small company: stdout plus grep over SSH. A single Prometheus instance.
- Growing: ELK stack with per-host agents. 7-day retention.
- Large: Kafka ingest, tiered hot/warm storage, Prometheus federation or M3 for metrics.
- Uber-scale: Tail-based sampling, Parquet cold storage, Pinot for real-time OLAP, OpenTelemetry everywhere, per-tenant quotas.
Summary Checklist
- [ ] Agent buffering + tail-based sampling.
- [ ] Kafka as durable backbone.
- [ ] Tiered storage (SSD → HDD → Parquet → Glacier).
- [ ] Query routing across tiers.
- [ ] Cardinality budgets enforced at ingest.
- [ ] Multi-tenancy with quotas.
- [ ] OpenTelemetry for trace/log correlation.
- [ ] Alert rules evaluated from M3.
- [ ] PII redaction at agent.
Key Numbers to Memorize
| Metric | Value |
|---|---|
| Events/sec (peak) | 500M |
| Ingress bandwidth | 250 GB/s |
| Raw daily volume | 21.6 PB |
| Hot daily (compressed) | 5 PB |
| Hot retention (logs) | 7–30 days |
| Cold retention (logs) | 1 year |
| Metrics rolled-up retention | 13 months |
| Ingest p99 | < 1s |
| Query p99 (last hour) | < 2s |
| Query p99 (last 7 days) | < 10s |
| Lossiness budget (non-critical logs) | ≤ 0.1% |
One-Liner You Should Remember
"500M events/sec at peak: agents with tail-based sampling and disk buffering → Kafka → tiered storage (hot SSD, warm HDD, cold Parquet on S3, frozen Glacier) with federated query routing; M3 TSDB with enforced cardinality budgets; OpenTelemetry correlates logs and traces. 99.95% ingest SLO, 5 PB/day hot compressed, 21.6 PB/day raw."