HLD: Top-K Streaming (Top-100 Searches Per Day)
L4 scoping note. This is THE classic Google-style question. The interviewer wants to hear you reason about approximation vs exactness, streaming vs batch, and distributed aggregation -- not memorize algorithms. Pick one approach as your primary and explicitly call out the trade-offs against the alternatives. 1B queries/day is real but not astronomical -- it's ~11.5K QPS. A single machine can sort a day's worth of queries given enough memory; the interesting question is how to do it online and in a distributed way.
Understanding the Problem
What is Top-K?
Given a stream of events (search queries, hashtags, product views, error messages -- anything with a "key" field), return the K most frequent keys over a time window. Google's "trending searches," Twitter's trending hashtags, and YouTube's trending videos are all top-K problems. The interview value: it forces you to reason about memory, accuracy, and distribution in a very bounded problem.
Functional Requirements
Core (above the line):
- Compute top-100 unique search queries per day -- given the raw query stream, output the top-100 at day's end.
- Support near-real-time updates -- during the day, expose the current top-100 (can be approximate).
- Historical queries -- "what were the top-100 on 2026-03-15?" should return in milliseconds.
Below the line (out of scope):
- Multi-dimensional top-K (by country, by device, by user segment) -- doable but explodes the design surface
- Sub-second freshness on the daily top-K (minute-level is fine)
- Personalized trending (that's a different problem)
- Spam filtering (assume input is clean)
Non-Functional Requirements
Core:
- Scale -- 1B queries/day = ~11.5K QPS average, ~30K QPS peak. Cardinality of unique queries in a day is roughly 100M-500M (most queries are long-tail singletons).
- Accuracy -- exact for the daily batch output. Approximate is acceptable for real-time.
- Low query latency -- retrieving the current top-K (cached) < 100 ms.
- Durability -- raw query data should be retained at least 30 days for reprocessing / debugging.
Below the line:
- Strict exactly-once ingestion (we will accept at-least-once and tolerate small duplication for real-time; the daily batch can dedupe)
- Global consistency on real-time top-K (eventual is fine)
L4 sanity check: 11.5K QPS is moderate. A well-tuned system on a handful of machines handles ingestion. The tricky part is the memory + merge story for counting 100M+ distinct keys.
The Set Up
Core Entities
| Entity | Description |
|---|---|
| QueryEvent | A single search event: { query, timestamp, userId, country } |
| CountEntry | (query, count) -- a single row in the aggregate |
| TopKSnapshot | The top-K list for a given time window: { windowStart, windowEnd, entries: [...] } |
The API
Two surfaces: ingestion (internal) and query (external).
Ingestion (internal -- pushed to Kafka):
Topic: search_events
Partition key: hash(query) mod N (discussed below)
Message:
```json
{
  "query": "llm agents",
  "timestamp": 1714500000,
  "userId": "u_123",
  "country": "US"
}
```
Query API (external):
`GET /api/topk?window=day&date=2026-04-18&k=100`

Response: `200 OK`

```json
{
  "window": "day",
  "date": "2026-04-18",
  "entries": [
    { "query": "llm agents", "count": 14203921 },
    { "query": "taylor swift concert", "count": 9120103 },
    ...
  ],
  "computedAt": "2026-04-19T00:05:00Z",
  "approximate": false
}
```

`GET /api/topk?window=live&k=100`

Response: `200 OK`

```json
{
  "window": "live",
  "entries": [ ... ],
  "computedAt": "2026-04-19T14:32:00Z",
  "approximate": true
}
```

Two kinds of top-K: live (approximate, last-N-minutes rolling) and day (exact, yesterday's batch).
High-Level Design
Two parallel pipelines: a streaming layer for live, a batch layer for daily exact. This is the classic Lambda Architecture (or Kappa if you want to use streaming for both).
```
                                               +--> [Stream aggregator] --> [Redis: live top-K]
                                               |
[Search frontend] --> [Kafka: search_events] --+
                                               |
                                               +--> [Daily batch job] --> [TopK store (Bigtable/Postgres)]

[Client] --> [Query API] --> reads from Redis (live) or TopK store (historical)
```
Flow 1: Ingestion
- Search frontend publishes every query event to the Kafka topic `search_events`.
- Kafka retains events for 7 days (for replay / reprocessing).
- Two independent consumer groups: one for the streaming aggregator, one for the batch pipeline's checkpoint.
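A minimal producer sketch using the standard Kafka Java client -- the broker address and the JSON string payload are assumptions. The point is that keying the record by the query string gives hash-of-key partitioning (effectively `hash(query) mod N`), so all events for one query land on the same partition:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SearchEventPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String event = "{\"query\":\"llm agents\",\"timestamp\":1714500000,"
                         + "\"userId\":\"u_123\",\"country\":\"US\"}";
            // Key = query string: the default partitioner hashes the key,
            // so every event for "llm agents" goes to the same partition.
            producer.send(new ProducerRecord<>("search_events", "llm agents", event));
        }
    }
}
```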
Flow 2: Live top-K (approximate)
- Stream aggregator consumes from Kafka.
- Maintains an in-memory Count-Min Sketch (approximate counts) + min-heap of top-K candidates.
- Every 30 seconds, flushes the current top-K snapshot to Redis under key `topk:live`.
- Query API reads from Redis -- sub-millisecond.
Flow 3: Daily exact top-K (batch)
- At 00:05 daily, a batch job (Spark / Dataflow / MapReduce) reads all events from Kafka for the previous day (via Kafka's time-based offset lookup) or from a long-term archive (GCS / S3).
- Partitions the data by `hash(query) mod N`. Each partition computes local counts via HashMap.
- Each partition emits its local top-K (say top-10000, to be safe for merging).
- A final reducer merges all partition top-K lists and extracts the global top-100.
- Writes to `topk_daily(date, rank, query, count)` in a persistent store (Bigtable or Postgres). A sketch of this job follows.
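A sketch of the batch job using Spark's Dataset API -- the archive path, JDBC target, and table name are assumptions. The `groupBy` shuffle is hash-partitioned by query, so each count is exact:

```java
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.desc;

public class DailyTopKJob {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("daily-topk").getOrCreate();

        // One day's events from the Parquet archive (path is an assumption).
        Dataset<Row> events = spark.read().parquet("gs://search-archive/date=2026-04-18/");

        Dataset<Row> top100 = events
                .groupBy("query")        // shuffle hash-partitions by query -> exact counts
                .count()
                .orderBy(desc("count"))
                .limit(100);

        // Overwrite the day's rows so re-runs are idempotent.
        Properties props = new Properties(); // credentials omitted
        top100.write().mode("overwrite")
              .jdbc("jdbc:postgresql://db/topk", "topk_daily", props);
    }
}
```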
Flow 4: Query
- Client hits `GET /api/topk`.
- Query API:
  - `window=live`: read `topk:live` from Redis.
  - `window=day`: read from `topk_daily` in the DB.
- Return JSON.
Potential Deep Dives
1) How do we count 1B events/day with 100M+ unique keys?
This is the core algorithmic question. Walk through the evolution.
Bad Solution: HashMap + sort at query time
- Approach: Dump all events into a giant HashMap<query, count>. At query time, sort by count, take top-K. O(N log N) in time, O(U) in memory where U is unique keys.
- Challenges: 100M unique keys * (50 bytes key + 8 bytes count + overhead) = 10+ GB per day. Fits on one beefy machine but doesn't scale if the stream grows. Sorting 100M entries is several seconds. Not real-time.
Good Solution: HashMap + min-heap of size K
- Approach: Same HashMap for counts, but maintain a min-heap of size K (K=100). When a count changes, update the heap only if the new count exceeds the heap's min. O(N log K) overall; much faster than sorting the whole thing.
- Challenges: Still O(U) memory for the full HashMap. Works for a day's worth on one machine but not at higher scales.
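A common simplification is to run the heap selection once over the finished count map at day's end; a minimal sketch (method name and the `Map<String, Long>` shape are assumptions):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class ExactTopK {
    // O(U log K) selection: scan the counts once, keep a size-K min-heap.
    static List<Map.Entry<String, Long>> topK(Map<String, Long> counts, int k) {
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>(Map.Entry.comparingByValue()); // min at the top
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            if (heap.size() < k) {
                heap.offer(e);
            } else if (e.getValue() > heap.peek().getValue()) {
                heap.poll();   // evict the current minimum
                heap.offer(e);
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return result;
    }
}
```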
Great Solution: Count-Min Sketch + min-heap (for streaming / live)
- Approach: Use a Count-Min Sketch (CMS) -- an array of counters with multiple hash functions -- to estimate counts with bounded error. Maintain a min-heap of the top-K candidates observed so far. CMS memory is fixed (well under 1 MB for the parameters below); it does NOT grow with cardinality.
- Parameters: width `w = ceil(e / epsilon)`, depth `d = ceil(ln(1 / delta))`. For epsilon = 0.0001, delta = 0.001: `w ~= 27,183`, `d = 7`. Total counters ~190K; at 4 bytes each, ~760 KB. Tiny.
- Heap management: on every event, increment the CMS and look up `estimated_count(query)`. If the heap has fewer than K items, push. Else, if `estimated_count > heap.min`, push and pop.
- Challenges: CMS over-estimates (never under) -- hash collisions inflate counts. The heap may contain items that are NOT actually top-K due to over-estimation of a "heavy hitter" collision. In practice, with proper sizing, error is small and bounded. Good for live/approximate. NOT suitable for the exact daily batch.
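For reference, the standard CMS guarantee behind these parameters (with N the total number of events counted so far):

$$
\hat{f}(q) \ge f(q), \qquad \Pr\bigl[\,\hat{f}(q) \le f(q) + \varepsilon N\,\bigr] \ge 1 - \delta, \qquad w = \lceil e/\varepsilon \rceil,\quad d = \lceil \ln(1/\delta) \rceil
$$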
```java
import java.util.Random;

class CountMinSketch {
    private final int[][] counters;
    private final int width, depth;
    private final long[] seeds;

    public CountMinSketch(int width, int depth) {
        this.width = width;
        this.depth = depth;
        this.counters = new int[depth][width];
        this.seeds = new long[depth];
        Random r = new Random(42);
        for (int i = 0; i < depth; i++) seeds[i] = r.nextLong();
    }

    public void increment(String key) {
        for (int i = 0; i < depth; i++) {
            counters[i][index(key, i)]++;
        }
    }

    public int estimate(String key) {
        int min = Integer.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, counters[i][index(key, i)]);
        }
        return min;
    }

    private int index(String key, int row) {
        // Mask to non-negative before taking the modulus.
        return (int) ((hash(key, seeds[row]) & 0x7FFFFFFFFFFFFFFFL) % width);
    }

    // Seeded 64-bit mixer (stand-in for MurmurHash; any well-mixed seeded hash works).
    private long hash(String key, long seed) {
        long h = key.hashCode() ^ seed;
        h ^= h >>> 33; h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33; h *= 0xc4ceb9fe1a85ec53L;
        return h ^ (h >>> 33);
    }
}
```
```java
import java.util.Comparator;
import java.util.PriorityQueue;

class TopKTracker {
    record Entry(String query, int count) {}

    private final CountMinSketch cms;
    private final PriorityQueue<Entry> heap; // min-heap ordered by estimated count
    private final int k;

    TopKTracker(CountMinSketch cms, int k) {
        this.cms = cms;
        this.k = k;
        this.heap = new PriorityQueue<>(Comparator.comparingInt(Entry::count));
    }

    void observe(String query) {
        cms.increment(query);
        int est = cms.estimate(query);
        // Simplification: a query already in the heap gets a second (stale) entry;
        // production code keeps a query -> entry index and updates in place.
        if (heap.size() < k) {
            heap.offer(new Entry(query, est));
        } else if (est > heap.peek().count()) {
            heap.poll();
            heap.offer(new Entry(query, est));
        }
    }
}
```
2) Distributed top-K: how do we partition?
Bad Solution: Round-robin partitioning
- Approach: Events distributed round-robin across N workers. Each worker maintains its own top-K. Merge at the end.
- Challenges: Broken. A query that appears 1M times is scattered across N workers with 1M/N count each. No worker's local top-K reflects the global popularity correctly -- heavy hitters get diluted below the per-worker threshold and vanish.
Good Solution: Hash partitioning by query key
- Approach: `partition = hash(query) mod N`. All events for a given query go to the same worker, so that worker sees the query's TRUE count. Each worker computes its local top-K. At merge time, take the union and pick the global top-K.
- Why it works: since each query lives entirely on one partition, its count is exact within that partition. The global top-K is a subset of the union of per-partition top-K lists (if a query is in the global top-100, it's in its partition's top-100).
- Safety: ask each partition for top-M where M > K (e.g., top-10000). Strictly, with exact local counts a global-top-100 item is guaranteed to be in its partition's top-100; the slack guards against approximate or windowed local counts. Even M = 10*K is plenty; top-10000 (M = 100*K) is overkill-safe.
- Challenges: Hot partitions if one query is extremely popular (10% of all traffic is one trending term). Mitigate with further sub-partitioning or accept the imbalance if the partition can keep up.
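A compact sketch of the merge step, assuming an `Entry(query, count)` pair type and per-partition top-M lists already collected:

```java
import java.util.Comparator;
import java.util.List;

record Entry(String query, long count) {}

class TopKMerger {
    // Union the per-partition top-M lists and reselect the global top-K.
    // No summing needed: each query's full count lives on exactly one partition.
    static List<Entry> merge(List<List<Entry>> perPartitionTopM, int k) {
        return perPartitionTopM.stream()
                .flatMap(List::stream)
                .sorted(Comparator.comparingLong(Entry::count).reversed())
                .limit(k)
                .toList();
    }
}
```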
Great Solution: Two-stage with pre-aggregation
- Approach: Stage 1: local workers pre-aggregate within short windows (e.g., 1 second) before shuffling. Instead of sending raw events, send `(query, local_count)` pairs. Massively reduces shuffle volume.
- Stage 2: hash-partition the pre-aggregated counts to per-query aggregators, which maintain running totals.
- Stage 3: each aggregator reports its local top-M to a final merger.
- Pros: far less network traffic. This is the combiner pattern, the same idea as MapReduce's combine step (see the sketch below).
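A minimal sketch of the stage-1 combiner; the window length and the `emit` sink are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

class LocalCombiner {
    private final Map<String, Long> buffer = new HashMap<>();

    void observe(String query) {
        buffer.merge(query, 1L, Long::sum); // pre-aggregate in memory
    }

    // Called every ~1 second: ship (query, localCount) pairs instead of raw events.
    void flush() {
        buffer.forEach(this::emit);
        buffer.clear();
    }

    void emit(String query, long count) {
        // send to the stage-2 aggregator chosen by hash(query) mod N
    }
}
```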
Back-of-envelope: 11.5K QPS * 86,400 sec ≈ 1B events. With N = 100 partitions, each handles ~10M events/day ≈ 115 QPS. Trivial. Memory per partition: ~1M-5M unique keys (given the 100M-500M daily cardinality estimate) ≈ 100-500 MB. Fits easily.
3) Streaming (real-time) vs batch (daily exact) -- do we need both?
Bad Solution: Streaming only
- Use only the CMS approach. Accept approximation for daily reports.
- Challenges: Product teams want the published "Year in Search" numbers to be exact. Over-estimates in CMS become bad headlines ("Why is the #1 query actually #7 by real count?").
Good Solution: Batch only
- Every night, full MapReduce over the day's raw log. Exact.
- Challenges: No real-time visibility. During a breaking news event, we can't surface trending searches for hours. Bad UX.
Great Solution: Lambda Architecture
- Streaming layer: CMS + heap for live, approximate. Refreshed every 30 seconds.
- Batch layer: Daily Spark/Dataflow job for exact. Overwrites live counts at day boundary.
- Serving layer: Query API reads batch for historical, streaming for live.
- Trade-off: Two code paths. Ensure they produce consistent schemas.
Alternative: Kappa architecture (streaming only, but with replay)
- Keep all events in Kafka with long retention.
- For daily exact, use the streaming engine (Flink) with exact counting (HashMap, not CMS) over a bounded window.
- Single codebase. Flink/Dataflow can do this with managed state backends.
Either works at L4. Name the pattern and pick one.
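A hypothetical Flink sketch of the Kappa path -- the source wiring and the `QueryEvent` POJO are assumptions; the point is exact keyed counting over a bounded event-time window rather than a sketch:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// events: DataStream<QueryEvent> replayed from the Kafka topic with event-time watermarks
DataStream<Tuple2<String, Long>> dailyCounts = events
        .map(e -> Tuple2.of(e.query, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)                                  // exact per-key state, not a sketch
        .window(TumblingEventTimeWindows.of(Time.days(1))) // one bounded daily window
        .sum(1);                                           // counts live in the managed state backend
```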
4) Handling skew: the "hot query" problem
Some queries are genuinely popular -- "taylor swift" during tour season might be 1% of all traffic. One partition gets overloaded.
- Salting: for known-hot keys, partition by `hash(query + salt) mod N` where salt ranges over 0..S-1. This splits the hot key across S partitions; at merge time, sum its S sub-counts.
- Local combining: pre-aggregate within each worker before emitting, so even a hot key's network footprint is bounded.
- Dynamic scaling: The streaming system (Flink) can rebalance partitions based on load metrics.
L4 note: Mention salting. Don't design the full dynamic rebalancer.
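A sketch of salted keying -- the fan-out `S`, the hot-key set, and the helper names are hypothetical:

```java
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

class SaltedPartitioner {
    private static final int S = 8;    // fan-out for hot keys (assumption)
    private final Set<String> hotKeys; // maintained from load metrics
    private final int numPartitions;

    SaltedPartitioner(Set<String> hotKeys, int numPartitions) {
        this.hotKeys = hotKeys;
        this.numPartitions = numPartitions;
    }

    int partitionFor(String query) {
        String key = hotKeys.contains(query)
                ? query + "#" + ThreadLocalRandom.current().nextInt(S) // spread over S sub-keys
                : query;
        return Math.floorMod(key.hashCode(), numPartitions);
    }
    // At merge time, strip the "#salt" suffix and sum the S sub-counts per hot query.
}
```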
5) Retention and reprocessing
- Kafka: 7-day retention. Lets you reprocess the last week if a bug is found.
- Long-term archive: sink all events to cold storage (GCS, S3, HDFS) via a simple consumer. Parquet format with daily partitions. ~50 GB/day at ~50 bytes/event * 1B events, and less after compression. Cheap.
- If the daily batch job fails or has a bug, re-run it from the archive. Idempotent: it overwrites the day's snapshot.
6) Storage for top-K results
Choice between Postgres, Bigtable, or just Redis.
- Bigtable / Cassandra: `row_key = date`, `column_family = entries`, `cell = (rank, query, count)`. Time-range scans are efficient.
- Postgres: simple table. Works up to billions of rows. Daily top-100 is tiny (100 rows/day * 10 years = 365K rows). Trivial.
- Redis: for live top-K only. Sorted set: `ZADD topk:live count query`, `ZREVRANGE` to read.
For L4, Postgres for historical + Redis for live is the simplest and fully adequate. Bigtable mention is bonus if you want to flex Google-stack knowledge, but not required.
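A minimal Jedis sketch of the live path -- the host and the temp-key swap are assumptions; building the snapshot under a temp key and RENAMEing it makes each 30-second refresh atomic for readers:

```java
import java.util.Map;
import redis.clients.jedis.Jedis;

public class LiveTopKStore {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("redis-host", 6379)) {
            // Writer: every 30s the aggregator rebuilds the snapshot, then swaps atomically.
            Map<String, Long> snapshot = Map.of(
                    "llm agents", 14203921L,
                    "taylor swift concert", 9120103L);
            snapshot.forEach((query, count) -> jedis.zadd("topk:live:tmp", count, query));
            jedis.rename("topk:live:tmp", "topk:live");

            // Reader (Query API): top 100 members by score, highest first.
            var top100 = jedis.zrevrange("topk:live", 0, 99);
            top100.forEach(System.out::println);
        }
    }
}
```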
7) When exact matters vs when approximation is fine
Make this explicit to the interviewer:
- Exact required: published daily rankings, billing, ML training data, compliance reports.
- Approximate fine: dashboards, trending widgets, internal monitoring, anomaly detection.
We serve both from the Lambda architecture.
What is Expected at Each Level
L3 / Mid-level
- Propose the naive HashMap + sort solution.
- Recognize that memory is a problem at 1B events but might not know the algorithm for it.
- Might not distinguish streaming vs batch; probably misses partitioning strategy.
L4
- Propose Count-Min Sketch (or mention Space-Saving / Misra-Gries) for streaming.
- Explain hash-partitioning by query and why round-robin fails.
- Lambda architecture: streaming for live, batch for exact.
- Back-of-envelope on unique cardinality, memory, QPS per partition.
- Know the over-count property of CMS and that a min-heap on top needs slack (top-M > K).
- Retention and reprocessing discussion.
L5 / Senior
- Drill into skew handling (salting, dynamic partitioning).
- Propose Kappa as an alternative and weigh it honestly.
- Discuss Flink exactly-once semantics (with idempotent sinks, checkpointing).
- Consider multi-dimensional top-K (by country, device, etc.) and the state explosion.
- Operational concerns: checkpoint storage cost, schema evolution for events, backfill strategy.
- Trade-offs between managed services (Dataflow, Kinesis Analytics) vs self-hosted Flink.