Skip to content

HLD: Top-K Streaming (Top-100 Searches Per Day) ​

L4 scoping note. This is THE classic Google-style question. The interviewer wants to hear you reason about approximation vs exactness, streaming vs batch, and distributed aggregation -- not memorize algorithms. Pick one approach as your primary and explicitly call out the trade-offs against the alternatives. 1B queries/day is real but not astronomical -- it's ~11.5K QPS. A single machine can sort a day's worth of queries given enough memory; the interesting question is how to do it online and in a distributed way.


Understanding the Problem ​

What is Top-K? ​

Given a stream of events (search queries, hashtags, product views, error messages -- anything with a "key" field), return the K most frequent keys over a time window. Google's "trending searches," Twitter's trending hashtags, and YouTube's trending videos are all top-K problems. The interview value: it forces you to reason about memory, accuracy, and distribution in a very bounded problem.

Functional Requirements ​

Core (above the line):

  1. Compute top-100 unique search queries per day -- given the raw query stream, output the top-100 at day's end.
  2. Support near-real-time updates -- during the day, expose the current top-100 (can be approximate).
  3. Historical queries -- "what were the top-100 on 2026-03-15?" should return in milliseconds.

Below the line (out of scope):

  • Multi-dimensional top-K (by country, by device, by user segment) -- doable but explodes the design surface
  • Sub-second freshness on the daily top-K (minute-level is fine)
  • Personalized trending (that's a different problem)
  • Spam filtering (assume input is clean)

Non-Functional Requirements ​

Core:

  1. Scale -- 1B queries/day = ~11.5K QPS average, ~30K QPS peak. Cardinality of unique queries in a day is roughly 100M-500M (most queries are long-tail singletons).
  2. Accuracy -- exact for the daily batch output. Approximate is acceptable for real-time.
  3. Low query latency -- retrieving the current top-K (cached) < 100 ms.
  4. Durability -- raw query data should be retained at least 30 days for reprocessing / debugging.

Below the line:

  • Strict exactly-once ingestion (we will accept at-least-once and tolerate small duplication for real-time; the daily batch can dedupe)
  • Global consistency on real-time top-K (eventual is fine)

L4 sanity check: 11.5K QPS is moderate. A well-tuned system on a handful of machines handles ingestion. The tricky part is the memory + merge story for counting 100M+ distinct keys.


The Set Up ​

Core Entities ​

EntityDescription
QueryEventA single search event: { query, timestamp, userId, country }
CountEntry(query, count) -- a single row in the aggregate
TopKSnapshotThe top-K list for a given time window: { windowStart, windowEnd, entries: [...] }

The API ​

Two surfaces: ingestion (internal) and query (external).

Ingestion (internal -- pushed to Kafka):

Topic: search_events
Partition key: hash(query) mod N  (discussed below)

Message:
{
  "query": "llm agents",
  "timestamp": 1714500000,
  "userId": "u_123",
  "country": "US"
}

Query API (external):

GET /api/topk?window=day&date=2026-04-18&k=100

Response: 200 OK
{
  "window": "day",
  "date": "2026-04-18",
  "entries": [
    { "query": "llm agents", "count": 14203921 },
    { "query": "taylor swift concert", "count": 9120103 },
    ...
  ],
  "computedAt": "2026-04-19T00:05:00Z",
  "approximate": false
}
GET /api/topk?window=live&k=100

Response: 200 OK
{
  "window": "live",
  "entries": [ ... ],
  "computedAt": "2026-04-19T14:32:00Z",
  "approximate": true
}

Two kinds of top-K: live (approximate, last-N-minutes rolling) and day (exact, yesterday's batch).


High-Level Design ​

Two parallel pipelines: a streaming layer for live, a batch layer for daily exact. This is the classic Lambda Architecture (or Kappa if you want to use streaming for both).

                                          +--- [Stream aggregator] ---> [Redis: live top-K]
                                          |
[Search frontend] -> [Kafka: search_events] ---+
                                          |
                                          +--- [Daily batch job] ---> [TopK store (Bigtable/Postgres)]

[Client] -> [Query API] -> reads from Redis (live) or TopK store (historical)

Flow 1: Ingestion ​

  1. Search frontend publishes every query event to Kafka topic search_events.
  2. Kafka retains events for 7 days (for replay / reprocessing).
  3. Two independent consumer groups: one for the streaming aggregator, one for the batch pipeline's checkpoint.

Flow 2: Live top-K (approximate) ​

  1. Stream aggregator consumes from Kafka.
  2. Maintains an in-memory Count-Min Sketch (approximate counts) + min-heap of top-K candidates.
  3. Every 30 seconds, flushes the current top-K snapshot to Redis under key topk:live.
  4. Query API reads from Redis -- sub-millisecond.

Flow 3: Daily exact top-K (batch) ​

  1. At 00:05 daily, a batch job (Spark / Dataflow / MapReduce) reads all events from Kafka for the previous day (via Kafka's time-based offset lookup) or from a long-term archive (GCS / S3).
  2. Partitions the data by hash(query) mod N. Each partition computes local counts via HashMap.
  3. Each partition emits its local top-K (say top-10000 to be safe for merging).
  4. A final reducer merges all partition top-K lists and extracts the global top-100.
  5. Writes to topk_daily(date, rank, query, count) in a persistent store (Bigtable or Postgres).

Flow 4: Query ​

  1. Client hits GET /api/topk.
  2. Query API:
    • window=live: read topk:live from Redis.
    • window=day: read from topk_daily in the DB.
  3. Return JSON.

Potential Deep Dives ​

1) How do we count 1B events/day with 100M+ unique keys? ​

This is the core algorithmic question. Walk through the evolution.

Bad Solution: HashMap + sort at query time ​

  • Approach: Dump all events into a giant HashMap<query, count>. At query time, sort by count, take top-K. O(N log N) in time, O(U) in memory where U is unique keys.
  • Challenges: 100M unique keys * (50 bytes key + 8 bytes count + overhead) = 10+ GB per day. Fits on one beefy machine but doesn't scale if the stream grows. Sorting 100M entries is several seconds. Not real-time.

Good Solution: HashMap + min-heap of size K ​

  • Approach: Same HashMap for counts, but maintain a min-heap of size K (K=100). When a count changes, update the heap only if the new count exceeds the heap's min. O(N log K) overall; much faster than sorting the whole thing.
  • Challenges: Still O(U) memory for the full HashMap. Works for a day's worth on one machine but not at higher scales.

Great Solution: Count-Min Sketch + min-heap (for streaming / live) ​

  • Approach: Use a Count-Min Sketch (CMS) -- an array of counters with multiple hash functions -- to estimate counts with bounded error. Maintain a min-heap of the top-K candidates observed so far. CMS memory is fixed (e.g., 10 MB for ~0.1% error); it does NOT grow with cardinality.
  • Parameters: width w = e / epsilon, depth d = ln(1 / delta). For epsilon = 0.0001, delta = 0.001: w ~= 27,183, d = 7. Total counters = ~190K. At 4 bytes each = ~760 KB. Tiny.
  • Heap management: Every event, increment CMS and look up estimated_count(query). If heap has fewer than K items, push. Else, if estimated_count > heap.min, push and pop.
  • Challenges: CMS over-estimates (never under) -- hash collisions inflate counts. The heap may contain items that are NOT actually top-K due to over-estimation of a "heavy hitter" collision. In practice, with proper sizing, error is small and bounded. Good for live/approximate. NOT suitable for the exact daily batch.
java
class CountMinSketch {
    private final int[][] counters;
    private final int width, depth;
    private final long[] seeds;

    public CountMinSketch(int width, int depth) {
        this.width = width;
        this.depth = depth;
        this.counters = new int[depth][width];
        this.seeds = new long[depth];
        Random r = new Random(42);
        for (int i = 0; i < depth; i++) seeds[i] = r.nextLong();
    }

    public void increment(String key) {
        for (int i = 0; i < depth; i++) {
            int idx = (int)((hash(key, seeds[i]) & 0x7FFFFFFF) % width);
            counters[i][idx]++;
        }
    }

    public int estimate(String key) {
        int min = Integer.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            int idx = (int)((hash(key, seeds[i]) & 0x7FFFFFFF) % width);
            min = Math.min(min, counters[i][idx]);
        }
        return min;
    }

    private long hash(String key, long seed) {
        // MurmurHash or similar
        return MurmurHash3.hash64(key, seed);
    }
}

class TopKTracker {
    private final CountMinSketch cms;
    private final PriorityQueue<Entry> heap;
    private final int k;

    void observe(String query) {
        cms.increment(query);
        int est = cms.estimate(query);
        // update heap
        if (heap.size() < k) {
            heap.offer(new Entry(query, est));
        } else if (est > heap.peek().count) {
            heap.poll();
            heap.offer(new Entry(query, est));
        }
    }
}
cpp
class CountMinSketch {
    std::vector<std::vector<uint32_t>> counters;
    std::vector<uint64_t> seeds;
    size_t width, depth;
public:
    CountMinSketch(size_t w, size_t d) : width(w), depth(d),
        counters(d, std::vector<uint32_t>(w, 0)), seeds(d) {
        std::mt19937_64 rng(42);
        for (auto& s : seeds) s = rng();
    }
    void increment(const std::string& key) {
        for (size_t i = 0; i < depth; ++i)
            counters[i][murmur_hash(key, seeds[i]) % width]++;
    }
    uint32_t estimate(const std::string& key) const {
        uint32_t m = UINT32_MAX;
        for (size_t i = 0; i < depth; ++i)
            m = std::min(m, counters[i][murmur_hash(key, seeds[i]) % width]);
        return m;
    }
};
typescript
class CountMinSketch {
  private counters: Uint32Array[];
  constructor(private width: number, private depth: number, private seeds: number[]) {
    this.counters = Array.from({ length: depth }, () => new Uint32Array(width));
  }
  increment(key: string) {
    for (let i = 0; i < this.depth; i++) {
      const idx = murmurHash(key, this.seeds[i]) % this.width;
      this.counters[i][idx]++;
    }
  }
  estimate(key: string): number {
    let min = Infinity;
    for (let i = 0; i < this.depth; i++) {
      const idx = murmurHash(key, this.seeds[i]) % this.width;
      min = Math.min(min, this.counters[i][idx]);
    }
    return min;
  }
}

2) Distributed top-K: how do we partition? ​

