# 42. LLD: Top-K Most Talkative Users from Chat Logs
Google context: This shows up as a "start simple, keep making it cleaner" coding round. The interviewer gives you a file path and an integer K; you produce a working one-pass solution in ten minutes; then they start adding pressure: "what if the log format changes?", "what if we also want top-K by total message length?", "what if the file does not fit in memory?" The signal is whether you can refactor the one-file script into composable pieces without a rewrite. Separation of concerns is the entire rubric.
## Understanding the Problem
You are given a path to a chat log file and an integer K. You must return the top-K users by message count, sorted by count descending. Each line in the file is formatted as `timestamp | user | message`. Lines may be malformed; those should be skipped.
### What is this system?
It is a log analyzer. The input is a stream of semi-structured text, the output is a ranked list of the noisiest users. This is one of the oldest interview problems on the planet; the entire value is in how you factor it. A monolithic function that reads, parses, counts, and sorts in one place is a junior answer. A pipeline of interchangeable components, each doing one thing, is the senior answer.
## Requirements
### Clarifying Questions
You: The format is `timestamp | user | message`. Is the delimiter always `|` with spaces around it, or just `|`?
Interviewer: `|` with spaces around the pipe. Assume it is consistent for well-formed lines.
You: What counts as a malformed line?
Interviewer: Anything that does not split into exactly three non-empty fields. Empty lines, lines with missing fields, lines with extra pipes in the message (you decide how to handle the last one).
Important judgment call: "extra pipes in the message" is real. If I split on `|` with `limit=3`, the message can contain pipes and still parse.
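A quick sanity check of that behavior, using Java's `String.split(regex, limit)` (the same call the parser below relies on):

```java
// With limit = 3 the split stops after two delimiters, so pipes
// inside the message body survive intact.
String line = "2026-01-01 10:00:03 | carol | hello | all";
String[] parts = line.split(" \\| ", 3);
// parts -> ["2026-01-01 10:00:03", "carol", "hello | all"]
```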
You: Can the user field be an email, a numeric ID, anything?
Interviewer: Opaque string. Treat it as a key.
You: Is the message content relevant, or are we purely counting lines?
Interviewer: For now just counting. But imagine we want "top-K by total message length" later.
That is the extensibility seed. The counter should not be hardcoded to `+1`.
You: Is K guaranteed to be smaller than the number of distinct users?
Interviewer: Not guaranteed. If K exceeds distinct users, return all of them ranked.
You: Tiebreaker when two users have the same count?
Interviewer: Lexicographically smaller user wins.
You: Is the file small enough to fit in memory? Are we streaming line by line?
Interviewer: Assume it might be large. Stream it. Do not load the whole thing.
You: Should errors be logged, thrown, or silently skipped?
Interviewer: Skip malformed lines. Expose a counter of skipped lines for observability.
### Final Requirements
- Take a file path and an integer K. Return the top-K users by message count.
- Stream the file line by line; do not load it all into memory.
- Skip malformed lines without crashing. Track the skip count.
- If K exceeds the number of distinct users, return all of them ranked.
- Tiebreaker: lexicographically smaller user wins.
- The design must allow swapping log format (JSON, CSV, syslog) and swapping the metric (count, message length, active-hour count) with minimal change.
Out of scope: Persistence, distributed processing, real-time streaming from a socket, duplicate detection across files.
Deferred tradeoffs: Single-machine, single-threaded. We discuss how to shard for very large files in the extensibility section.
## Core Entities and Relationships
Walking through the nouns and verbs:
- `ChatLogEntry`: the parsed representation of a valid line. Fields: `timestamp`, `user`, `message`. Plain value type.
- `ChatLogParser`: reads the file, emits valid `ChatLogEntry` objects, counts skipped lines. Has zero knowledge of what downstream does with entries.
- `MetricExtractor`: given an entry, returns the value to add to its user's score. For "top-K by count" this is always `1`. For "top-K by total length" it is `message.length()`. Interface, not a concrete class.
- `UserActivityTracker`: accumulates scores per user. Takes entries in, updates the internal map. Does not know about files or formats.
- `TopKSelector`: given a map of user -> score, returns the top K using a min-heap.
- `ChatLogAnalyzer`: the façade. Glues the pipeline together. This is the only class the caller interacts with.
| Entity | Responsibility |
|---|---|
| `ChatLogEntry` | Parsed line (timestamp, user, message) |
| `ChatLogParser` | Reads file, emits entries, counts malformed lines |
| `MetricExtractor` | Pluggable rule for "how much does this entry contribute to its user's score?" |
| `UserActivityTracker` | Accumulates per-user scores |
| `TopKSelector` | Uses a min-heap of size K to find the top K |
| `ChatLogAnalyzer` | Orchestrates the pipeline |
The parser does not know about counting. The counter does not know about file format. The selector does not know about users; it works on any `Map<String, Long>`. That is the whole point of the design.
## Class Design
### ChatLogEntry
State: `timestamp`, `user`, `message`. All immutable.
### ChatLogParser (Interface)
Methods:
- `parse(path) -> Stream<ChatLogEntry>`: a lazy stream.
- `skippedCount() -> long`
The interface-over-implementation choice here is deliberate. The concrete `PipeDelimitedParser` handles `timestamp | user | message`. A `JsonLinesParser` could implement the same interface.
### PipeDelimitedParser
State:
- `skipped: long`
- `onMalformed: (line, reason) -> void`: optional callback
Implementation: Opens the file, iterates lines, splits with `limit=3` so pipes inside messages are preserved. If the split yields fewer than 3 parts or any part is empty, increment `skipped` and skip the line.
### MetricExtractor (Interface)
Methods:
- `extract(entry: ChatLogEntry) -> long`
### CountMetric (default)
Always returns 1.
### MessageLengthMetric
Returns `entry.message.length()`.
### UserActivityTracker
State:
- `scores: Map<String, Long>`
- `metric: MetricExtractor`
Methods:
- `record(entry)`: `scores[entry.user] += metric.extract(entry)`
- `snapshot() -> Map<String, Long>`
### TopKSelector
Methods (static or stateless):
- `topK(scores: Map<String, Long>, k: int) -> List<(String, Long)>`
Algorithm: Min-heap of size K keyed by (score, reversed user) so that on equal score the lexicographically smaller user is preferred (it stays in when we evict the heap minimum while keeping the top K).
Wait: that tiebreaker logic is the subtle part. When two users tie, the lex-smaller wins. In a min-heap of size K, we pop the smallest element when full. For the tiebreaker to be correct, the comparator has to say: smaller score first; on equal score, the lexicographically larger user counts as "smaller" in the heap, so it gets evicted first. Let us pin that down in the implementation.
### ChatLogAnalyzer
State:
- `parser: ChatLogParser`
- `tracker: UserActivityTracker`
Methods:
- `topK(path, k) -> List<(String, Long)>`
Design principles:
- Single Responsibility: each class has exactly one reason to change.
- Strategy Pattern: `MetricExtractor` and `ChatLogParser` are both pluggable strategies.
- Dependency Injection: `ChatLogAnalyzer` receives its parser and tracker; it does not construct them. This makes unit testing trivial: hand in a fake parser that yields a fixed list, as sketched below.
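A minimal sketch of that test setup. `FakeParser` is hypothetical, purely a test double, not part of the design above:

```java
// Hypothetical test double: yields a fixed list of entries, no file I/O.
final class FakeParser implements ChatLogParser {
    private final List<ChatLogEntry> entries;
    FakeParser(List<ChatLogEntry> entries) { this.entries = entries; }
    public Stream<ChatLogEntry> parse(Path path) { return entries.stream(); }
    public long skippedCount() { return 0; }
}

// The analyzer never notices there is no real file behind the parser.
var analyzer = new ChatLogAnalyzer(
    new FakeParser(List.of(
        new ChatLogEntry("t1", "alice", "hi"),
        new ChatLogEntry("t2", "alice", "yo"))),
    new UserActivityTracker(new CountMetric()));
// analyzer.topK(Path.of("ignored"), 1) -> [(alice, 2)]
```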
## Implementation
### Core Algorithm: Top-K Selection
Bad: Sort all users by score descending, take the first K. O(N log N) where N is the number of distinct users.
Good: Min-heap of size K. For each user, push; if the heap size exceeds K, pop. O(N log K). The heap holds the answer in arbitrary order; sort the K survivors at the end (O(K log K)).
Great: Same min-heap approach. The asymptotics do not meaningfully improve from here (quickselect reaches O(N) average but is fiddlier and gains little at this scale); the "great" version is about getting the tiebreaker right and handling K > N gracefully.
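For contrast, the "Bad" baseline is one statement in Java (a sketch over the `scores` map the tracker produces):

```java
// Sort-everything baseline: O(N log N). Fine for small N, wasteful at scale.
List<Map.Entry<String, Long>> naive = scores.entrySet().stream()
    .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
        .thenComparing(Map.Entry.comparingByKey()))
    .limit(k)
    .toList();
```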
### Tiebreaker Deep Dive
Comparator for the min-heap, `(a, b) ->`:
- If `a.score != b.score`: the smaller score is "less" (gets evicted first). Return `Long.compare(a.score, b.score)` (not `a.score - b.score`, which can overflow).
- If scores are equal: we want the lex-larger user to be evicted first (lex-smaller to win). Return `b.user.compareTo(a.user)`.
This way, when the heap is full and a new entry's score equals the heap minimum: if the new user is lex-larger, it is effectively skipped; if lex-smaller, the current minimum is evicted and the new entry stays.
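As Java, this is the same comparator shape the complete implementation below uses (shown here as a fragment):

```java
// Eviction order for the size-K min-heap: smallest score at the root;
// on a tie, the lex-larger user compares as smaller and is popped first.
Comparator<Map.Entry<String, Long>> evictionOrder = (a, b) -> {
    int c = Long.compare(a.getValue(), b.getValue());
    return c != 0 ? c : b.getKey().compareTo(a.getKey());
};
// Tie check: for ("bob", 1) vs ("carol", 1) the comparator ranks carol
// below bob, so carol sits at the root and is evicted first; bob wins.
```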
### Core Method: ChatLogAnalyzer.topK(path, k)
- Ask the parser for a stream of entries.
- For each entry, call `tracker.record(entry)`.
- After the stream ends, call `TopKSelector.topK(tracker.snapshot(), k)`.
- Return the result sorted by score descending, then by user ascending.
Edge cases:
- K <= 0: return an empty list.
- K >= number of distinct users: return all of them.
- Empty file: return an empty list; the tracker holds an empty map.
- All lines malformed: the parser's `skipped` equals the total line count, the tracker is empty, the result is empty.
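These are cheap to pin down as unit tests. A sketch, assuming JUnit 5 and the classes defined above:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import java.util.*;

class TopKEdgeCasesTest {
    @Test
    void nonPositiveKReturnsEmpty() {
        assertEquals(List.of(), TopKSelector.topK(Map.of("alice", 3L), 0));
    }

    @Test
    void kLargerThanDistinctUsersReturnsAllRanked() {
        var tracker = new UserActivityTracker(new CountMetric());
        tracker.record(new ChatLogEntry("t", "alice", "hi"));
        tracker.record(new ChatLogEntry("t", "bob", "hey"));
        assertEquals(2, TopKSelector.topK(tracker.snapshot(), 10).size());
    }
}
```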
## Verification
Log file:

```
2026-01-01 10:00:00 | alice | hi
2026-01-01 10:00:01 | bob | hey
malformed-line-no-pipes
2026-01-01 10:00:02 | alice | how are you
2026-01-01 10:00:03 | carol | hello | all
2026-01-01 10:00:04 | bob |
2026-01-01 10:00:05 | alice | yo
```

Trace with K=2, CountMetric:
- Parser yields entry(alice, "hi"). Tracker: `{alice: 1}`.
- Parser yields entry(bob, "hey"). Tracker: `{alice: 1, bob: 1}`.
- Parser sees "malformed-line-no-pipes". Split on `|` yields 1 part. Skip. `skipped = 1`.
- Parser yields entry(alice, "how are you"). Tracker: `{alice: 2, bob: 1}`.
- Parser yields entry(carol, "hello | all"). Split with `limit=3` yields `["2026-01-01 10:00:03", "carol", "hello | all"]`. Tracker: `{alice: 2, bob: 1, carol: 1}`.
- Parser sees `2026-01-01 10:00:04 | bob |`. Empty message, so skip; `skipped = 2`. (Judgment call. Could also count as a message; we chose to skip.)
- Parser yields entry(alice, "yo"). Tracker: `{alice: 3, bob: 1, carol: 1}`.
Top-K selection with K=2:
- Push (alice, 3). Heap: [(alice, 3)].
- Push (bob, 1). Heap: [(bob, 1), (alice, 3)] (min at root).
- Push (carol, 1). Heap size would exceed K. Compare the current min (bob, 1) with (carol, 1). Scores are equal. Tiebreaker: the lex-larger user is evicted first. "carol" > "bob", so carol is the lex-larger; we keep bob and discard carol. Do not push.
- Result: [(bob, 1), (alice, 3)]. Sort descending: [(alice, 3), (bob, 1)].
Output: [(alice, 3), (bob, 1)]. `skipped = 2`. Correct.
## Complete Code Implementation

Java:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;
public final class ChatLogTopK {
// --- value type ---
public record ChatLogEntry(String timestamp, String user, String message) {}
// --- parser ---
public interface ChatLogParser {
Stream<ChatLogEntry> parse(Path path) throws IOException;
long skippedCount();
}
public static final class PipeDelimitedParser implements ChatLogParser {
private long skipped = 0;
public Stream<ChatLogEntry> parse(Path path) throws IOException {
BufferedReader reader = Files.newBufferedReader(path);
return reader.lines()
.map(this::parseLine)
.filter(Objects::nonNull)
.onClose(() -> {
try { reader.close(); } catch (IOException ignored) {}
});
}
private ChatLogEntry parseLine(String line) {
if (line == null || line.isBlank()) { skipped++; return null; }
String[] parts = line.split(" \\| ", 3);
if (parts.length != 3) { skipped++; return null; }
String ts = parts[0].trim(), user = parts[1].trim(), msg = parts[2];
if (ts.isEmpty() || user.isEmpty() || msg.isEmpty()) { skipped++; return null; }
return new ChatLogEntry(ts, user, msg);
}
public long skippedCount() { return skipped; }
}
// --- metric ---
public interface MetricExtractor {
long extract(ChatLogEntry entry);
}
public static final class CountMetric implements MetricExtractor {
public long extract(ChatLogEntry e) { return 1L; }
}
public static final class MessageLengthMetric implements MetricExtractor {
public long extract(ChatLogEntry e) { return e.message().length(); }
}
// --- tracker ---
public static final class UserActivityTracker {
private final Map<String, Long> scores = new HashMap<>();
private final MetricExtractor metric;
public UserActivityTracker(MetricExtractor metric) { this.metric = metric; }
public void record(ChatLogEntry e) {
scores.merge(e.user(), metric.extract(e), Long::sum);
}
public Map<String, Long> snapshot() { return Collections.unmodifiableMap(scores); }
}
// --- selector ---
public static final class TopKSelector {
public static List<Map.Entry<String, Long>> topK(Map<String, Long> scores, int k) {
if (k <= 0 || scores.isEmpty()) return List.of();
// Min-heap: smallest score first; on tie, lex-larger user first (evicts first)
PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>((a, b) -> {
int c = Long.compare(a.getValue(), b.getValue());
if (c != 0) return c;
return b.getKey().compareTo(a.getKey());
});
for (var e : scores.entrySet()) {
heap.offer(e);
if (heap.size() > k) heap.poll();
}
List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
result.sort((a, b) -> {
int c = Long.compare(b.getValue(), a.getValue());
return c != 0 ? c : a.getKey().compareTo(b.getKey());
});
return result;
}
}
// --- analyzer (faΓ§ade) ---
public static final class ChatLogAnalyzer {
private final ChatLogParser parser;
private final UserActivityTracker tracker;
public ChatLogAnalyzer(ChatLogParser parser, UserActivityTracker tracker) {
this.parser = parser;
this.tracker = tracker;
}
public List<Map.Entry<String, Long>> topK(Path path, int k) throws IOException {
try (Stream<ChatLogEntry> stream = parser.parse(path)) {
stream.forEach(tracker::record);
}
return TopKSelector.topK(tracker.snapshot(), k);
}
public long skipped() { return parser.skippedCount(); }
}
// --- demo ---
public static void main(String[] args) throws IOException {
var analyzer = new ChatLogAnalyzer(
new PipeDelimitedParser(),
new UserActivityTracker(new CountMetric())
);
List<Map.Entry<String, Long>> top = analyzer.topK(Path.of(args[0]), Integer.parseInt(args[1]));
top.forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
System.err.println("Skipped: " + analyzer.skipped());
}
}
```

C++:

```cpp
#include <algorithm>
#include <fstream>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>
struct ChatLogEntry { std::string timestamp, user, message; };
struct MetricExtractor {
virtual long extract(const ChatLogEntry&) = 0;
virtual ~MetricExtractor() = default;
};
struct CountMetric : MetricExtractor {
long extract(const ChatLogEntry&) override { return 1; }
};
struct MessageLengthMetric : MetricExtractor {
long extract(const ChatLogEntry& e) override { return (long)e.message.size(); }
};
class PipeDelimitedParser {
long skipped_ = 0;
public:
template <typename F>
void parse(const std::string& path, F&& onEntry) {
std::ifstream in(path);
std::string line;
while (std::getline(in, line)) {
auto p1 = line.find(" | ");
if (p1 == std::string::npos) { ++skipped_; continue; }
auto p2 = line.find(" | ", p1 + 3);
if (p2 == std::string::npos) { ++skipped_; continue; }
std::string ts = line.substr(0, p1);
std::string user = line.substr(p1 + 3, p2 - (p1 + 3));
std::string msg = line.substr(p2 + 3);
if (ts.empty() || user.empty() || msg.empty()) { ++skipped_; continue; }
onEntry(ChatLogEntry{ts, user, msg});
}
}
long skipped() const { return skipped_; }
};
class UserActivityTracker {
std::unordered_map<std::string, long> scores_;
MetricExtractor* metric_;
public:
explicit UserActivityTracker(MetricExtractor* m) : metric_(m) {}
void record(const ChatLogEntry& e) { scores_[e.user] += metric_->extract(e); }
const std::unordered_map<std::string, long>& snapshot() const { return scores_; }
};
std::vector<std::pair<std::string, long>> topK(
const std::unordered_map<std::string, long>& scores, int k) {
if (k <= 0 || scores.empty()) return {};
auto cmp = [](const std::pair<std::string,long>& a,
const std::pair<std::string,long>& b) {
if (a.second != b.second) return a.second > b.second; // min-heap by score
return a.first < b.first; // tie: lex-larger evicts first
};
std::priority_queue<std::pair<std::string,long>,
std::vector<std::pair<std::string,long>>, decltype(cmp)> heap(cmp);
for (auto& kv : scores) {
heap.push(kv);
if ((int)heap.size() > k) heap.pop();
}
std::vector<std::pair<std::string,long>> out;
while (!heap.empty()) { out.push_back(heap.top()); heap.pop(); }
std::sort(out.begin(), out.end(), [](auto& a, auto& b) {
if (a.second != b.second) return a.second > b.second;
return a.first < b.first;
});
return out;
}
```

TypeScript:

```typescript
import * as fs from "node:fs";
import * as readline from "node:readline";
export interface ChatLogEntry { timestamp: string; user: string; message: string; }
export interface ChatLogParser {
parse(path: string, onEntry: (e: ChatLogEntry) => void): Promise<void>;
skippedCount(): number;
}
export class PipeDelimitedParser implements ChatLogParser {
private skipped = 0;
async parse(path: string, onEntry: (e: ChatLogEntry) => void): Promise<void> {
const rl = readline.createInterface({
input: fs.createReadStream(path),
crlfDelay: Infinity,
});
for await (const raw of rl) {
const line = raw.trim();
if (!line) { this.skipped++; continue; }
const parts = this.splitN(line, " | ", 3);
if (parts.length !== 3) { this.skipped++; continue; }
const [ts, user, msg] = parts.map(s => s.trim());
if (!ts || !user || !msg) { this.skipped++; continue; }
onEntry({ timestamp: ts, user, message: msg });
}
}
skippedCount() { return this.skipped; }
private splitN(s: string, sep: string, n: number): string[] {
const out: string[] = [];
let i = 0;
while (out.length < n - 1) {
const j = s.indexOf(sep, i);
if (j === -1) break;
out.push(s.substring(i, j));
i = j + sep.length;
}
out.push(s.substring(i));
return out;
}
}
export interface MetricExtractor { extract(e: ChatLogEntry): number; }
export class CountMetric implements MetricExtractor { extract() { return 1; } }
export class MessageLengthMetric implements MetricExtractor {
extract(e: ChatLogEntry) { return e.message.length; }
}
export class UserActivityTracker {
private scores = new Map<string, number>();
constructor(private metric: MetricExtractor) {}
record(e: ChatLogEntry): void {
this.scores.set(e.user, (this.scores.get(e.user) ?? 0) + this.metric.extract(e));
}
snapshot(): Map<string, number> { return this.scores; }
}
export class TopKSelector {
static topK(scores: Map<string, number>, k: number): Array<[string, number]> {
if (k <= 0 || scores.size === 0) return [];
// TypeScript has no built-in heap; for clarity we sort. O(N log N).
// In a real interview, note the trade and offer a heap for O(N log K).
const arr: Array<[string, number]> = [...scores.entries()];
arr.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]));
return arr.slice(0, k);
}
}
export class ChatLogAnalyzer {
constructor(
private parser: ChatLogParser,
private tracker: UserActivityTracker,
) {}
async topK(path: string, k: number): Promise<Array<[string, number]>> {
await this.parser.parse(path, e => this.tracker.record(e));
return TopKSelector.topK(this.tracker.snapshot(), k);
}
skipped(): number { return this.parser.skippedCount(); }
}
```

## Extensibility
1. "The log format just changed to JSON lines β can you swap it in?" β
You answer: "Parser is an interface. I write JsonLinesParser implementing ChatLogParser and hand it to the analyzer. The rest of the code is untouched."
```java
// Assumes Jackson (com.fasterxml.jackson.databind.ObjectMapper) on the classpath.
public static final class JsonLinesParser implements ChatLogParser {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private long skipped = 0;
    public Stream<ChatLogEntry> parse(Path path) throws IOException {
        BufferedReader reader = Files.newBufferedReader(path);
        return reader.lines().map(line -> {
            try {
                var node = objectMapper.readTree(line);
                return new ChatLogEntry(
                    node.get("ts").asText(),
                    node.get("user").asText(),
                    node.get("msg").asText()
                );
            } catch (Exception e) { skipped++; return null; }
        }).filter(Objects::nonNull)
          .onClose(() -> { try { reader.close(); } catch (IOException ignored) {} });
    }
    public long skippedCount() { return skipped; }
}
```

Tradeoff: The parser interface returns `ChatLogEntry`, which is committed to three fields. If you need format-specific metadata (priority, thread ID), you promote `ChatLogEntry` to an open schema. For now, three fields are enough.
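If that promotion is ever needed, one option (a sketch; the `attrs` field is hypothetical, not part of the current design) is to keep the three core fields and park extras in a map:

```java
public record ChatLogEntry(String timestamp, String user, String message,
                           Map<String, String> attrs) {
    // Convenience constructor keeps existing three-field call sites valid.
    public ChatLogEntry(String timestamp, String user, String message) {
        this(timestamp, user, message, Map.of());
    }
}
// A JsonLinesParser can stash "priority" or "threadId" in attrs without
// the tracker or selector ever noticing.
```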
2. "Top-K by total message length, then by number of emojis." β
You answer: "The metric is already a strategy. I add MessageLengthMetric and EmojiCountMetric and pass them into the tracker. For combined metrics (e.g., weighted sum), I add a CompositeMetric."
```java
public static final class CompositeMetric implements MetricExtractor {
    private final List<MetricExtractor> parts;
    private final double[] weights;
    public CompositeMetric(List<MetricExtractor> parts, double[] weights) {
        this.parts = parts; this.weights = weights;
    }
    public long extract(ChatLogEntry e) { // weighted sum over parts
        double sum = 0;
        for (int i = 0; i < parts.size(); i++) sum += weights[i] * parts.get(i).extract(e);
        return Math.round(sum);
    }
}
```

Tradeoff: Composite metrics can hide magic constants. I keep the weights as constructor args so callers see them.
3. "The file is 500 GB. Can you shard?" β
You answer: "The tracker is commutative β record on the same user from any shard adds to the same counter. I split the file into chunks, run a tracker per chunk in parallel, then merge the snapshots with Map.merge(..., Long::sum). Top-K runs once on the merged map. The parser stays per-shard."
```java
public static Map<String, Long> mergeTrackers(List<UserActivityTracker> trackers) {
    Map<String, Long> merged = new HashMap<>();
    for (var t : trackers) {
        t.snapshot().forEach((k, v) -> merged.merge(k, v, Long::sum));
    }
    return merged;
}
```

Tradeoff: Merge is O(total distinct users), which might dominate for wide key spaces. If merging is expensive, use a reduce tree (pairwise merges) to stay parallel. Below ~10M keys, a sequential merge is fine.
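A sketch of the per-chunk driver under those assumptions. The chunking itself (splitting on newline boundaries) is assumed to happen upstream; `chunkPaths` is hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// One parser and tracker per chunk, run in parallel, merged afterwards.
static List<UserActivityTracker> runShards(List<Path> chunkPaths) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(chunkPaths.size());
    try {
        List<Future<UserActivityTracker>> futures = new ArrayList<>();
        for (Path chunk : chunkPaths) {
            futures.add(pool.submit(() -> {
                var tracker = new UserActivityTracker(new CountMetric());
                try (var entries = new PipeDelimitedParser().parse(chunk)) {
                    entries.forEach(tracker::record);
                }
                return tracker;
            }));
        }
        List<UserActivityTracker> shards = new ArrayList<>();
        for (var f : futures) shards.add(f.get()); // propagates worker failures
        return shards;
    } finally {
        pool.shutdown();
    }
}
// Then: TopKSelector.topK(mergeTrackers(runShards(chunks)), k)
```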
## What is Expected at Each Level
| Level | Expectations |
|---|---|
| Junior | Single-function script: read file, split each line, `Map<String, Integer>` count, sort descending, take first K. Gets the happy path right; struggles with malformed lines or pipes in the message. |
| Mid-level | Extracts a `parseLine` function, uses `HashMap.merge`, uses a min-heap of size K. Handles malformed lines gracefully. When asked to swap format, cleanly refactors by promoting the parser to an interface. |
| Senior | Lays out the pipeline upfront (parser, metric, tracker, selector, analyzer) with clear boundaries. Names the extensibility axes (format, metric, sharding) before the interviewer asks. Discusses streaming vs batch, tiebreaker subtleties, and the difference between "empty message = skip" and "empty message = count zero" as a conscious product decision. Google signal: the refactoring instinct is visible when they write the one-file solution first, then say "let me pull the parser out; the interviewer is going to ask about JSON in ten minutes," and do it before being asked. |