
Building Resilient Distributed Systems with Event Sourcing
Learn how to design fault-tolerant distributed systems using event sourcing patterns, CQRS, and eventual consistency for scalable architectures.
As systems grow in complexity and scale, traditional CRUD-based architectures often struggle with consistency, auditability, and scalability. Event sourcing offers an alternative: it treats an append-only log of events as the source of truth and derives current state from that log. This guide walks through the core building blocks of the pattern, shows how it combines with CQRS and sagas, and covers snapshotting, distributed-system concerns, and testing.
Understanding Event Sourcing
Event sourcing is an architectural pattern where state changes are stored as a sequence of events rather than just the current state. Instead of updating records in place, every change is captured as an immutable event.
Key Concepts:
- Events: Immutable facts about what happened
- Event Store: Persistent storage for events
- Aggregates: Domain entities that produce events
- Projections: Read models built from events
- Event Streams: Ordered sequences of events
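To make these concepts concrete, here is a minimal sketch of how current state can be derived by replaying an event stream in order. The sample data and the `applyEvent` reducer are illustrative assumptions, not part of any particular library.

```javascript
// A hypothetical event stream for one user (illustrative data only)
const stream = [
  { eventType: 'UserRegistered', data: { email: 'old@example.com', name: 'John Doe' } },
  { eventType: 'UserEmailChanged', data: { oldEmail: 'old@example.com', newEmail: 'new@example.com' } },
  { eventType: 'UserDeactivated', data: {} }
];

// Pure reducer: takes the previous state and one event, returns the next state
function applyEvent(state, event) {
  switch (event.eventType) {
    case 'UserRegistered':
      return { email: event.data.email, name: event.data.name, isActive: true };
    case 'UserEmailChanged':
      return { ...state, email: event.data.newEmail };
    case 'UserDeactivated':
      return { ...state, isActive: false };
    default:
      return state; // Unknown event types leave the state unchanged
  }
}

// Current state is a left fold over the ordered events
const currentState = stream.reduce(applyEvent, null);
console.log(currentState);
// { email: 'new@example.com', name: 'John Doe', isActive: false }
```

Nothing in the stream is ever modified; the email change is a new fact appended after the registration, and the current value falls out of the replay.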
Traditional vs Event Sourcing:
// Traditional CRUD
const user = await User.findById(userId);
user.email = newEmail;
await user.save(); // Previous state is lost

// Event Sourcing
const event = new UserEmailChanged(userId, oldEmail, newEmail);
await eventStore.appendEvents(userId, [event]); // All history preserved
Core Components of Event Sourcing
1. Events
Events represent facts that have occurred in your domain:
const { randomUUID } = require('crypto');

// Base Event class
class DomainEvent {
  constructor(aggregateId, eventType, data, metadata = {}) {
    this.aggregateId = aggregateId;
    this.eventType = eventType;
    this.data = data;
    this.metadata = {
      ...metadata,
      timestamp: new Date().toISOString(),
      eventId: randomUUID(),
      version: 1
    };
  }
}

// Specific domain events
class UserRegistered extends DomainEvent {
  constructor(userId, email, name) {
    super(userId, 'UserRegistered', { email, name });
  }
}

class UserEmailChanged extends DomainEvent {
  constructor(userId, oldEmail, newEmail) {
    super(userId, 'UserEmailChanged', { oldEmail, newEmail });
  }
}

// Emitted by User.deactivate() in the aggregate shown later
class UserDeactivated extends DomainEvent {
  constructor(userId) {
    super(userId, 'UserDeactivated', {});
  }
}

class OrderPlaced extends DomainEvent {
  constructor(orderId, customerId, items, total) {
    super(orderId, 'OrderPlaced', { customerId, items, total });
  }
}
2. Event Store
The event store is the central component that persists events:
class ConcurrencyError extends Error {}

class EventStore {
  constructor(database, messageBus) {
    this.db = database;
    this.messageBus = messageBus; // Used by publishEvents()
  }

  async appendEvents(streamId, events, expectedVersion = -1) {
    // Assumes the driver runs queries issued between beginTransaction() and
    // commit() inside the transaction; with a pooled driver, issue them on
    // the transaction handle instead.
    const transaction = await this.db.beginTransaction();
    let currentVersion;
    try {
      // Optimistic concurrency control. A UNIQUE (stream_id, version)
      // constraint is what makes this check safe under concurrent writers.
      currentVersion = await this.getCurrentVersion(streamId);
      if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
        throw new ConcurrencyError('Stream version mismatch');
      }

      // Append events
      for (let i = 0; i < events.length; i++) {
        const event = {
          streamId,
          eventType: events[i].eventType,
          eventData: JSON.stringify(events[i].data),
          metadata: JSON.stringify(events[i].metadata),
          version: currentVersion + i + 1,
          timestamp: new Date()
        };
        await this.db.query(
          'INSERT INTO events (stream_id, event_type, event_data, metadata, version, timestamp) VALUES (?, ?, ?, ?, ?, ?)',
          [event.streamId, event.eventType, event.eventData, event.metadata, event.version, event.timestamp]
        );
      }

      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // Publish only after a successful commit and outside the try block,
    // so a publish failure never triggers a rollback of committed events
    await this.publishEvents(events);
    return currentVersion + events.length;
  }

  async getEvents(streamId, fromVersion = 0) {
    const rows = await this.db.query(
      'SELECT * FROM events WHERE stream_id = ? AND version > ? ORDER BY version',
      [streamId, fromVersion]
    );
    return rows.map(row => ({
      aggregateId: row.stream_id, // Needed when rehydrating aggregates
      eventType: row.event_type,
      data: JSON.parse(row.event_data),
      metadata: JSON.parse(row.metadata),
      version: row.version,
      timestamp: row.timestamp
    }));
  }

  async getCurrentVersion(streamId) {
    const result = await this.db.query(
      'SELECT MAX(version) as version FROM events WHERE stream_id = ?',
      [streamId]
    );
    // MAX() yields NULL for an empty stream, which maps to version 0
    return result[0]?.version ?? 0;
  }

  async publishEvents(events) {
    // Publish to message bus for projections and other services
    for (const event of events) {
      await this.messageBus.publish(event.eventType, event);
    }
  }
}
3. Aggregates
Aggregates are domain entities that encapsulate business logic and produce events:
class User {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.uncommittedEvents = [];
    this.email = null;
    this.name = null;
    this.isActive = false;
  }

  // Command methods
  static register(userId, email, name) {
    const user = new User(userId);
    user.applyEvent(new UserRegistered(userId, email, name));
    return user;
  }

  changeEmail(newEmail) {
    if (!this.isActive) {
      throw new Error('Cannot change email for inactive user');
    }
    if (this.email === newEmail) {
      return; // No change needed
    }
    this.applyEvent(new UserEmailChanged(this.id, this.email, newEmail));
  }

  deactivate() {
    if (!this.isActive) {
      throw new Error('User is already inactive');
    }
    this.applyEvent(new UserDeactivated(this.id));
  }

  // Event application: mutate state, then record the event as uncommitted
  applyEvent(event) {
    this.when(event);
    this.uncommittedEvents.push(event);
    this.version++;
  }

  // State transitions only; no validation and no side effects
  when(event) {
    switch (event.eventType) {
      case 'UserRegistered':
        this.email = event.data.email;
        this.name = event.data.name;
        this.isActive = true;
        break;
      case 'UserEmailChanged':
        this.email = event.data.newEmail;
        break;
      case 'UserDeactivated':
        this.isActive = false;
        break;
    }
  }

  // Hydrate from events (expects a non-empty, ordered list)
  static fromEvents(events) {
    const user = new User(events[0].aggregateId);
    events.forEach(event => {
      user.when(event);
      user.version++;
    });
    return user;
  }

  // Snapshot support, used by the snapshot-based repository
  toSnapshot() {
    return {
      id: this.id,
      version: this.version,
      email: this.email,
      name: this.name,
      isActive: this.isActive
    };
  }

  static fromSnapshot(data) {
    const user = new User(data.id);
    user.version = data.version;
    user.email = data.email;
    user.name = data.name;
    user.isActive = data.isActive;
    return user;
  }

  getUncommittedEvents() {
    return this.uncommittedEvents;
  }

  markEventsAsCommitted() {
    this.uncommittedEvents = [];
  }
}
CQRS (Command Query Responsibility Segregation)
CQRS separates read and write operations, often used alongside event sourcing:
Command Side (Write Model)
class UserCommandHandler {
  constructor(eventStore, userRepository, userQueryService = null) {
    this.eventStore = eventStore;
    this.userRepository = userRepository;
    // Optional read-model lookup used for the email uniqueness check
    this.userQueryService = userQueryService;
  }

  async handle(command) {
    switch (command.type) {
      case 'RegisterUser':
        return await this.handleRegisterUser(command);
      case 'ChangeUserEmail':
        return await this.handleChangeUserEmail(command);
      default:
        throw new Error(`Unknown command type: ${command.type}`);
    }
  }

  async handleRegisterUser(command) {
    const { userId, email, name } = command.data;

    // Business logic validation. The event-sourced repository can only load
    // by aggregate ID, so the email lookup goes through the read model.
    // Read models are eventually consistent, so this check is best effort.
    if (this.userQueryService) {
      const existingUser = await this.userQueryService.findByEmail(email);
      if (existingUser) {
        throw new Error('Email already registered');
      }
    }

    const user = User.register(userId, email, name);
    await this.userRepository.save(user);
    return { success: true, userId };
  }

  async handleChangeUserEmail(command) {
    const { userId, newEmail } = command.data;
    const user = await this.userRepository.findById(userId);
    if (!user) {
      throw new Error('User not found');
    }
    user.changeEmail(newEmail);
    await this.userRepository.save(user);
    return { success: true };
  }
}

class UserRepository {
  constructor(eventStore) {
    this.eventStore = eventStore;
  }

  async findById(userId) {
    const events = await this.eventStore.getEvents(userId);
    if (events.length === 0) {
      return null;
    }
    return User.fromEvents(events);
  }

  async save(user) {
    const uncommittedEvents = user.getUncommittedEvents();
    if (uncommittedEvents.length === 0) {
      return;
    }
    // Expected version is the version the aggregate had when it was loaded
    await this.eventStore.appendEvents(
      user.id,
      uncommittedEvents,
      user.version - uncommittedEvents.length
    );
    user.markEventsAsCommitted();
  }
}
Query Side (Read Model)
class UserProjection {
  constructor(database) {
    this.db = database;
  }

  async handleEvent(event) {
    switch (event.eventType) {
      case 'UserRegistered':
        await this.handleUserRegistered(event);
        break;
      case 'UserEmailChanged':
        await this.handleUserEmailChanged(event);
        break;
      case 'UserDeactivated':
        await this.handleUserDeactivated(event);
        break;
    }
  }

  async handleUserRegistered(event) {
    await this.db.query(
      'INSERT INTO user_projections (id, email, name, is_active, created_at) VALUES (?, ?, ?, ?, ?)',
      [event.aggregateId, event.data.email, event.data.name, true, event.metadata.timestamp]
    );
  }

  async handleUserEmailChanged(event) {
    await this.db.query(
      'UPDATE user_projections SET email = ?, updated_at = ? WHERE id = ?',
      [event.data.newEmail, event.metadata.timestamp, event.aggregateId]
    );
  }

  async handleUserDeactivated(event) {
    await this.db.query(
      'UPDATE user_projections SET is_active = ?, updated_at = ? WHERE id = ?',
      [false, event.metadata.timestamp, event.aggregateId]
    );
  }
}

class UserQueryService {
  constructor(database) {
    this.db = database;
  }

  async findById(userId) {
    const result = await this.db.query(
      'SELECT * FROM user_projections WHERE id = ?',
      [userId]
    );
    return result[0] || null;
  }

  async findByEmail(email) {
    const result = await this.db.query(
      'SELECT * FROM user_projections WHERE email = ?',
      [email]
    );
    return result[0] || null;
  }

  async findActiveUsers(limit = 100, offset = 0) {
    return await this.db.query(
      'SELECT * FROM user_projections WHERE is_active = true LIMIT ? OFFSET ?',
      [limit, offset]
    );
  }
}
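Because a read model is derived entirely from events, it can be dropped and rebuilt by replaying the event log. The sketch below illustrates the idea with an in-memory `Map` standing in for the `user_projections` table. `rebuildUserProjection` and the sample `history` are hypothetical and only mirror the event shapes used in this guide.

```javascript
// Rebuild a read model from scratch by replaying events in order.
// The Map stands in for the user_projections table (assumption for the demo).
function rebuildUserProjection(events) {
  const rows = new Map();
  for (const event of events) {
    const id = event.aggregateId;
    switch (event.eventType) {
      case 'UserRegistered':
        rows.set(id, { id, email: event.data.email, name: event.data.name, isActive: true });
        break;
      case 'UserEmailChanged':
        if (rows.has(id)) rows.get(id).email = event.data.newEmail;
        break;
      case 'UserDeactivated':
        if (rows.has(id)) rows.get(id).isActive = false;
        break;
    }
  }
  return rows;
}

// Illustrative history for two users
const history = [
  { aggregateId: 'user-1', eventType: 'UserRegistered', data: { email: 'a@example.com', name: 'Ann' } },
  { aggregateId: 'user-2', eventType: 'UserRegistered', data: { email: 'b@example.com', name: 'Bob' } },
  { aggregateId: 'user-1', eventType: 'UserEmailChanged', data: { oldEmail: 'a@example.com', newEmail: 'ann@example.com' } },
  { aggregateId: 'user-2', eventType: 'UserDeactivated', data: {} }
];

const projection = rebuildUserProjection(history);
console.log(projection.get('user-1'));
// { id: 'user-1', email: 'ann@example.com', name: 'Ann', isActive: true }
```

The same replay approach is what lets you add a new read model later: define its handlers, then feed it the existing history.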
Handling Complex Business Processes
Sagas for Long-Running Processes
class OrderProcessSaga {
  constructor(commandBus, eventBus) {
    this.commandBus = commandBus;
    this.eventBus = eventBus;
    // In-memory state is lost on restart; a production saga would persist it
    this.state = new Map();
  }

  async handleEvent(event) {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event);
        break;
      case 'PaymentProcessed':
        await this.handlePaymentProcessed(event);
        break;
      case 'PaymentFailed':
        await this.handlePaymentFailed(event);
        break;
      case 'InventoryReserved':
        await this.handleInventoryReserved(event);
        break;
      case 'InventoryReservationFailed':
        await this.handleInventoryReservationFailed(event);
        break;
    }
  }

  async handleOrderPlaced(event) {
    // OrderPlaced carries the order ID as its aggregate ID, not in its data
    const orderId = event.aggregateId;
    const { customerId, items, total } = event.data;

    // Store saga state
    this.state.set(orderId, {
      orderId,
      customerId,
      items,
      total,
      status: 'processing',
      steps: ['payment_pending', 'inventory_pending']
    });

    // Start payment process
    await this.commandBus.send({
      type: 'ProcessPayment',
      data: { orderId, customerId, amount: total }
    });

    // Reserve inventory
    await this.commandBus.send({
      type: 'ReserveInventory',
      data: { orderId, items }
    });
  }

  async handlePaymentProcessed(event) {
    await this.completeStep(event.data.orderId, 'payment_pending');
  }

  async handleInventoryReserved(event) {
    await this.completeStep(event.data.orderId, 'inventory_pending');
  }

  // Mark one step as done and complete the order once none remain
  async completeStep(orderId, step) {
    const sagaState = this.state.get(orderId);
    if (!sagaState) return;
    sagaState.steps = sagaState.steps.filter(s => s !== step);
    if (sagaState.steps.length === 0) {
      await this.completeOrder(orderId);
    }
  }

  async handlePaymentFailed(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    // Compensate - cancel inventory reservation
    await this.commandBus.send({
      type: 'CancelInventoryReservation',
      data: { orderId: event.data.orderId }
    });

    // Cancel order
    await this.commandBus.send({
      type: 'CancelOrder',
      data: { orderId: event.data.orderId, reason: 'Payment failed' }
    });
    this.state.delete(event.data.orderId);
  }

  async handleInventoryReservationFailed(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    // Compensate - cancel the order. If the payment has already gone
    // through, a refund command would also be needed here (not shown).
    await this.commandBus.send({
      type: 'CancelOrder',
      data: { orderId: event.data.orderId, reason: 'Inventory reservation failed' }
    });
    this.state.delete(event.data.orderId);
  }

  async completeOrder(orderId) {
    await this.commandBus.send({
      type: 'CompleteOrder',
      data: { orderId }
    });
    this.state.delete(orderId);
  }
}
Event Store Implementation Patterns
Snapshotting for Performance
class SnapshotStore {
  constructor(database) {
    this.db = database;
  }

  async saveSnapshot(aggregateId, snapshot, version) {
    // INSERT OR REPLACE is SQLite syntax; other databases need their own upsert
    await this.db.query(
      'INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot_data, version, created_at) VALUES (?, ?, ?, ?)',
      [aggregateId, JSON.stringify(snapshot), version, new Date()]
    );
  }

  async getSnapshot(aggregateId) {
    const result = await this.db.query(
      'SELECT * FROM snapshots WHERE aggregate_id = ? ORDER BY version DESC LIMIT 1',
      [aggregateId]
    );
    if (result.length === 0) {
      return null;
    }
    return {
      data: JSON.parse(result[0].snapshot_data),
      version: result[0].version
    };
  }
}

class OptimizedUserRepository {
  constructor(eventStore, snapshotStore) {
    this.eventStore = eventStore;
    this.snapshotStore = snapshotStore;
    this.snapshotFrequency = 10; // Snapshot every 10 events
  }

  async findById(userId) {
    // Try to get snapshot first
    const snapshot = await this.snapshotStore.getSnapshot(userId);
    let user;
    let fromVersion = 0;
    if (snapshot) {
      user = User.fromSnapshot(snapshot.data);
      user.version = snapshot.version; // Resume counting from the snapshot
      fromVersion = snapshot.version;
    } else {
      user = new User(userId);
    }

    // Get events since snapshot
    const events = await this.eventStore.getEvents(userId, fromVersion);
    if (!snapshot && events.length === 0) {
      return null; // No snapshot and no events: the user does not exist
    }
    events.forEach(event => {
      user.when(event);
      user.version++;
    });
    return user;
  }

  async save(user) {
    const uncommittedEvents = user.getUncommittedEvents();
    if (uncommittedEvents.length === 0) {
      return;
    }
    const previousVersion = user.version - uncommittedEvents.length;
    await this.eventStore.appendEvents(
      user.id,
      uncommittedEvents,
      previousVersion
    );

    // Create a snapshot whenever this save crossed a multiple of
    // snapshotFrequency, even if several events were saved at once
    const crossedBoundary =
      Math.floor(user.version / this.snapshotFrequency) >
      Math.floor(previousVersion / this.snapshotFrequency);
    if (crossedBoundary) {
      await this.snapshotStore.saveSnapshot(
        user.id,
        user.toSnapshot(),
        user.version
      );
    }
    user.markEventsAsCommitted();
  }
}
Handling Distributed Challenges
Event Ordering and Causality
class CausalityTracker {
  constructor() {
    this.vectorClock = new Map();
  }

  generateEventId(nodeId) {
    const currentTime = this.vectorClock.get(nodeId) || 0;
    const newTime = currentTime + 1;
    this.vectorClock.set(nodeId, newTime);
    return {
      nodeId,
      timestamp: newTime,
      vectorClock: new Map(this.vectorClock) // Copy, so later ticks do not alter it
    };
  }

  updateClock(eventId) {
    // Update vector clock based on received event
    for (const [nodeId, timestamp] of eventId.vectorClock) {
      const currentTime = this.vectorClock.get(nodeId) || 0;
      this.vectorClock.set(nodeId, Math.max(currentTime, timestamp));
    }
  }

  happensBefore(eventA, eventB) {
    // eventA happens before eventB when every component of A's clock is
    // <= B's and at least one is strictly smaller. Equal or concurrent
    // clocks return false.
    const nodeIds = new Set([
      ...eventA.vectorClock.keys(),
      ...eventB.vectorClock.keys()
    ]);
    let strictlySmaller = false;
    for (const nodeId of nodeIds) {
      const timestampA = eventA.vectorClock.get(nodeId) || 0;
      const timestampB = eventB.vectorClock.get(nodeId) || 0;
      if (timestampA > timestampB) {
        return false;
      }
      if (timestampA < timestampB) {
        strictlySmaller = true;
      }
    }
    return strictlySmaller;
  }
}
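A vector clock can distinguish three cases that a single timestamp cannot: one event causally precedes another, follows it, or the two are concurrent. The following sketch shows all three using plain objects as clocks. `compareClocks` and the node names are hypothetical helpers for illustration, separate from the `CausalityTracker` class.

```javascript
// Compare two vector clocks represented as plain objects { nodeId: counter }
function compareClocks(a, b) {
  let aSmaller = false;
  let bSmaller = false;
  const nodeIds = new Set([...Object.keys(a), ...Object.keys(b)]);
  for (const nodeId of nodeIds) {
    const ta = a[nodeId] || 0; // A missing entry counts as 0
    const tb = b[nodeId] || 0;
    if (ta < tb) aSmaller = true;
    if (ta > tb) bSmaller = true;
  }
  if (aSmaller && bSmaller) return 'concurrent';
  if (aSmaller) return 'before';
  if (bSmaller) return 'after';
  return 'equal';
}

const e1 = { nodeA: 1 };           // First event emitted on node A
const e2 = { nodeA: 1, nodeB: 1 }; // Node B emits after receiving e1
const e3 = { nodeA: 2 };           // Node A emits again without having seen e2

console.log(compareClocks(e1, e2)); // before
console.log(compareClocks(e2, e3)); // concurrent
console.log(compareClocks(e3, e1)); // after
```

Concurrent events such as `e2` and `e3` have no causal order, so a consumer has to resolve them with a domain rule rather than by comparing clocks.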
Event Deduplication
class EventDeduplicator {
  constructor(database) {
    this.db = database;
  }

  async isDuplicate(eventId) {
    const result = await this.db.query(
      'SELECT 1 FROM processed_events WHERE event_id = ?',
      [eventId]
    );
    return result.length > 0;
  }

  async markAsProcessed(eventId) {
    // INSERT OR IGNORE is SQLite syntax; other databases need an equivalent
    await this.db.query(
      'INSERT OR IGNORE INTO processed_events (event_id, processed_at) VALUES (?, ?)',
      [eventId, new Date()]
    );
  }

  async processEvent(event, handler) {
    if (await this.isDuplicate(event.metadata.eventId)) {
      return; // Already processed
    }
    // If the process crashes between these two steps, the handler runs
    // again on redelivery, so it should be idempotent or share a
    // transaction with markAsProcessed
    await handler(event);
    await this.markAsProcessed(event.metadata.eventId);
  }
}
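Message buses commonly redeliver messages, which is why consumers track the event IDs they have already handled. The effect can be sketched without a database, using a `Set` in place of the `processed_events` table. `InMemoryDeduplicator`, `demo`, and `sendWelcomeEmail` are hypothetical names used only for this illustration.

```javascript
// In-memory stand-in for a deduplicating consumer (assumption for the demo)
class InMemoryDeduplicator {
  constructor() {
    this.processed = new Set(); // Event IDs that have been handled
  }

  // Returns true if the handler ran, false if the event was a duplicate
  async processEvent(event, handler) {
    const eventId = event.metadata.eventId;
    if (this.processed.has(eventId)) {
      return false;
    }
    await handler(event);
    this.processed.add(eventId);
    return true;
  }
}

async function demo() {
  const deduplicator = new InMemoryDeduplicator();
  let emailsSent = 0;
  const sendWelcomeEmail = async () => { emailsSent++; };

  const event = { eventType: 'UserRegistered', metadata: { eventId: 'evt-1' } };
  await deduplicator.processEvent(event, sendWelcomeEmail); // First delivery
  await deduplicator.processEvent(event, sendWelcomeEmail); // Redelivery, skipped
  return emailsSent;
}

demo().then(count => console.log(count)); // 1
```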
Testing Event Sourced Systems
Unit Testing Aggregates
describe('User Aggregate', () => {
  test('should register new user', () => {
    const user = User.register('user-123', 'test@example.com', 'John Doe');
    const events = user.getUncommittedEvents();

    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserRegistered');
    expect(events[0].data.email).toBe('test@example.com');
    expect(user.email).toBe('test@example.com');
    expect(user.isActive).toBe(true);
  });

  test('should change email for active user', () => {
    const user = User.register('user-123', 'old@example.com', 'John Doe');
    user.markEventsAsCommitted(); // Clear uncommitted events

    user.changeEmail('new@example.com');
    const events = user.getUncommittedEvents();

    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserEmailChanged');
    expect(events[0].data.newEmail).toBe('new@example.com');
    expect(user.email).toBe('new@example.com');
  });

  test('should not change email for inactive user', () => {
    const user = User.register('user-123', 'test@example.com', 'John Doe');
    user.deactivate();

    expect(() => {
      user.changeEmail('new@example.com');
    }).toThrow('Cannot change email for inactive user');
  });
});
Integration Testing
describe('User Command Handler Integration', () => {
  let eventStore;
  let userRepository;
  let commandHandler;

  beforeEach(() => {
    eventStore = new InMemoryEventStore();
    userRepository = new UserRepository(eventStore);
    commandHandler = new UserCommandHandler(eventStore, userRepository);
  });

  test('should handle register user command', async () => {
    const command = {
      type: 'RegisterUser',
      data: {
        userId: 'user-123',
        email: 'test@example.com',
        name: 'John Doe'
      }
    };

    const result = await commandHandler.handle(command);
    expect(result.success).toBe(true);
    expect(result.userId).toBe('user-123');

    // Verify events were stored
    const events = await eventStore.getEvents('user-123');
    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserRegistered');
  });
});
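The integration test relies on an `InMemoryEventStore` that the guide does not show. A minimal sketch, assuming it only needs to match the `appendEvents`, `getEvents`, and `getCurrentVersion` methods of `EventStore` and does not publish to a message bus, might look like this:

```javascript
// Test double for EventStore: keeps each stream as an array in memory.
// This is an assumed implementation, not the one used by the original tests.
class InMemoryEventStore {
  constructor() {
    this.streams = new Map(); // streamId -> array of stored events
  }

  async getCurrentVersion(streamId) {
    return (this.streams.get(streamId) || []).length;
  }

  async appendEvents(streamId, events, expectedVersion = -1) {
    const stream = this.streams.get(streamId) || [];
    const currentVersion = stream.length;

    // Same optimistic concurrency rule as the database-backed store
    if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
      throw new Error('Stream version mismatch');
    }

    events.forEach((event, i) => {
      stream.push({
        aggregateId: streamId,
        eventType: event.eventType,
        data: event.data,
        metadata: event.metadata,
        version: currentVersion + i + 1
      });
    });
    this.streams.set(streamId, stream);
    return stream.length;
  }

  async getEvents(streamId, fromVersion = 0) {
    const stream = this.streams.get(streamId) || [];
    return stream.filter(event => event.version > fromVersion);
  }
}
```

Because it enforces the same expected-version check, this double also lets tests exercise concurrency conflicts: appending twice with the same `expectedVersion` makes the second call reject.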
Conclusion
Event sourcing provides a powerful foundation for building resilient distributed systems. By treating events as the source of truth, you gain complete auditability, temporal queries, and the ability to rebuild state from any point in time.
Key benefits include:
- Complete audit trail of all changes
- Temporal queries and time travel debugging
- Scalable read models through CQRS
- Resilient architecture with eventual consistency
- Business insight through event analytics
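As an illustration of the temporal-query benefit, the sketch below rebuilds a user's state as of a given moment by replaying only the events recorded up to that moment. `stateAt` and the sample `userHistory` are hypothetical; the event shapes follow the ones used in this guide, and the events are assumed to be stored in order.

```javascript
// Replay only the events recorded at or before pointInTime
function stateAt(events, pointInTime) {
  const cutoff = new Date(pointInTime).getTime();
  return events
    .filter(event => new Date(event.metadata.timestamp).getTime() <= cutoff)
    .reduce((state, event) => {
      switch (event.eventType) {
        case 'UserRegistered':
          return { email: event.data.email, isActive: true };
        case 'UserEmailChanged':
          return { ...state, email: event.data.newEmail };
        case 'UserDeactivated':
          return { ...state, isActive: false };
        default:
          return state;
      }
    }, null);
}

// Illustrative history for a single user
const userHistory = [
  { eventType: 'UserRegistered', data: { email: 'old@example.com' }, metadata: { timestamp: '2024-01-01T10:00:00.000Z' } },
  { eventType: 'UserEmailChanged', data: { newEmail: 'new@example.com' }, metadata: { timestamp: '2024-02-01T10:00:00.000Z' } },
  { eventType: 'UserDeactivated', data: {}, metadata: { timestamp: '2024-03-01T10:00:00.000Z' } }
];

console.log(stateAt(userHistory, '2024-01-15T00:00:00.000Z'));
// { email: 'old@example.com', isActive: true }
console.log(stateAt(userHistory, '2024-03-15T00:00:00.000Z'));
// { email: 'new@example.com', isActive: false }
```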
However, event sourcing also introduces complexity in terms of eventual consistency, event schema evolution, and operational overhead. Consider your specific requirements and team expertise when deciding whether to adopt this pattern.
Start with simple aggregates and gradually build complexity as you gain experience with the patterns and tools.
Event sourcing transforms how we think about data and state management. By embracing events as first-class citizens, we can build systems that are more resilient, auditable, and aligned with business processes.
About Tridip Dutta
Creative Developer passionate about creating innovative digital experiences and exploring AI. I love sharing knowledge to help developers build better apps.