Bad Solution: Round-robin partitioning ​

  • Approach: Events distributed round-robin across N workers. Each worker maintains its own top-K. Merge at the end.
  • Challenges: Broken. A query that appears 1M times is scattered across N workers with 1M/N count each. No worker's local top-K reflects the global popularity correctly -- heavy hitters get diluted below the per-worker threshold and vanish.

Good Solution: Hash partitioning by query key ​

  • Approach: partition = hash(query) mod N. All events for a given query go to the same worker. That worker sees the query's TRUE count. Each worker computes its local top-K. At merge time, take the union and pick global top-K.
  • Why it works: Since each query lives entirely on one partition, its count is exact within that partition. The global top-K is a subset of the union of per-partition top-K lists (if a query is in the global top-100, it's in its partition's top-100).
  • Safety: Ask each partition for top-M where M > K (e.g., top-10000) to avoid edge cases where a global-top-100 item is rank 150 in a heavily-contested partition. In practice, M = 10*K is overkill-safe.
  • Challenges: Hot partitions if one query is extremely popular (10% of all traffic is one trending term). Mitigate with further sub-partitioning or accept the imbalance if the partition can keep up.

Great Solution: Two-stage with pre-aggregation ​

  • Approach: Stage 1: local workers pre-aggregate within short windows (e.g., 1 second) before shuffling. Instead of sending raw events, send (query, local_count) pairs. Massively reduces shuffle volume.
  • Stage 2: hash-partition the pre-aggregated counts to per-query aggregators, which maintain running totals.
  • Stage 3: each aggregator reports its local top-M to a final merger.
  • Pros: Far less network traffic. Combiner pattern, same as MapReduce's combiner step.

