
HLD: Distributed Logging / Metrics Collection ​

Understanding the Problem ​

What is Distributed Logging and Metrics? ​

A platform that collects logs and metrics from tens of thousands of production services emitting hundreds of millions of events per second, stores them cost-efficiently, makes them queryable in near real time, and survives backpressure from hot paths so a production incident doesn't take down the observability system along with everything else. At Uber this is M3 (metrics), Jaeger (tracing), and log aggregation built on Kafka + Pinot/Elastic tiers. Interviewers reach for this problem when they want to see infrastructure maturity: tail-based sampling, cardinality management, tiered storage, and multi-tenant isolation.

Functional Requirements ​

Core (above the line):

  1. Accept log lines and metric samples from every service.
  2. Store with retention (logs: 7–30 days hot, cold archive 1 year; metrics: 13 months rolled-up).
  3. Query: "logs for service X in last hour containing error Y"; "p99 latency for endpoint Z over last 7 days".
  4. Alert on rule-based conditions with < 1 min delay.

Below the line (out of scope):

  • Business analytics — separate data warehouse.
  • Distributed tracing — separate Jaeger stack (shares ingestion patterns).
  • Long-term ML training data — separate pipeline.

Non-Functional Requirements ​

Core:

  1. Scale — 500M events/sec globally at peak.
  2. Latency — ingestion p99 < 1s; query p99 < 2s for last-hour range; 10s for last-7-days.
  3. Durability — bounded loss is acceptable for logs (≤ 0.1%) but not for metrics tied to SLOs.
  4. Cost — storage dominates; must tier aggressively.

Below the line:

  • Sub-second query on cold archive — we accept minutes of thaw.

Capacity Estimation ​

Back-of-envelope on the whiteboard:

  • Ingress: 500M events/sec Ɨ 500 B avg = 250 GB/s raw; ~60 GB/s after agent-side compression.
  • Daily volume: 21.6 PB/day raw. With 4Ɨ compression, ~5 PB/day stored hot. Cold archive to S3-class storage with another ~3Ɨ of cold compression → ~1.7 PB/day cold.
  • Unique time series: 50M active series per region at peak; cardinality explosions must be bounded via budget enforcement at ingest.
  • Hot Elasticsearch: 5 PB/day Ɨ 7 days = 35 PB in the hot tier; roughly 3,500 nodes Ɨ 10 TB SSD each.
  • Cold Parquet archive: ~1.7 PB/day Ɨ 30 days ≈ 50 PB; object storage at ~$0.02/GB-month → ~$1M/month at list price before discounts.
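
As a sanity check, the same arithmetic as a minimal TypeScript sketch; the constants mirror the bullets above, and the prose rounds a couple of the results down.

typescript
// Back-of-envelope check for the figures above (everything is approximate).
const EVENTS_PER_SEC = 500e6;   // 500M events/sec at peak
const AVG_EVENT_BYTES = 500;    // 500 B average event
const SECONDS_PER_DAY = 86_400;

const rawBytesPerSec = EVENTS_PER_SEC * AVG_EVENT_BYTES;    // 250 GB/s
const rawBytesPerDay = rawBytesPerSec * SECONDS_PER_DAY;     // ~21.6 PB/day

const HOT_COMPRESSION = 4;                                   // hot-tier compression ratio
const hotBytesPerDay = rawBytesPerDay / HOT_COMPRESSION;     // ~5.4 PB/day (text rounds to 5)

const COLD_COMPRESSION = 3;                                  // extra ratio when re-encoding to Parquet
const coldBytesPerDay = hotBytesPerDay / COLD_COMPRESSION;   // ~1.8 PB/day (text rounds to ~1.7)

const hotTierBytes = hotBytesPerDay * 7;                     // ~38 PB for 7-day hot retention
const hotNodes = Math.ceil(hotTierBytes / (10 * 1e12));      // ~3,800 nodes at 10 TB SSD each

console.log({
  rawGBps: rawBytesPerSec / 1e9,
  rawPBperDay: rawBytesPerDay / 1e15,
  hotPBperDay: hotBytesPerDay / 1e15,
  coldPBperDay: coldBytesPerDay / 1e15,
  hotTierPB: hotTierBytes / 1e15,
  hotNodes,
});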

The Set Up ​

Core Entities ​

Entity | Description
LogEvent | service, host, level, ts, message, trace_id, attrs{}
MetricSample | metric, tags, value, ts
QueryRequest | text/filter DSL, time range, service scope

API Design ​

Per-host agents push batches over gRPC. Query is REST on top of a DSL similar to LogQL / PromQL.

typescript
// Ingest (internal, per-host agent pushes)
POST /v1/logs/batch
Body: { events: LogEvent[] }

POST /v1/metrics/batch
Body: { samples: MetricSample[] }

// Query
POST /v1/query/logs
Body: { query: string; from: ts; to: ts; service?: string; limit: number }
Response: { events: LogEvent[]; nextCursor? }

POST /v1/query/metric
Body: { promql: string; from: ts; to: ts; step: string }
Response: { series: [{ tags, points: [[ts, value]] }] }
protobuf
service LogIngest {
  rpc StreamEvents(stream LogBatch) returns (IngestAck);
}
service Query {
  rpc QueryLogs(LogQuery) returns (stream LogEvent);
  rpc QueryMetric(MetricQuery) returns (MetricResult);
}

message MetricSample {
  string metric = 1;
  map<string, string> tags = 2;
  double value = 3;
  int64 ts_ms = 4;
}
cpp
// Per-host agent loop: drain the in-memory ring buffer, push a batch over gRPC,
// and spill to the on-disk buffer instead of blocking the app when ingest is slow.
// (SendBatch is a simplified unary stand-in for the StreamEvents streaming RPC above.)
while (true) {
  // Drain up to ~64 KB of events or wait at most 500 ms (both limits are illustrative).
  auto batch = ringBuffer.drain(/*max_bytes=*/64 * 1024,
                                /*deadline=*/std::chrono::milliseconds(500));
  if (batch.empty()) continue;

  grpc::ClientContext ctx;
  LogBatch req;
  for (auto& e : batch) req.add_events()->CopyFrom(e);

  // Backpressure: if the ack fails or times out, keep the batch in the disk buffer
  // so a retry loop can ship it later; never block the application thread.
  IngestAck ack;
  if (!ingest->SendBatch(&ctx, req, &ack).ok()) diskBuffer.spill(batch);
}

High-Level Design ​

App process -> stdout
     |
     v
+---------------+
| Host agent    |  (per node, buffers + samples)
| (Fluentd/     |
|  Vector)      |
+---------------+
     |
     v
+---------------+
| Ingest tier   |
| (gRPC, auth)  |
+---------------+
     |
     v
+---------------+
| Kafka         |  (sharded by service)
+---------------+
   /           \
  v             v
+-----------+  +-----------+
| Logs      |  | Metrics   |
| indexer   |  | TSDB (M3) |
| (Lucene/  |  +-----------+
|  Elastic/ |
|  Loki)    |
+-----------+
  |
  v
Cold archive (S3/HDFS, Parquet, 1yr)

End-to-End Flow ​

  1. Each app writes structured logs to stdout and emits metrics via an SDK. A per-host agent (Fluentd, Vector, or Uber's internal agent) buffers and samples events.
  2. Agent pushes batches over gRPC to the Ingest tier at the nearest region.
  3. Ingest performs auth and writes to Kafka, sharded by service for partition affinity.
  4. Log Indexer consumes, writes to hot storage (Elasticsearch / Loki). Metrics consumer writes to M3 TSDB with downsampling rules.
  5. After 7 days, a tiering job rolls hot data into Parquet in object storage; after 30 days, into Glacier-class cold tier.
  6. Query service federates across hot/warm/cold tiers based on the query's time range.
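
A minimal sketch of step 3 using kafkajs; the broker address is a placeholder, and keying every message by service provides the partition affinity mentioned above.

typescript
import { Kafka, CompressionTypes } from "kafkajs";

// Placeholder client id and broker list; real deployments use the per-service
// topic naming described under "Data Model Detail" below.
const kafka = new Kafka({ clientId: "ingest-tier", brokers: ["kafka-1:9092"] });
const producer = kafka.producer();

export async function publishLogBatch(service: string, events: object[]): Promise<void> {
  await producer.connect();
  await producer.send({
    topic: `logs.${service}`,
    compression: CompressionTypes.GZIP,
    // Keying by service gives partition affinity: one service's events land on a
    // stable set of partitions, so the downstream log indexer sees them in order.
    messages: events.map((e) => ({ key: service, value: JSON.stringify(e) })),
  });
}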

Why These Components ​

  • Agent buffering absorbs upstream outages without blocking apps.
  • Kafka is the durable fan-out backbone; multiple consumer tiers (logs, metrics, tracing) stay isolated.
  • M3 handles metrics with native downsampling and was built for Uber's scale.
  • Parquet on object storage is the cheapest way to keep a long tail searchable.

Data Model Detail ​

  • Hot logs (Elasticsearch / Loki): Indexed by service, level, trace_id; full-text inverted index on message. Daily indices per region, with shard counts sized to keep individual shards in the tens of GB.
  • Warm logs (Parquet on S3): Partitioned by (dt=YYYY-MM-DD, service). Columns: ts, host, level, message, trace_id, attrs (map).
  • Metrics (M3): Stores time series keyed by (metric, tags...) at raw 10s resolution, with 1m, 10m, and 1h rollups generated automatically.
  • Kafka topics: logs.{service} and metrics.{team}, RF=3, retention 24h (logs) / 7d (metrics).
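
A small sketch of how the warm-tier layout in the second bullet might be keyed; the bucket name is a placeholder.

typescript
// Build the object-store prefix for a warm-tier Parquet partition.
// Partitioning by (dt, service) lets Presto/Trino prune both the date range and
// the service filter before reading any column data.
function warmPartitionPrefix(bucket: string, service: string, ts: Date): string {
  const dt = ts.toISOString().slice(0, 10); // dt=YYYY-MM-DD
  return `s3://${bucket}/logs/dt=${dt}/service=${service}/`;
}

// e.g. "s3://observability-archive/logs/dt=2024-06-01/service=payments/"
console.log(warmPartitionPrefix("observability-archive", "payments", new Date()));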

Capacity Walkthrough ​

  • 500M events/sec Ɨ 500 B = 250 GB/s raw; ~60 GB/s after 4Ɨ compression at the agent.
  • Kafka cluster: 300 brokers Ɨ 200 MB/s of ingest each covers 60 GB/s of compressed leader writes; RF=3 roughly triples intra-cluster replication traffic, so provision NVMe and network headroom accordingly.
  • Hot Elasticsearch: 5 PB/day Ɨ 7 days retention = 35 PB; roughly 3,500 nodes Ɨ 10 TB SSD each.
  • Warm/cold Parquet archive: ~1.7 PB/day Ɨ 30 days ≈ 50 PB; Glacier-class for anything older at ~$0.004/GB-month.
  • Query: p99 < 2s for last-hour via hot tier; 10s for last-7-days via warm tier.

Potential Deep Dives ​

1) How do we handle ingest backpressure without blocking production? ​

An observability system that takes down production during an incident is worse than useless.

Bad Solution — Synchronous push ​

  • Approach: Agents push synchronously; when the ingest tier is slow, agents block the app.
  • Challenges: Catastrophic during incidents exactly when logs matter most.

Good Solution — Per-host disk buffer with async push ​

  • Approach: Ring buffer in memory, spill to disk on ingest lag.
  • Challenges: Survives short outages but not long ones. Disk fills up.

Great Solution — Tail-based sampling with importance weighting ​

  • Approach:
    • Error logs, slow requests, and traced requests always pass.
    • For steady-state noise (INFO/DEBUG), apply head-based sampling at the agent (1/100) that can be dynamically adjusted from the control plane.
    • During incidents, rate of important signals is preserved while total bytes drop.
    • Backpressure signal flows back via gRPC flow control; agent downgrades sampling rate rather than blocking.
  • Challenges: Sampling configuration drift across agents — use a central control plane with versioned configs and staged rollouts.
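
A minimal sketch of the agent-side decision in the approach above; the slow-request threshold, the 1/100 default, and the backpressure hook are illustrative.

typescript
type AgentLogEvent = {
  level: "DEBUG" | "INFO" | "WARN" | "ERROR";
  durationMs?: number;
  traceId?: string;
};

// Sampling rate for low-importance events. In practice this is pushed down from
// the central control plane and lowered further when ingest signals backpressure.
let noiseSampleRate = 1 / 100;

export function onBackpressure(): void {
  // Degrade instead of blocking: keep errors, shed more INFO/DEBUG.
  noiseSampleRate = Math.max(noiseSampleRate / 10, 1 / 10_000);
}

export function shouldShip(e: AgentLogEvent): boolean {
  if (e.level === "ERROR" || e.level === "WARN") return true; // errors always pass
  if ((e.durationMs ?? 0) > 500) return true;                 // slow requests always pass
  if (e.traceId !== undefined) return true;                   // traced requests keep their logs
  return Math.random() < noiseSampleRate;                     // steady-state noise is sampled
}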

2) How do we query across PB-scale tiers? ​

No single engine can hold a year of logs cost-effectively.

Bad Solution — Everything in Elasticsearch ​

  • Approach: Single Elasticsearch cluster holds all data.
  • Challenges: Fine for a day or two of data, but at PB scale the SSD-backed cluster cost becomes untenable.

Good Solution — Last 24h hot, else fail ​

  • Approach: Only index the last 24 hours.
  • Challenges: Retro debugging and incident postmortems after 24h become impossible.

Great Solution — Tiered storage with transparent query routing ​

  • Approach:
    • 0–24h: Elasticsearch / OpenSearch / Loki on SSD. Full-text search, sub-second.
    • 1–7d: Warm tier, same engines on HDD with frozen indices (still queryable, slower).
    • 7–30d: Parquet files in object storage, queryable via Presto/Trino with column-pruning; query latency 10–60s.
    • 30d–1yr: Glacier-class, requires thaw (hours).
    • Query router inspects time range and federates across tiers; merges results with dedup on event id.
  • Challenges: Federated query latency spikes when a single query spans multiple tiers. Cap cross-tier queries to last-7-days by default; require explicit flag for deeper scans.
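
A sketch of the router's tier selection and merge step; the tier boundaries mirror the list above, and the per-tier query clients are left abstract.

typescript
type Tier = "hot" | "warm" | "cold" | "frozen";

const DAY_MS = 24 * 3600 * 1000;

// Map a [fromMs, toMs] range onto the storage tiers it touches.
export function tiersFor(fromMs: number, toMs: number, nowMs = Date.now()): Tier[] {
  const boundaries: Array<[Tier, number]> = [
    ["hot", 1 * DAY_MS],      // 0-24h: Elasticsearch / Loki on SSD
    ["warm", 7 * DAY_MS],     // 1-7d: frozen indices on HDD
    ["cold", 30 * DAY_MS],    // 7-30d: Parquet via Presto/Trino
    ["frozen", 365 * DAY_MS], // 30d-1yr: Glacier-class, needs thaw
  ];
  const tiers: Tier[] = [];
  let prevAgeMs = 0;
  for (const [tier, maxAgeMs] of boundaries) {
    const tierFrom = nowMs - maxAgeMs;
    const tierTo = nowMs - prevAgeMs;
    if (fromMs < tierTo && toMs > tierFrom) tiers.push(tier);
    prevAgeMs = maxAgeMs;
  }
  return tiers;
}

// Fan out to each tier, then dedup on event id, since agents + Kafka are at-least-once.
export async function federatedQuery(
  range: { fromMs: number; toMs: number },
  runOnTier: (tier: Tier) => Promise<Array<{ id: string }>>,
) {
  const perTier = await Promise.all(tiersFor(range.fromMs, range.toMs).map(runOnTier));
  const seen = new Map<string, { id: string }>();
  for (const event of perTier.flat()) if (!seen.has(event.id)) seen.set(event.id, event);
  return [...seen.values()];
}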

2.5) How do we handle log compression and parsing cost? ​

250 GB/s of raw logs requires smart compression end-to-end.

Good Solution — GZIP on agent, decompress at ingest ​

  • Approach: Agent compresses batches; ingest tier decompresses and indexes.
  • Challenges: CPU-heavy; tail latency during ingest spikes.

Great Solution — Zstandard + columnar batching ​

  • Approach:
    • Agent batches logs into columnar layout (e.g., per-field arrays) and compresses with Zstandard (compression ratio ~5-6x vs text).
    • Ingest tier passes columnar batches through to Kafka without re-encoding; downstream consumers parse lazily.
    • For Parquet archives, direct conversion from the columnar batch keeps CPU low.
  • Challenges: Columnar logs are not grep-friendly at the agent; if on-host debugging matters, keep a rotating plain-text file too.
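
A sketch of the row-to-column pivot at the agent; the compression call is left as a parameter because the concrete Zstandard binding varies, and JSON is used here only for brevity (a real agent uses a binary columnar encoding).

typescript
type AgentLogRow = { ts: number; level: string; message: string; traceId: string };

// Pivot a batch of row-oriented events into per-field arrays. Repeated values within
// a column (levels, service names, templated messages) compress far better than
// the same values interleaved as free text.
export function toColumnarBatch(events: AgentLogRow[]) {
  return {
    ts: events.map((e) => e.ts),
    level: events.map((e) => e.level),
    message: events.map((e) => e.message),
    traceId: events.map((e) => e.traceId),
  };
}

// The zstd binding is injected; this signature is a placeholder, not a specific library's API.
export function compressBatch(
  zstdCompress: (buf: Buffer) => Buffer,
  events: AgentLogRow[],
): Buffer {
  const columnar = Buffer.from(JSON.stringify(toColumnarBatch(events)));
  return zstdCompress(columnar);
}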

3) How do we control metric cardinality? ​

High-cardinality tags (user_id, request_id) explode the TSDB.

Bad Solution — Allow any tag ​

  • Approach: Teams can attach any tags.
  • Challenges: High-cardinality tags blow up memory; queries stall; the TSDB falls over.

Good Solution — Documentation + discipline ​

  • Approach: Guidelines for cardinality limits.
  • Challenges: Guidelines are ignored under deadline pressure.

Great Solution — Enforced cardinality budgets at ingest + pre-aggregation ​

  • Approach:
    • Per-metric tag set is registered; ingest layer rejects unregistered tags.
    • A cardinality watchdog computes unique series/min and alerts at 80% budget.
    • Pre-aggregate common rollups (p50/p95/p99 by service, by region) at ingest so queries read rolled-up series rather than raw samples.
    • M3 (Uber's TSDB) has native downsampling — mention it.
  • Challenges: New tags need a registration flow — reviewers must approve before rollout. Automate this via a self-service portal with guard rails.
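
A sketch of budget enforcement at ingest; the registered tag set and the 50,000-series budget are illustrative values.

typescript
// Registered tag keys per metric; ingest rejects samples carrying anything else.
const registeredTags = new Map<string, Set<string>>([
  ["request_duration", new Set(["service", "region", "endpoint", "status"])],
]);

const SERIES_BUDGET = 50_000;                        // illustrative per-metric budget
const activeSeries = new Map<string, Set<string>>(); // metric -> unique series keys this window

export function admitSample(metric: string, tags: Record<string, string>): boolean {
  const allowed = registeredTags.get(metric);
  if (!allowed) return false;                                       // unregistered metric
  if (Object.keys(tags).some((k) => !allowed.has(k))) return false; // unregistered tag key

  // Watchdog bookkeeping: series key = sorted tag pairs for this metric.
  const seriesKey = Object.keys(tags).sort().map((k) => `${k}=${tags[k]}`).join(",");
  const seen = activeSeries.get(metric) ?? new Set<string>();
  seen.add(seriesKey);
  activeSeries.set(metric, seen);

  if (seen.size > 0.8 * SERIES_BUDGET) warnOwningTeam(metric, seen.size); // alert at 80%
  return seen.size <= SERIES_BUDGET;                                      // hard stop at budget
}

function warnOwningTeam(metric: string, count: number): void {
  console.warn(`cardinality warning: ${metric} at ${count}/${SERIES_BUDGET} unique series`);
}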

4) How do we correlate logs across services for a single user request? ​

Distributed systems span many services; a single user-visible error may originate in a service five hops downstream.

Good Solution — Request IDs in log context ​

  • Approach: Every log line includes a request_id / trace_id set at the edge.
  • Challenges: Works once the convention is enforced; relies on every service respecting it.

Great Solution — OpenTelemetry trace context propagation + log/trace correlation ​

  • Approach:
    • All services propagate W3C trace context headers.
    • Jaeger collects traces; each log line carries trace_id and span_id.
    • UI joins logs ↔ traces on trace_id for one-click drill-down.
  • Challenges: Must instrument every SDK; rolling out to legacy services is a long tail. Sampling strategy must be coordinated (tail-based on traces mirrors log sampling).
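
A sketch of stamping trace context onto each structured log line using the @opentelemetry/api package; the log line shape itself is illustrative.

typescript
import { trace } from "@opentelemetry/api";

// Attach the active span's context to every structured log line so the UI can
// join logs and traces on trace_id.
export function logWithTraceContext(
  level: string,
  message: string,
  attrs: Record<string, unknown> = {},
): void {
  const spanContext = trace.getActiveSpan()?.spanContext();
  const line = {
    ts: Date.now(),
    level,
    message,
    trace_id: spanContext?.traceId,
    span_id: spanContext?.spanId,
    ...attrs,
  };
  // stdout is the hand-off point to the per-host agent.
  process.stdout.write(JSON.stringify(line) + "\n");
}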

4.5) How do we structure logs to make them useful? ​

Unstructured free-text logs scale badly. Structure scales.

Good Solution — Key-value pairs in log lines ​

  • Approach: level=ERROR service=payments txn=abc msg="declined".
  • Challenges: Parsing regex gets hairy; schema drift.

Great Solution — Structured logs with enforced schemas + type registry ​

  • Approach:
    • SDKs emit JSON (or protobuf) logs with well-typed fields.
    • Each service registers its log schema with a central registry; ingest rejects logs violating the schema (after a warning period).
    • Queries use schema to autocomplete and validate.
  • Challenges: Legacy services need migration. Provide compatibility shims that wrap unstructured lines and label them service_unknown until owners migrate.
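
A sketch of the ingest-side schema check; the example schema entry and its field types are illustrative, not a real registry format.

typescript
// Illustrative shape of a registry entry: field name -> expected primitive type.
type FieldType = "string" | "number" | "boolean";
const paymentsLogSchema: Record<string, FieldType> = {
  level: "string",
  message: "string",
  txn: "string",
  amount_cents: "number",
};

// Ingest-side check: reject (or, during the warning period, just tag) lines that
// violate the schema registered for their service.
export function schemaViolations(
  line: Record<string, unknown>,
  schema: Record<string, FieldType>,
): string[] {
  const violations: string[] = [];
  for (const [field, expected] of Object.entries(schema)) {
    if (!(field in line)) violations.push(`missing field ${field}`);
    else if (typeof line[field] !== expected) violations.push(`wrong type for ${field}`);
  }
  return violations;
}

// Usage: schemaViolations(parsedLine, paymentsLogSchema) returning [] means the line is accepted.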

5) How do we support multi-tenancy fairly? ​

One noisy team should not bury another team's logs.

Good Solution — Per-team Kafka quotas ​

  • Approach: Apply producer quotas per team at Kafka.
  • Challenges: Kafka quotas are coarse-grained; once a team exhausts quota, their logs drop without notification.

Great Solution — Per-tenant sharding + dynamic quotas with noisy-neighbor detection ​

  • Approach:
    • Dedicated Kafka topics per top-10 producers; shared pool for the long tail.
    • Ingest tier tracks bytes/sec per team; exceeding budgets triggers automatic sampling.
    • Slack alerts to team owners when their throughput trends toward the budget.
  • Challenges: Rebalancing shard placement as teams grow — use Cadence workflows for "migrate team X from shared to dedicated pool".
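
A sketch of the per-team accounting that drives automatic sampling; the budgets and the fixed one-second window are illustrative simplifications.

typescript
// Per-team bytes/sec budgets at the ingest tier (illustrative values).
const budgets = new Map<string, number>([
  ["payments", 50e6],
  ["maps", 200e6],
]);
const usage = new Map<string, number>(); // bytes seen in the current 1s window

export function recordAndDecideSampling(team: string, batchBytes: number): number {
  const used = (usage.get(team) ?? 0) + batchBytes;
  usage.set(team, used);
  const budget = budgets.get(team) ?? 10e6; // default budget for long-tail teams

  if (used > budget) return 0.1;                             // over budget: sample down to 10%
  if (used > 0.8 * budget) notifyOwners(team, used, budget); // trending toward the budget
  return 1.0;                                                // within budget: ship everything
}

function notifyOwners(team: string, used: number, budget: number): void {
  console.warn(`team ${team} at ${Math.round((100 * used) / budget)}% of ingest budget`);
}

// Reset the window each second; a real implementation uses a sliding window or token bucket.
setInterval(() => usage.clear(), 1000);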

Rapid-Fire Q&A Anticipations ​

  • "What's the primary driver of cost?" Storage — both hot SSD and egress bandwidth for cross-region replication.
  • "How do you handle team onboarding?" Self-service: register service, get Kafka topics, dashboards auto-generated from metric naming conventions.
  • "What's the default retention?" 7 days hot, 30 days warm, 1 year cold. Teams can request longer retention with a cost charge-back.
  • "Cross-region replication for logs?" Typically no — each region keeps its own. For incident investigation, federated query crosses regions on demand.
  • "How do we alert on high error rate for a newly deployed service?" Alert rules parameterized by service tag; new services inherit default rules until team writes custom ones.

Alternatives Considered ​

  • ELK stack vs Loki vs custom: ELK is the default; Loki is cheaper because it indexes only labels, not full text. Uber mixes approaches by tier.
  • Prometheus vs M3: Prometheus is a single-binary TSDB; M3 is horizontally scalable and Uber-origin. For Uber scale, M3.
  • Push vs pull metrics: Prometheus pulls; M3 accepts both. Push works better for ephemeral workloads (serverless, batch).
  • ClickHouse vs Elasticsearch vs Pinot: ClickHouse is great for analytical log queries; Elasticsearch dominates full-text search; Pinot is real-time OLAP. Uber uses all three at different layers.
  • OpenTelemetry vs proprietary SDKs: OTel is the vendor-neutral standard; adopt it for new services, migrate legacy.

Frequently Asked Follow-ups ​

  • "How do you detect a cardinality runaway before it kills the TSDB?" — Watchdog on unique series per metric; alert at 80% of budget; auto-drop tags if sustained.
  • "What's the cost per GB?" — Hot SSD: ~$0.10/GB-month at cloud prices. Warm HDD: ~$0.03. Cold S3: ~$0.02. Glacier: ~$0.004. Tiered retention reduces 1-year TCO by ~20x.
  • "How do you search cold logs?" — Presto/Trino on Parquet in S3 with column pruning and partition filtering. Typical 30-60s query latency for multi-day scans.
  • "What about logs with PII?" — Redact at agent time (regex rules on known PII fields); enforce at ingest (reject records with structured PII).
  • "What happens in a multi-region incident?" — Each region is autonomous; queries fall back to local data. Cross-region queries run via federated query layer with a degraded SLA.

Visual Aids to Draw ​

  • Ingestion pipeline: app → agent → gRPC → Kafka → indexers/TSDB → tiered storage.
  • Tiered storage funnel: hot SSD → warm HDD → Parquet in S3 → Glacier; retention arrows and cost tags.
  • Cardinality explosion chart: unique series growing exponentially when a high-cardinality tag is added.
  • Query federation flow: query router dispatches subqueries to hot/warm/cold tiers and merges.
  • Alerting flow: M3 → rule evaluator → routing service → PagerDuty, with dedup for flapping alerts.

What's Expected at Each Level ​

Mid-level (L4) ​

  • Agent → Kafka → storage. Tiered retention. Basic sampling.
  • Understands the scale but may miss cardinality subtleties.

Senior (L5 / L5A) ​

  • Backpressure with importance weighting.
  • Metric cardinality management.
  • Tiered query plan with concrete latency expectations.
  • Back-of-envelope on storage costs across tiers.

Staff+ (L6) ​

  • Cost curves per tier and projected 3-year TCO.
  • SLOs for ingest and query separately.
  • Multi-tenancy isolation — one noisy team shouldn't bury another's logs.
  • Cross-region replication for DR; query federation when a region is degraded.

Common Pitfalls ​

  • Synchronous agent push. Catastrophic during incidents.
  • Ignoring cardinality. The #1 way to kill a TSDB.
  • One-tier storage. PB-scale retention needs tiered hot/warm/cold.
  • No multi-tenancy isolation. One bad producer can bury everyone's signals.
  • Pretending exactly-once. Agents + Kafka give at-least-once; dedup at query time with event IDs.

Walkthrough: Interview Dialogue Example ​

Interviewer: "I'm debugging a p99 latency spike in service Z. Walk me through what your system provides."

You should answer:

  1. I'd start with the p99 time series for service Z in M3, scoped to the affected region. Query: histogram_quantile(0.99, rate(request_duration_bucket{service="Z", region="us-east"}[5m])). Served from hot tier, returns in ~500ms.
  2. Overlay error rate and saturation (CPU, memory). The rule-based alert that paged me already shows these on the same dashboard.
  3. Identify the affected time window and drill into traces: sample Jaeger traces where service=Z AND duration > 500ms. Traces join with logs via trace_id.
  4. In the trace, I see service Z calls service Y downstream. Go to service Y's logs for that trace_id in the same window. Hot-tier log query in Elasticsearch returns in ~1s.
  5. If I need to check historical baseline (last week's p99 for comparison), that's in the warm Parquet tier — query via Presto, 10s latency acceptable for a one-off.
  6. If I suspect a recently deployed change, diff the last two weeks of error rate from M3 downsampled series. All within 30 seconds of starting to debug.

What If They Pivot Mid-Interview? ​

  • "Add distributed tracing." — Same ingestion pattern; agents propagate trace context; Jaeger collectors consume from Kafka; traces join logs on trace_id.
  • "Design alert rule evaluation." — Scheduled Flink job runs PromQL on M3 every 30s; matches go to routing service (PagerDuty). Rules in a git-managed config with versioned deployment.
  • "What if a team wants to replay logs after a bug was fixed?" — Provide a "reingest from cold" tool that pulls a time range from Parquet back into hot Elasticsearch for search; bill the requesting team.
  • "Security — who can see production logs?" — RBAC on query; team-owned namespaces; audit trail of queries. PII fields masked based on caller role.

Reliability and Observability ​

  • SLO: 99.95% ingest availability; < 0.1% loss for non-critical logs; metrics tied to SLOs are durability-first.
  • Failure modes:
    • Ingest tier overloaded → agents spill to disk, sample DEBUG events, backpressure into gRPC flow control.
    • Kafka broker failure → replication factor 3 covers it; consumer groups rebalance automatically.
    • Query engine overload → tiered query routing with per-tier concurrency caps; long-range queries queue behind short ones.
  • Deployment: Canary agent versions at 1% of hosts; roll out to regions one at a time with a 24h bake.
  • Monitoring: ingest_lag_seconds, events_dropped_total, cardinality_current, query_p99_by_tier. Alert on > 1% loss or > 60s ingest lag.
  • Runbook: On cardinality explosion, freeze the offending metric at the ingest tier and page the owning team. On query engine overload, reject long-range queries with a retry-after hint.

Uber-Specific Notes ​

  • M3 is Uber's open-source TSDB — name-drop it and mention Prometheus wire compatibility.
  • Jaeger for tracing is Uber-origin; traces are a separate pipeline but share ingestion patterns.
  • Uber's internal analytics uses a mix of Kafka, Hive, Spark, and Pinot (Uber co-created Pinot for real-time OLAP). Pinot is the right mention for real-time queryable aggregates.
  • Kraken is Uber's P2P Docker registry — not directly relevant but shows you know the infra surface.
  • For alerting, the pattern is: rule evaluator queries M3 on a schedule, emits alerts to a routing service (PagerDuty / OpsGenie). Rules live in a git-backed config repo.
  • Interviewers sometimes ask about log-based alerting: a Kafka consumer runs saved searches continuously and emits an alert when matches exceed a threshold — useful for errors that are not yet captured as metrics.
  • Close with: "SLO is 99.95% ingest availability with < 0.1% loss for non-critical logs; metrics tied to SLOs are durability-first. During observability-system incidents, production apps never block on logging — the agent spills to disk or drops DEBUG-level events."

Scaling Milestones ​

  • Small company: stdout → SSH grep. Prometheus in a single instance.
  • Growing: ELK stack with per-host agents. 7-day retention.
  • Large: Kafka ingest, tiered hot/warm storage, Prometheus federation or M3 for metrics.
  • Uber-scale: Tail-based sampling, Parquet cold storage, Pinot for real-time OLAP, OpenTelemetry everywhere, per-tenant quotas.

Summary Checklist ​

  • [ ] Agent buffering + tail-based sampling.
  • [ ] Kafka as durable backbone.
  • [ ] Tiered storage (SSD → HDD → Parquet → Glacier).
  • [ ] Query routing across tiers.
  • [ ] Cardinality budgets enforced at ingest.
  • [ ] Multi-tenancy with quotas.
  • [ ] OpenTelemetry for trace/log correlation.
  • [ ] Alert rules evaluated from M3.
  • [ ] PII redaction at agent.

Key Numbers to Memorize ​

Metric | Value
Events/sec (peak) | 500M
Ingress bandwidth | 250 GB/s
Raw daily volume | 21.6 PB
Hot daily (compressed) | 5 PB
Hot retention (logs) | 7–30 days
Cold retention (logs) | 1 year
Metrics rolled-up retention | 13 months
Ingest p99 | < 1s
Query p99 (last hour) | < 2s
Query p99 (last 7 days) | < 10s
Lossiness budget (non-critical logs) | ≤ 0.1%

One-Liner You Should Remember ​

"500M events/sec at peak: agents with tail-based sampling and disk buffering → Kafka → tiered storage (hot SSD, warm HDD, cold Parquet on S3, frozen Glacier) with federated query routing; M3 TSDB with enforced cardinality budgets; OpenTelemetry correlates logs and traces. 99.95% ingest SLO, 5 PB/day hot compressed, 21.6 PB/day raw."
