
42. LLD: Top-K Most Talkative Users from Chat Logs

Google context: This shows up as a "start simple, keep making it cleaner" coding round. The interviewer gives you a file path and an integer K, you produce a working one-pass solution in ten minutes, then they start adding pressure: "what if the log format changes?", "what if we also want top-K by total message length?", "what if the file does not fit in memory?" The signal is whether you can refactor the one-file script into composable pieces without a rewrite. Separation of concerns is the entire rubric.

Understanding the Problem

You are given a path to a chat log file and an integer K. You must return the top-K users by message count. Each line in the file is formatted as timestamp | user | message. Lines may be malformed; those should be skipped. The K most talkative users should be returned sorted by message count descending.

What is this system?

It is a log analyzer. The input is a stream of semi-structured text, the output is a ranked list of the noisiest users. This is one of the oldest interview problems on the planet; the entire value is in how you factor it. A monolithic function that reads, parses, counts, and sorts in one place is a junior answer. A pipeline of interchangeable components, each doing one thing, is the senior answer.


Requirements

Clarifying Questions

You: The format is timestamp | user | message. Is the delimiter always | with spaces, or just |?

Interviewer: | with spaces around the pipe. Assume consistent for well-formed lines.

You: What counts as a malformed line?

Interviewer: Anything that does not split into exactly three non-empty fields. Empty lines, lines with missing fields, lines with extra pipes in the message; you decide how to handle the last one.

Important judgment call: "extra pipes in the message" is real. If I split on | with limit=3, the message can contain pipes and still parse.
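As a quick aside, the limit argument is what makes this work. A standalone Java sketch (illustrative, not part of the final design):

```java
public class SplitLimitDemo {
    public static void main(String[] args) {
        String line = "2026-01-01 10:00:03 | carol | hello | all";
        // limit = 3: split at most twice, so pipes inside the message survive
        String[] parts = line.split(" \\| ", 3);
        System.out.println(parts.length); // 3
        System.out.println(parts[2]);     // hello | all
    }
}
```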

You: Can the user field be an email, a numeric ID, anything?

Interviewer: Opaque string. Treat it as a key.

You: Is the message content relevant, or are we purely counting lines?

Interviewer: For now just counting. But imagine we want "top-K by total message length" later.

That is the extensibility seed. The counter should not be hardcoded to +1.

You: Is K guaranteed to be smaller than the number of distinct users?

Interviewer: Not guaranteed. If K exceeds distinct users, return all of them ranked.

You: Tiebreaker when two users have the same count?

Interviewer: Lexicographically smaller user wins.

You: Is the file small enough to fit in memory? Are we streaming line by line?

Interviewer: Assume it might be large. Stream it. Do not load the whole thing.

You: Should errors be logged, thrown, or silently skipped?

Interviewer: Skip malformed lines. Expose a counter of skipped lines for observability.

Final Requirements

  1. Take a file path and an integer K. Return the top-K users by message count.
  2. Stream the file line by line; do not load it all into memory.
  3. Skip malformed lines without crashing. Track the skip count.
  4. If K exceeds the number of distinct users, return all of them ranked.
  5. Tiebreaker: lexicographically smaller user wins.
  6. The design must allow swapping log format (JSON, CSV, syslog) and swapping the metric (count, message length, active-hour count) with minimal change.

Out of scope: Persistence, distributed processing, real-time streaming from a socket, duplicate detection across files.

Deferred tradeoffs: Single-machine, single-threaded. We discuss how to shard for very large files in the extensibility section.


Core Entities and Relationships

Walking through the nouns and verbs:

  • ChatLogEntry: the parsed representation of a valid line. Fields: timestamp, user, message. Plain value type.
  • ChatLogParser: reads the file, emits valid ChatLogEntry objects, counts skipped lines. Has zero knowledge of what downstream does with entries.
  • MetricExtractor: given an entry, returns the value to add to its user's score. For "top-K by count" this is always 1. For "top-K by total length" this is message.length(). Interface, not a concrete class.
  • UserActivityTracker: accumulates scores per user. Takes entries in, updates the internal map. Does not know about files or formats.
  • TopKSelector: given a map of user -> score, returns the top K using a min-heap.
  • ChatLogAnalyzer: the façade. Glues the pipeline together. This is the only class the caller interacts with.

| Entity | Responsibility |
| --- | --- |
| ChatLogEntry | Parsed line (timestamp, user, message) |
| ChatLogParser | Reads file, emits entries, counts malformed lines |
| MetricExtractor | Pluggable rule for "how much does this entry contribute to its user's score?" |
| UserActivityTracker | Accumulates per-user scores |
| TopKSelector | Uses a min-heap of size K to find the top K |
| ChatLogAnalyzer | Orchestrates the pipeline |

The parser does not know about counting. The counter does not know about file format. The selector does not know about users; it works on any Map<String, Number>. That is the whole point of the design.


Class Design

ChatLogEntry

State: timestamp, user, message. All immutable.

ChatLogParser (Interface)

Methods:

  • parse(path) -> Iterable<ChatLogEntry>, a lazy stream.
  • skippedCount() -> long

The interface-over-implementation choice here is deliberate. The concrete PipeDelimitedParser handles timestamp | user | message. A JsonLinesParser could implement the same interface.

PipeDelimitedParser

State:

  • skipped: long
  • onMalformed: (line, reason) -> void, an optional callback

Implementation: Opens the file, iterates lines, splits with limit=3 so pipes inside messages are preserved. If split yields fewer than 3 parts or any part is empty, increment skipped and skip.

MetricExtractor (Interface)

Methods:

  • extract(entry: ChatLogEntry) -> long

CountMetric (default)

Always returns 1.

MessageLengthMetric

Returns entry.message.length().

UserActivityTracker

State:

  • scores: Map<String, Long>
  • metric: MetricExtractor

Methods:

  • record(entry): scores[entry.user] += metric.extract(entry)
  • snapshot() -> Map<String, Long>

TopKSelector

Methods (static or stateless):

  • topK(scores: Map<String, Long>, k: int) -> List<(String, Long)>

Algorithm: Min-heap of size K ordered by score, with the user string as a reverse-order tiebreaker, so that on equal score the lexicographically smaller user is preferred (stays in if we are keeping top K).

Wait: that tiebreaker logic is the subtle part. When two users tie, the lex-smaller wins. In a min-heap of size K, we pop the smallest element when full. For the tiebreaker to be correct, the comparator has to say: smaller score first; on equal score, the lexicographically larger user is "smaller" in the heap, so it gets evicted first. Let us pin that down in the implementation.

ChatLogAnalyzer

State:

  • parser: ChatLogParser
  • tracker: UserActivityTracker

Methods:

  • topK(k: int) -> List<(String, Long)>

Design principles:

  • Single Responsibility: each class has exactly one reason to change.
  • Strategy Pattern: MetricExtractor and ChatLogParser are both pluggable strategies.
  • Dependency Injection: ChatLogAnalyzer receives its parser and tracker; it does not construct them. This makes unit testing trivial: hand in a fake parser that yields a fixed list.

Implementation

Core Algorithm: Top-K Selection

Bad: Sort all users by score descending, take the first K. O(N log N) where N is the number of distinct users.

Good: Min-heap of size K. For each user, push; if heap size > K, pop. O(N log K). Returns heap contents in any order; sort the K elements at the end (O(K log K)).

Great: Same min-heap approach; the "great" version is about getting the tiebreaker right and handling K > N gracefully. (Strictly, O(N log K) is not a lower bound: with the full score map in memory, quickselect finds the top K in expected O(N), plus O(K log K) to sort them. The heap wins when you want bounded extra memory and a single streaming pass over the entries.)
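To make the pattern concrete, here is a minimal self-contained Java sketch of the size-K min-heap on plain integers (the topK name here is illustrative, not the final API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TopKHeapDemo {
    // Keep a min-heap of size K; anything that survives is in the top K. O(N log K).
    static List<Integer> topK(int[] values, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // natural order = min-heap
        for (int v : values) {
            heap.offer(v);
            if (heap.size() > k) heap.poll(); // evict the current minimum
        }
        List<Integer> out = new ArrayList<>(heap);
        out.sort((a, b) -> b - a); // sort the K survivors descending, O(K log K)
        return out;
    }

    public static void main(String[] args) {
        System.out.println(topK(new int[]{5, 1, 9, 3, 7}, 2)); // [9, 7]
        System.out.println(topK(new int[]{5}, 3));             // [5]  (K > N: return all)
    }
}
```

Note that K > N falls out naturally: the heap simply never fills, so all users are returned ranked.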

Tiebreaker Deep Dive

Comparator for the min-heap: (a, b) ->

  • If a.score != b.score: smaller score is "less" (gets evicted first). Return Long.compare(a.score, b.score) rather than a.score - b.score, which can overflow for long scores.
  • If scores are equal: we want the lex-larger user to be evicted first (lex-smaller to win). Return b.user.compareTo(a.user).

This way when the heap is full and we see a new entry with score equal to the heap min, if the new user is lex-larger we skip; if lex-smaller we evict the current min and push the new one.
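A small self-contained Java check of that comparator (the entries are illustrative): with bob and carol tied at score 1, carol sorts to the root and is the first one evicted:

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.PriorityQueue;

public class TiebreakDemo {
    public static void main(String[] args) {
        // Min-heap: smaller score first; on tie, the lex-larger user is "smaller"
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>((a, b) -> {
            int c = Long.compare(a.getValue(), b.getValue());
            return c != 0 ? c : b.getKey().compareTo(a.getKey());
        });
        heap.offer(new AbstractMap.SimpleEntry<>("bob", 1L));
        heap.offer(new AbstractMap.SimpleEntry<>("carol", 1L));
        heap.offer(new AbstractMap.SimpleEntry<>("alice", 3L));
        // With K = 2, one poll removes the tie-loser: carol, not bob
        System.out.println(heap.poll().getKey()); // carol
    }
}
```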

Core Method: ChatLogAnalyzer.topK(k)

  1. Ask parser for a stream of entries.
  2. For each entry, call tracker.record(entry).
  3. After the stream ends, call TopKSelector.topK(tracker.snapshot(), k).
  4. Return the result sorted by score descending, then by user ascending.

Edge cases:

  • K <= 0: return empty list.
  • K >= number of distinct users: return all.
  • Empty file: return empty list, tracker has empty map.
  • All lines malformed: parser's skipped equals total lines, tracker empty, result empty.

Verification

Log file:

2026-01-01 10:00:00 | alice | hi
2026-01-01 10:00:01 | bob | hey
malformed-line-no-pipes
2026-01-01 10:00:02 | alice | how are you
2026-01-01 10:00:03 | carol | hello | all
2026-01-01 10:00:04 | bob |
2026-01-01 10:00:05 | alice | yo

Trace with K=2, CountMetric:

  1. Parser yields entry(alice, "hi"). Tracker: {alice: 1}.
  2. Parser yields entry(bob, "hey"). Tracker: {alice: 1, bob: 1}.
  3. Parser sees "malformed-line-no-pipes". Split on | yields 1 part. Skip. skipped = 1.
  4. Parser yields entry(alice, "how are you"). Tracker: {alice: 2, bob: 1}.
  5. Parser yields entry(carol, "hello | all"). Split with limit=3 yields ["2026-01-01 10:00:03", "carol", "hello | all"]. Tracker: {alice: 2, bob: 1, carol: 1}.
  6. Parser sees 2026-01-01 10:00:04 | bob |. There is no well-formed third field (the split yields only two parts, or an empty message if the line has a trailing space), so skip. skipped = 2. (Judgment call. An empty message could also count as a message; we chose to skip.)
  7. Parser yields entry(alice, "yo"). Tracker: {alice: 3, bob: 1, carol: 1}.

Top-K selection with K=2:

  • Push (alice, 3). Heap: [(alice, 3)].
  • Push (bob, 1). Heap: [(bob, 1), (alice, 3)] (min at root).
  • Push (carol, 1). Heap size would exceed K. Compare current min (bob, 1) with (carol, 1). Scores equal. Tiebreaker: the lex-larger user is evicted first. "carol" > "bob", so carol is the lex-larger; we keep bob and discard carol. Do not push.
  • Result: [(bob, 1), (alice, 3)]. Sort descending: [(alice, 3), (bob, 1)].

Output: [(alice, 3), (bob, 1)]. skipped = 2. Correct.


Complete Code Implementation

java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;

public final class ChatLogTopK {

    // --- value type ---
    public record ChatLogEntry(String timestamp, String user, String message) {}

    // --- parser ---
    public interface ChatLogParser {
        Stream<ChatLogEntry> parse(Path path) throws IOException;
        long skippedCount();
    }

    public static final class PipeDelimitedParser implements ChatLogParser {
        private long skipped = 0;

        public Stream<ChatLogEntry> parse(Path path) throws IOException {
            BufferedReader reader = Files.newBufferedReader(path);
            return reader.lines()
                .map(this::parseLine)
                .filter(Objects::nonNull)
                .onClose(() -> {
                    try { reader.close(); } catch (IOException ignored) {}
                });
        }

        private ChatLogEntry parseLine(String line) {
            if (line == null || line.isBlank()) { skipped++; return null; }
            String[] parts = line.split(" \\| ", 3);
            if (parts.length != 3) { skipped++; return null; }
            String ts = parts[0].trim(), user = parts[1].trim(), msg = parts[2];
            if (ts.isEmpty() || user.isEmpty() || msg.isEmpty()) { skipped++; return null; }
            return new ChatLogEntry(ts, user, msg);
        }

        public long skippedCount() { return skipped; }
    }

    // --- metric ---
    public interface MetricExtractor {
        long extract(ChatLogEntry entry);
    }

    public static final class CountMetric implements MetricExtractor {
        public long extract(ChatLogEntry e) { return 1L; }
    }

    public static final class MessageLengthMetric implements MetricExtractor {
        public long extract(ChatLogEntry e) { return e.message().length(); }
    }

    // --- tracker ---
    public static final class UserActivityTracker {
        private final Map<String, Long> scores = new HashMap<>();
        private final MetricExtractor metric;
        public UserActivityTracker(MetricExtractor metric) { this.metric = metric; }
        public void record(ChatLogEntry e) {
            scores.merge(e.user(), metric.extract(e), Long::sum);
        }
        public Map<String, Long> snapshot() { return Collections.unmodifiableMap(scores); }
    }

    // --- selector ---
    public static final class TopKSelector {
        public static List<Map.Entry<String, Long>> topK(Map<String, Long> scores, int k) {
            if (k <= 0 || scores.isEmpty()) return List.of();
            // Min-heap: smallest score first; on tie, lex-larger user first (evicts first)
            PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>((a, b) -> {
                int c = Long.compare(a.getValue(), b.getValue());
                if (c != 0) return c;
                return b.getKey().compareTo(a.getKey());
            });
            for (var e : scores.entrySet()) {
                heap.offer(e);
                if (heap.size() > k) heap.poll();
            }
            List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
            result.sort((a, b) -> {
                int c = Long.compare(b.getValue(), a.getValue());
                return c != 0 ? c : a.getKey().compareTo(b.getKey());
            });
            return result;
        }
    }

    // --- analyzer (façade) ---
    public static final class ChatLogAnalyzer {
        private final ChatLogParser parser;
        private final UserActivityTracker tracker;

        public ChatLogAnalyzer(ChatLogParser parser, UserActivityTracker tracker) {
            this.parser = parser;
            this.tracker = tracker;
        }

        public List<Map.Entry<String, Long>> topK(Path path, int k) throws IOException {
            try (Stream<ChatLogEntry> stream = parser.parse(path)) {
                stream.forEach(tracker::record);
            }
            return TopKSelector.topK(tracker.snapshot(), k);
        }

        public long skipped() { return parser.skippedCount(); }
    }

    // --- demo ---
    public static void main(String[] args) throws IOException {
        var analyzer = new ChatLogAnalyzer(
            new PipeDelimitedParser(),
            new UserActivityTracker(new CountMetric())
        );
        List<Map.Entry<String, Long>> top = analyzer.topK(Path.of(args[0]), Integer.parseInt(args[1]));
        top.forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
        System.err.println("Skipped: " + analyzer.skipped());
    }
}
cpp
#include <algorithm>
#include <fstream>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>

struct ChatLogEntry { std::string timestamp, user, message; };

struct MetricExtractor {
    virtual long extract(const ChatLogEntry&) = 0;
    virtual ~MetricExtractor() = default;
};
struct CountMetric : MetricExtractor {
    long extract(const ChatLogEntry&) override { return 1; }
};
struct MessageLengthMetric : MetricExtractor {
    long extract(const ChatLogEntry& e) override { return (long)e.message.size(); }
};

class PipeDelimitedParser {
    long skipped_ = 0;
public:
    template <typename F>
    void parse(const std::string& path, F&& onEntry) {
        std::ifstream in(path);
        std::string line;
        while (std::getline(in, line)) {
            auto p1 = line.find(" | ");
            if (p1 == std::string::npos) { ++skipped_; continue; }
            auto p2 = line.find(" | ", p1 + 3);
            if (p2 == std::string::npos) { ++skipped_; continue; }
            std::string ts = line.substr(0, p1);
            std::string user = line.substr(p1 + 3, p2 - (p1 + 3));
            std::string msg = line.substr(p2 + 3);
            if (ts.empty() || user.empty() || msg.empty()) { ++skipped_; continue; }
            onEntry(ChatLogEntry{ts, user, msg});
        }
    }
    long skipped() const { return skipped_; }
};

class UserActivityTracker {
    std::unordered_map<std::string, long> scores_;
    MetricExtractor* metric_;
public:
    explicit UserActivityTracker(MetricExtractor* m) : metric_(m) {}
    void record(const ChatLogEntry& e) { scores_[e.user] += metric_->extract(e); }
    const std::unordered_map<std::string, long>& snapshot() const { return scores_; }
};

std::vector<std::pair<std::string, long>> topK(
    const std::unordered_map<std::string, long>& scores, int k) {
    if (k <= 0 || scores.empty()) return {};
    auto cmp = [](const std::pair<std::string,long>& a,
                  const std::pair<std::string,long>& b) {
        if (a.second != b.second) return a.second > b.second; // min-heap by score
        return a.first < b.first;                             // tie: lex-larger evicts first
    };
    std::priority_queue<std::pair<std::string,long>,
        std::vector<std::pair<std::string,long>>, decltype(cmp)> heap(cmp);
    for (auto& kv : scores) {
        heap.push(kv);
        if ((int)heap.size() > k) heap.pop();
    }
    std::vector<std::pair<std::string,long>> out;
    while (!heap.empty()) { out.push_back(heap.top()); heap.pop(); }
    std::sort(out.begin(), out.end(), [](auto& a, auto& b) {
        if (a.second != b.second) return a.second > b.second;
        return a.first < b.first;
    });
    return out;
}
typescript
import * as fs from "node:fs";
import * as readline from "node:readline";

export interface ChatLogEntry { timestamp: string; user: string; message: string; }

export interface ChatLogParser {
  parse(path: string, onEntry: (e: ChatLogEntry) => void): Promise<void>;
  skippedCount(): number;
}

export class PipeDelimitedParser implements ChatLogParser {
  private skipped = 0;
  async parse(path: string, onEntry: (e: ChatLogEntry) => void): Promise<void> {
    const rl = readline.createInterface({
      input: fs.createReadStream(path),
      crlfDelay: Infinity,
    });
    for await (const raw of rl) {
      const line = raw.trim();
      if (!line) { this.skipped++; continue; }
      const parts = this.splitN(line, " | ", 3);
      if (parts.length !== 3) { this.skipped++; continue; }
      const [ts, user, msg] = parts.map(s => s.trim());
      if (!ts || !user || !msg) { this.skipped++; continue; }
      onEntry({ timestamp: ts, user, message: msg });
    }
  }
  skippedCount() { return this.skipped; }
  private splitN(s: string, sep: string, n: number): string[] {
    const out: string[] = [];
    let i = 0;
    while (out.length < n - 1) {
      const j = s.indexOf(sep, i);
      if (j === -1) break;
      out.push(s.substring(i, j));
      i = j + sep.length;
    }
    out.push(s.substring(i));
    return out;
  }
}

export interface MetricExtractor { extract(e: ChatLogEntry): number; }
export class CountMetric implements MetricExtractor { extract() { return 1; } }
export class MessageLengthMetric implements MetricExtractor {
  extract(e: ChatLogEntry) { return e.message.length; }
}

export class UserActivityTracker {
  private scores = new Map<string, number>();
  constructor(private metric: MetricExtractor) {}
  record(e: ChatLogEntry): void {
    this.scores.set(e.user, (this.scores.get(e.user) ?? 0) + this.metric.extract(e));
  }
  snapshot(): Map<string, number> { return this.scores; }
}

export class TopKSelector {
  static topK(scores: Map<string, number>, k: number): Array<[string, number]> {
    if (k <= 0 || scores.size === 0) return [];
    // TypeScript has no built-in heap; for clarity we sort. O(N log N).
    // In a real interview, note the trade and offer a heap for O(N log K).
    const arr: Array<[string, number]> = [...scores.entries()];
    arr.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]));
    return arr.slice(0, k);
  }
}

export class ChatLogAnalyzer {
  constructor(
    private parser: ChatLogParser,
    private tracker: UserActivityTracker,
  ) {}
  async topK(path: string, k: number): Promise<Array<[string, number]>> {
    await this.parser.parse(path, e => this.tracker.record(e));
    return TopKSelector.topK(this.tracker.snapshot(), k);
  }
  skipped(): number { return this.parser.skippedCount(); }
}

Extensibility

1. "The log format just changed to JSON lines. Can you swap it in?"

You answer: "Parser is an interface. I write JsonLinesParser implementing ChatLogParser and hand it to the analyzer. The rest of the code is untouched."

java
public static final class JsonLinesParser implements ChatLogParser {
    // Assumes Jackson (com.fasterxml.jackson.databind) on the classpath
    private final com.fasterxml.jackson.databind.ObjectMapper objectMapper =
            new com.fasterxml.jackson.databind.ObjectMapper();
    private long skipped = 0;
    public Stream<ChatLogEntry> parse(Path path) throws IOException {
        BufferedReader reader = Files.newBufferedReader(path);
        return reader.lines().map(line -> {
            try {
                var node = objectMapper.readTree(line);
                return new ChatLogEntry(
                    node.get("ts").asText(),
                    node.get("user").asText(),
                    node.get("msg").asText()
                );
            } catch (Exception e) { skipped++; return null; }
        }).filter(Objects::nonNull).onClose(() -> {
            try { reader.close(); } catch (IOException ignored) {}
        });
    }
    public long skippedCount() { return skipped; }
}

Tradeoff: The parser interface returns ChatLogEntry, which is committed to three fields. If you need format-specific metadata (priority, thread ID), you promote ChatLogEntry to an open schema. For now, three fields are enough.

2. "Top-K by total message length, then by number of emojis."

You answer: "The metric is already a strategy. I add MessageLengthMetric and EmojiCountMetric and pass them into the tracker. For combined metrics (e.g., weighted sum), I add a CompositeMetric."

java
public static final class CompositeMetric implements MetricExtractor {
    private final List<MetricExtractor> parts;
    private final double[] weights;
    public CompositeMetric(List<MetricExtractor> parts, double[] weights) {
        this.parts = parts; this.weights = weights;
    }
    public long extract(ChatLogEntry e) {
        double sum = 0; // extract = weighted sum over parts
        for (int i = 0; i < parts.size(); i++) sum += weights[i] * parts.get(i).extract(e);
        return Math.round(sum);
    }
}

Tradeoff: Composite metrics can hide magic constants. I keep the weights as constructor args so callers see them.

3. "The file is 500 GB. Can you shard?"

You answer: "The accumulation is commutative and associative: a record for the same user from any shard adds to the same counter. I split the file into chunks, run a tracker per chunk in parallel, then merge the snapshots with Map.merge(..., Long::sum). Top-K runs once on the merged map. The parser stays per-shard."

java
public static Map<String, Long> mergeTrackers(List<UserActivityTracker> trackers) {
    Map<String, Long> merged = new HashMap<>();
    for (var t : trackers) {
        t.snapshot().forEach((k, v) -> merged.merge(k, v, Long::sum));
    }
    return merged;
}

Tradeoff: Merge is O(total distinct users), which might dominate for wide key spaces. If merging is expensive, use a reduce tree (pairwise merges) to stay parallel. Below ~10M keys, a sequential merge is fine.
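A minimal self-contained sketch of that merge on plain maps (the merge helper mirrors mergeTrackers above; the shard contents are made up). The point it demonstrates is commutativity: shard order never changes the totals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardMergeDemo {
    // Per-shard counts merge with Long::sum, so the merged totals
    // equal what a single-pass count over the whole file would give.
    static Map<String, Long> merge(List<Map<String, Long>> shards) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> shard : shards) {
            shard.forEach((user, score) -> merged.merge(user, score, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> shard1 = Map.of("alice", 2L, "bob", 1L);
        Map<String, Long> shard2 = Map.of("alice", 1L, "carol", 1L);
        // Same totals regardless of shard order
        System.out.println(merge(List.of(shard1, shard2)).get("alice")); // 3
        System.out.println(merge(List.of(shard2, shard1)).get("alice")); // 3
    }
}
```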


What is Expected at Each Level

| Level | Expectations |
| --- | --- |
| Junior | Single-function script: read file, split each line, Map<String, Integer> count, sort descending, take first K. Gets the happy path right; struggles with malformed lines or pipes in the message. |
| Mid-level | Extracts a parseLine function, uses HashMap.merge, uses a min-heap of size K. Handles malformed lines gracefully. When asked to swap format, cleanly refactors by promoting the parser to an interface. |
| Senior | Lays out the pipeline upfront (parser, metric, tracker, selector, analyzer) with clear boundaries. Names the extensibility axes (format, metric, sharding) before the interviewer asks. Discusses streaming vs batch, tiebreaker subtleties, and the difference between "empty message = skip" vs "empty message = count zero" as a conscious product decision. Google signal: the refactoring instinct is visible when they write the one-line solution first, then say "let me pull the parser out; the interviewer is going to ask about JSON in ten minutes," and do it before being asked. |