Back-of-envelope: 11.5K QPS * 86400 sec = 1B events. With N=100 partitions, each handles ~10M events/day = ~115 QPS. Trivial. Memory per partition = ~1M unique keys = ~100 MB. Fits easily.

3) Streaming (real-time) vs batch (daily exact) -- do we need both? ​

Bad Solution: Streaming only ​

  • Use only the CMS approach. Accept approximation for daily reports.
  • Challenges: Product teams want the published "Year in Search" numbers to be exact. Over-estimates in CMS become bad headlines ("Why is the #1 query actually #7 by real count?").

Good Solution: Batch only ​

  • Every night, full MapReduce over the day's raw log. Exact.
  • Challenges: No real-time visibility. During a breaking news event, we can't surface trending searches for hours. Bad UX.

Great Solution: Lambda Architecture ​

  • Streaming layer: CMS + heap for live, approximate. Refreshed every 30 seconds.
  • Batch layer: Daily Spark/Dataflow job for exact. Overwrites live counts at day boundary.
  • Serving layer: Query API reads batch for historical, streaming for live.
  • Trade-off: Two code paths. Ensure they produce consistent schemas.

Alternative: Kappa architecture (streaming only, but with replay) ​

  • Keep all events in Kafka with long retention.
  • For daily exact, use the streaming engine (Flink) with exact counting (HashMap, not CMS) over a bounded window.
  • Single codebase. Flink/Dataflow can do this with managed state backends.

