45 — LLD: Connection Pool with Request Queue
Understanding the Problem
A connection pool hands out a bounded set of N connections to many callers. acquire(timeoutMs) returns a connection or throws on timeout. release(conn) gives it back. When the pool is saturated, new acquirers park in a FIFO queue. The pool also health-checks connections and evicts bad ones.
What is an Object Pool?
A classic creational pattern. Rather than construct and destroy expensive resources per call (TCP sockets, DB connections, HTTP clients), you keep a warm pool and loan them out. The interesting design question is what happens when the pool is empty.
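The loan-out idea fits in a few lines. A minimal sketch (unbounded, no health checks — the names `FakeSocket` and `SimplePool` are illustrative, not part of the design below, which adds the bound and the waiter queue):

```typescript
// Minimal object pool: reuse instances instead of constructing per call.
// FakeSocket stands in for an expensive resource such as a TCP connection.
class FakeSocket {
  constructor(public readonly id: number) {}
}

class SimplePool {
  private free: FakeSocket[] = [];
  private created = 0;

  // Hand out a warm instance, or build a new one if none are available.
  acquire(): FakeSocket {
    return this.free.pop() ?? new FakeSocket(++this.created);
  }

  // Loan ends: the instance goes back on the shelf for the next caller.
  release(s: FakeSocket): void {
    this.free.push(s);
  }
}

const pool = new SimplePool();
const a = pool.acquire(); // created fresh: no warm instances yet
pool.release(a);
const b = pool.acquire(); // same instance reused, not reconstructed
```

The second `acquire` returns the exact object released a moment earlier — that reuse is the whole point of the pattern.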
Requirements
Clarifying Questions
You: Blocking acquire, async acquire, or both?
Interviewer: Async with a timeout.
Returns a promise/future.
You: How do callers behave on timeout?
Interviewer: The promise rejects with a clear error.
The waiter is then pruned from the queue cleanly.
You: Should we validate on every acquire?
Interviewer: Yes, a light-weight health check.
factory.validate(conn) before handing out a free one.
You: Do connections expire?
Interviewer: Assume no max-age for v1, but the design should allow a reaper.
Seam for an idle-reaper thread.
Final Requirements
- Bounded pool of `N` connections backed by a `ConnectionFactory`.
- `acquire(timeoutMs)` returns a connection or times out.
- `release(conn)` returns the connection to the pool, hands it to the next waiter, or closes it if unhealthy.
- Waiters park FIFO.
- `drain()` closes everything cleanly.
- Thread-safe.
Out of scope: read-replicas, connection pinning by query, cross-region failover.
Core Entities and Relationships
| Entity | Responsibility |
|---|---|
| Connection | Handle with `healthy` flag and `close()`. |
| ConnectionFactory | `create()`, `validate(conn)`. |
| Waiter | `{resolve, reject, timer}`. |
| ConnectionPool | Free stack, in-use set, waiter queue. |
Why a free stack instead of a queue? LIFO keeps the hottest connection warm. Tail connections drift to the back and become candidates for eviction by a reaper.
Why a separate Waiter type? Because the timer must be cancelled on both timeout and successful resolution; encapsulating the trio prevents leaks.
Design is iterative — start with acquire/release, then add the waiter queue.
Class Design
Connection (interface)
Methods: `close()`
State: `id`, `healthy`
ConnectionPool
State:
- `free: Connection[]` — LIFO
- `inUse: Set<Connection>`
- `waiters: Waiter[]` — FIFO
- `max: int`, `factory: ConnectionFactory`
Methods:
- `acquire(timeoutMs) -> Promise<Connection>`
- `release(conn) -> void`
- `drain() -> Promise<void>`
Design principles at play:
- Factory Pattern — `ConnectionFactory` creates and validates.
- Object Pool Pattern — the pool itself.
- Single Responsibility — pool knows nothing about what the connection does.
Implementation
Core Logic: Acquire
Bad approach: busy-loop on a shared counter.
- Wastes CPU, races badly.
Good approach: check free list, create if under max, else queue. Each waiter carries a timeout.
- Clean, linear code path.
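The good approach reduces to a three-branch decision over pool counts. A sketch (the `decide` helper is illustrative; the full async version appears in the implementation section):

```typescript
// Three-branch acquire: reuse a free connection, create under the cap,
// otherwise park the caller in the waiter queue.
type Decision = "reuse" | "create" | "wait";

function decide(freeCount: number, inUseCount: number, max: number): Decision {
  if (freeCount > 0) return "reuse";                 // hot connection available
  if (inUseCount + freeCount < max) return "create"; // headroom under the cap
  return "wait";                                     // saturated: queue up
}
```

Keeping the branch order fixed (reuse, then create, then wait) is what makes the code path linear and easy to verify.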
Great approach: in Java, use a ReentrantLock with a Condition — notEmpty.await(timeout) handles the wait cleanly. On release, notEmpty.signal().
Verification
Pool max=2.
1. `acquire(1s)` → free empty, size 0 < 2, `factory.create()` → C1, inUse={C1}. Return C1.
2. `acquire(1s)` → same flow → C2, inUse={C1, C2}.
3. `acquire(1s)` → pool full. Waiter W1 queued with timer.
4. `release(C1)` → W1 is queued, so pop W1, clearTimeout, resolve W1 with C1.
5. `acquire(100ms)` → waits, timer fires, reject with `ACQUIRE_TIMEOUT`. Waiter removed from queue.
6. `release(C2)` with `C2.healthy = false` → `C2.close()`, inUse loses C2, free unchanged.
Thread Safety
- Java: a single `ReentrantLock` around acquire/release plus a `Condition`. Operations are microseconds, so this granularity is fine.
- Careful: the timeout + signal race. Standard idiom: after the timeout expires, re-check state under the lock before failing — another thread may have assigned you a connection.
- Leak detection: track acquire timestamp; if a connection is inUse too long, log and optionally kill it.
- In Node.js, the single event loop gives you free serialisation — the TypeScript version is already "thread-safe" in that sense. For real multi-threading in Node (worker threads), use `Atomics` or a dedicated lock library.
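The leak-detection bullet above can be sketched as a timestamp map. A sketch under stated assumptions — `LeakTracker` and the 30-second cutoff are illustrative, not part of the pool API:

```typescript
// Track when each connection was handed out; flag any held past a threshold.
const LEAK_THRESHOLD_MS = 30_000; // illustrative cutoff

class LeakTracker<C> {
  private checkedOutAt = new Map<C, number>();

  onAcquire(c: C, now: number = Date.now()): void {
    this.checkedOutAt.set(c, now);
  }

  onRelease(c: C): void {
    this.checkedOutAt.delete(c);
  }

  // Connections held longer than the threshold are suspected leaks.
  suspects(now: number = Date.now()): C[] {
    const out: C[] = [];
    for (const [c, t] of this.checkedOutAt) {
      if (now - t > LEAK_THRESHOLD_MS) out.push(c);
    }
    return out;
  }
}
```

A reaper can periodically log `suspects()` and, if configured, forcibly reclaim them.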
Complete Code Implementation
Java

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

interface Connection { String id(); boolean healthy(); void close(); }

interface ConnectionFactory {
    Connection create();
    boolean validate(Connection c);
}

public class ConnectionPool {
    private final ConnectionFactory factory;
    private final int max;
    private final Deque<Connection> free = new ArrayDeque<>(); // LIFO free stack
    private final Set<Connection> inUse = new HashSet<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();    // waiters queue here
    private volatile boolean draining = false;

    public ConnectionPool(ConnectionFactory f, int max) { this.factory = f; this.max = max; }

    public Connection acquire(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        lock.lock();
        try {
            while (true) {
                if (draining) throw new IllegalStateException("POOL_DRAINING");
                if (!free.isEmpty()) {
                    Connection c = free.pop();
                    if (factory.validate(c)) { inUse.add(c); return c; }
                    c.close(); // validation failed; drop and retry
                    continue;
                }
                if (inUse.size() + free.size() < max) {
                    Connection c = factory.create();
                    inUse.add(c);
                    return c;
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) throw new TimeoutException("ACQUIRE_TIMEOUT");
                notEmpty.awaitNanos(remaining); // loop re-checks state on wake
            }
        } finally { lock.unlock(); }
    }

    public void release(Connection c) {
        lock.lock();
        try {
            inUse.remove(c);
            if (draining || !c.healthy()) { c.close(); return; }
            free.push(c);       // LIFO: released connection is next out
            notEmpty.signal();
        } finally { lock.unlock(); }
    }

    public void drain() {
        lock.lock();
        try {
            draining = true;
            for (Connection c : free) c.close();
            free.clear();
            notEmpty.signalAll(); // wake waiters so they see draining
        } finally { lock.unlock(); }
    }
}
```

C++

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <unordered_set>

struct Connection { virtual ~Connection() = default; virtual bool healthy() const = 0; virtual void close() = 0; };

struct ConnectionFactory {
    virtual ~ConnectionFactory() = default;
    virtual std::shared_ptr<Connection> create() = 0;
    virtual bool validate(Connection& c) = 0;
};

class ConnectionPool {
public:
    ConnectionPool(std::shared_ptr<ConnectionFactory> f, int max_) : factory(std::move(f)), maxSize(max_) {}

    std::shared_ptr<Connection> acquire(int timeoutMs) {
        auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
        std::unique_lock<std::mutex> lk(mu);
        while (true) {
            if (draining) throw std::runtime_error("POOL_DRAINING");
            if (!free.empty()) {
                auto c = free.front(); free.pop_front(); // front = hottest (LIFO)
                if (factory->validate(*c)) { inUse.insert(c); return c; }
                c->close(); // validation failed; drop and retry
                continue;
            }
            if (static_cast<int>(inUse.size() + free.size()) < maxSize) {
                auto c = factory->create();
                inUse.insert(c);
                return c;
            }
            if (notEmpty.wait_until(lk, deadline) == std::cv_status::timeout)
                throw std::runtime_error("ACQUIRE_TIMEOUT");
        }
    }

    void release(std::shared_ptr<Connection> c) {
        std::lock_guard<std::mutex> lk(mu);
        inUse.erase(c);
        if (draining || !c->healthy()) { c->close(); return; }
        free.push_front(c); // LIFO: released connection is next out
        notEmpty.notify_one();
    }

    void drain() {
        std::lock_guard<std::mutex> lk(mu);
        draining = true;
        for (auto& c : free) c->close();
        free.clear();
        notEmpty.notify_all(); // wake waiters so they see draining
    }

private:
    std::shared_ptr<ConnectionFactory> factory;
    int maxSize;
    std::deque<std::shared_ptr<Connection>> free;
    std::unordered_set<std::shared_ptr<Connection>> inUse;
    std::mutex mu;
    std::condition_variable notEmpty;
    bool draining = false;
};
```

TypeScript

```typescript
interface Connection { id: string; healthy: boolean; close(): Promise<void>; }
interface ConnectionFactory { create(): Promise<Connection>; validate(c: Connection): Promise<boolean>; }

type Waiter = {
  resolve: (c: Connection) => void;
  reject: (e: Error) => void;
  timer: ReturnType<typeof setTimeout>;
};

class ConnectionPool {
  private free: Connection[] = [];  // LIFO free stack
  private inUse = new Set<Connection>();
  private waiters: Waiter[] = [];   // FIFO waiter queue
  private draining = false;

  constructor(
    private factory: ConnectionFactory,
    private max: number,
  ) {}

  async acquire(timeoutMs: number): Promise<Connection> {
    if (this.draining) throw new Error("POOL_DRAINING");
    // Reuse a free connection, closing any that fail validation.
    let reused: Connection | undefined;
    while ((reused = this.free.pop()) !== undefined) {
      if (await this.factory.validate(reused)) {
        this.inUse.add(reused);
        return reused;
      }
      void reused.close(); // unhealthy: drop it and try the next free one
    }
    if (this.inUse.size + this.free.length < this.max) {
      const conn = await this.factory.create();
      this.inUse.add(conn);
      return conn;
    }
    // Saturated: park in the FIFO queue with a timeout.
    return new Promise<Connection>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.waiters = this.waiters.filter((w) => w.resolve !== resolve);
        reject(new Error("ACQUIRE_TIMEOUT"));
      }, timeoutMs);
      this.waiters.push({ resolve, reject, timer });
    });
  }

  release(conn: Connection): void {
    this.inUse.delete(conn);
    if (this.draining || !conn.healthy) { void conn.close(); return; }
    const next = this.waiters.shift();
    if (next) {
      clearTimeout(next.timer); // cancel the pending timeout
      this.inUse.add(conn);
      next.resolve(conn);
      return;
    }
    this.free.push(conn);
  }

  async drain(): Promise<void> {
    this.draining = true;
    for (const w of this.waiters) { clearTimeout(w.timer); w.reject(new Error("POOL_DRAINING")); }
    this.waiters = [];
    await Promise.all([...this.free, ...this.inUse].map((c) => c.close()));
    this.free = []; this.inUse.clear();
  }
}
```

Extensibility
1. "Idle connection reaper"
A background task that wakes every T seconds, scans `free`, and closes anything idle longer than the idle threshold. Closed connections are recreated on the next acquire.
A sketch (assuming each connection is stamped with a `lastReleased` timestamp on release):

```typescript
setInterval(() => {
  const now = Date.now();
  this.free = this.free.filter((c) => {
    if (now - (c as any).lastReleased > IDLE_MAX_MS) { void c.close(); return false; }
    return true;
  });
}, 30_000);
```

2. "Dynamic resize"
`setMax(n)`. If shrinking, do not close in-use connections; instead stop placing released ones back and close them on return. If growing, wake waiters that fit in the new headroom.
3. "Observability"
Expose counters:
`acquireCount`, `acquireTimeoutCount`, `avgAcquireMs`, `inUseGauge`. Essential for tuning `max` in production.
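The counter surface can be sketched as a small metrics object the pool calls into — names follow the list above; the running average is one simple choice among several (a histogram is better in production):

```typescript
// Pool metrics: counters, a running average of acquire latency, and a gauge.
class PoolMetrics {
  acquireCount = 0;
  acquireTimeoutCount = 0;
  inUseGauge = 0;
  private totalAcquireMs = 0;

  recordAcquire(elapsedMs: number): void {
    this.acquireCount++;
    this.totalAcquireMs += elapsedMs;
    this.inUseGauge++;
  }

  recordTimeout(): void {
    this.acquireTimeoutCount++;
  }

  recordRelease(): void {
    this.inUseGauge--;
  }

  get avgAcquireMs(): number {
    return this.acquireCount === 0 ? 0 : this.totalAcquireMs / this.acquireCount;
  }
}
```

A rising `acquireTimeoutCount` with a high `inUseGauge` is the usual signal that `max` is too low (or that callers are leaking connections).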
What is Expected at Each Level
| Level | Expectations |
|---|---|
| Mid | Pool and release, no timeouts, no health check. |
| Senior / SMTS | Full waiter queue with timeouts and signal semantics, health check, drain. |
| Staff | Draining under load, leak detection, metrics, tracing hooks, dynamic resize. |