Either works at L4. Name the pattern and pick one.

4) Handling skew: the "hot query" problem ​

Some queries are genuinely popular -- "taylor swift" during tour season might be 1% of all traffic. One partition gets overloaded.

  • Salting: For known-hot keys, partition by hash(query + random_salt) mod N where salt is 0..K-1. Split the hot key across K partitions. At merge time, sum across the K partitions.
  • Local combining: Pre-aggregate within each worker before emitting, so even a hot key's network footprint is bounded.
  • Dynamic scaling: The streaming system (Flink) can rebalance partitions based on load metrics.

L4 note: Mention salting. Don't design the full dynamic rebalancer.

5) Retention and reprocessing ​

  • Kafka: 7-day retention. Lets you reprocess the last week if a bug is found.
  • Long-term archive: sink all events to cold storage (GCS, S3, HDFS) via a simple consumer. Parquet format with daily partitions. ~50 GB/day compressed at 50 bytes/event * 1B events. Cheap.
  • If the daily batch job fails or has a bug, re-run it from the archive. Idempotent: it overwrites the day's snapshot.

6) Storage for top-K results ​

Choice between Postgres, Bigtable, or just Redis.

  • Bigtable / Cassandra: row_key = date, column_family = entries, cell = (rank, query, count). Time-range scans efficient.
  • Postgres: Simple table. Works up to billions of rows. Daily top-100 is tiny (100 rows/day * 10 years = 365K rows). Trivial.
  • Redis: For live top-K only. Sorted set ZADD topk:live count query. ZREVRANGE to read.

For L4, Postgres for historical + Redis for live is the simplest and fully adequate. Bigtable mention is bonus if you want to flex Google-stack knowledge, but not required.

7) When exact matters vs when approximation is fine ​

Make this explicit to the interviewer:

  • Exact required: published daily rankings, billing, ML training data, compliance reports.
  • Approximate fine: dashboards, trending widgets, internal monitoring, anomaly detection.

We serve both from the Lambda architecture.


What is Expected at Each Level ​

L3 / Mid-level ​

  • Propose the naive HashMap + sort solution.
  • Recognize that memory is a problem at 1B events but might not know the algorithm for it.
  • Might not distinguish streaming vs batch; probably misses partitioning strategy.

L4 ​

  • Propose Count-Min Sketch (or mention Space-Saving / Misra-Gries) for streaming.
  • Explain hash-partitioning by query and why round-robin fails.
  • Lambda architecture: streaming for live, batch for exact.
  • Back-of-envelope on unique cardinality, memory, QPS per partition.
  • Know the over-count property of CMS and that a min-heap on top needs slack (top-M > K).
  • Retention and reprocessing discussion.

L5 / Senior ​

  • Drill into skew handling (salting, dynamic partitioning).
  • Propose Kappa as an alternative and weigh it honestly.
  • Discuss Flink exact-once semantics (with idempotent sinks, checkpointing).
  • Consider multi-dimensional top-K (by country, device, etc.) and the state explosion.
  • Operational concerns: checkpoint storage cost, schema evolution for events, backfill strategy.
  • Trade-offs between managed services (Dataflow, Kinesis Analytics) vs self-hosted Flink.

Frontend interview preparation reference.