Building Resilient Distributed Systems with Event Sourcing
1/25/2024
Tridip Dutta
Architecture

Learn how to design fault-tolerant distributed systems using event sourcing patterns, CQRS, and eventual consistency for scalable architectures.

As systems grow in complexity and scale, traditional CRUD-based architectures often struggle with consistency, auditability, and scalability. Event sourcing offers an alternative that treats events as the source of truth, enabling more resilient and scalable distributed systems. This guide walks through implementing the pattern, from events and aggregates to CQRS, sagas, snapshots, and testing.

Understanding Event Sourcing

Event sourcing is an architectural pattern where state changes are stored as a sequence of events rather than just the current state. Instead of updating records in place, every change is captured as an immutable event.

Key Concepts:

  • Events: Immutable facts about what happened
  • Event Store: Persistent storage for events
  • Aggregates: Domain entities that produce events
  • Projections: Read models built from events
  • Event Streams: Ordered sequences of events

Traditional vs Event Sourcing:

// Traditional CRUD
const user = await User.findById(userId);
user.email = newEmail;
await user.save(); // Previous state is lost

// Event Sourcing
const event = new UserEmailChanged(userId, oldEmail, newEmail);
await eventStore.appendEvents(userId, [event]); // All history preserved
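
To make the contrast concrete: in an event-sourced model the current state is never stored directly, it is derived by replaying the stream. The following is a minimal sketch, assuming plain event objects with `eventType` and `data` fields; `applyUserEvent` and `replay` are illustrative names, not part of any library.

```javascript
// Illustrative reducer: applies one event to the previous state and returns the next state.
function applyUserEvent(state, event) {
  switch (event.eventType) {
    case 'UserRegistered':
      return { ...state, email: event.data.email, name: event.data.name, isActive: true };
    case 'UserEmailChanged':
      return { ...state, email: event.data.newEmail };
    default:
      return state; // Unknown events leave the state untouched
  }
}

// Current state is a left fold over the ordered event stream.
function replay(events) {
  return events.reduce(applyUserEvent, { email: null, name: null, isActive: false });
}

const history = [
  { eventType: 'UserRegistered', data: { email: 'old@example.com', name: 'Ada' } },
  { eventType: 'UserEmailChanged', data: { oldEmail: 'old@example.com', newEmail: 'new@example.com' } }
];

const currentState = replay(history);
const stateAfterFirstEvent = replay(history.slice(0, 1)); // State as of an earlier point in time
console.log(currentState.email, stateAfterFirstEvent.email);
```

Replaying only a prefix of the stream yields the state at an earlier point in time, which is what makes temporal queries possible.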

Core Components of Event Sourcing

1. Events

Events represent facts that have occurred in your domain:

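// Node's built-in UUID generator (available in current Node.js releases)
const { randomUUID } = require('crypto');

// Base Event class
class DomainEvent {
  constructor(aggregateId, eventType, data, metadata = {}) {
    this.aggregateId = aggregateId;
    this.eventType = eventType;
    this.data = data;
    // Caller-supplied metadata is merged first, so it cannot override the generated fields
    this.metadata = {
      ...metadata,
      timestamp: new Date().toISOString(),
      eventId: randomUUID(),
      version: 1
    };
  }
}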

// Specific domain events
class UserRegistered extends DomainEvent {
  constructor(userId, email, name) {
    super(userId, 'UserRegistered', { email, name });
  }
}

class UserEmailChanged extends DomainEvent {
  constructor(userId, oldEmail, newEmail) {
    super(userId, 'UserEmailChanged', { oldEmail, newEmail });
  }
}

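// Emitted by User.deactivate() in the aggregate section
class UserDeactivated extends DomainEvent {
  constructor(userId) {
    super(userId, 'UserDeactivated', {});
  }
}

class OrderPlaced extends DomainEvent {
  constructor(orderId, customerId, items, total) {
    super(orderId, 'OrderPlaced', { customerId, items, total });
  }
}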
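
Since events are meant to be immutable facts, it can help to enforce that at runtime rather than by convention. The sketch below assumes JSON-like event payloads; `deepFreeze` is a hypothetical helper, not something the classes above already do.

```javascript
// Hypothetical helper: recursively freezes an event so it cannot be changed after creation.
function deepFreeze(value) {
  if (value !== null && typeof value === 'object' && !Object.isFrozen(value)) {
    Object.freeze(value);
    Object.values(value).forEach(deepFreeze);
  }
  return value;
}

const frozenEvent = deepFreeze({
  eventType: 'UserEmailChanged',
  data: { oldEmail: 'old@example.com', newEmail: 'new@example.com' }
});

// Writes to a frozen object are ignored, or throw a TypeError in strict mode.
try {
  frozenEvent.data.newEmail = 'tampered@example.com';
} catch (error) {
  // Strict mode lands here; either way the event is unchanged.
}
console.log(frozenEvent.data.newEmail);
```

One option would be to call such a helper at the end of the `DomainEvent` constructor, at the cost of a small amount of work per event.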

2. Event Store

The event store is the central component that persists events:

class EventStore {
  constructor(database, messageBus) {
    this.db = database;
    this.messageBus = messageBus; // Used by publishEvents below
  }

  async appendEvents(streamId, events, expectedVersion = -1) {
    const transaction = await this.db.beginTransaction();
    let currentVersion;

    try {
      // Optimistic concurrency control (-1 skips the version check)
      currentVersion = await this.getCurrentVersion(streamId);
      if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
        throw new ConcurrencyError('Stream version mismatch');
      }

      // Append events (assumes this.db.query runs inside the open transaction)
      for (let i = 0; i < events.length; i++) {
        const event = {
          streamId,
          eventType: events[i].eventType,
          eventData: JSON.stringify(events[i].data),
          metadata: JSON.stringify(events[i].metadata),
          version: currentVersion + i + 1,
          timestamp: new Date()
        };

        await this.db.query(
          'INSERT INTO events (stream_id, event_type, event_data, metadata, version, timestamp) VALUES (?, ?, ?, ?, ?, ?)',
          [event.streamId, event.eventType, event.eventData, event.metadata, event.version, event.timestamp]
        );
      }

      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // Publish events for projections. This runs outside the try block so that
    // a publish failure cannot trigger a rollback of an already committed transaction.
    await this.publishEvents(events);

    return currentVersion + events.length;
  }

  async getEvents(streamId, fromVersion = 0) {
    const rows = await this.db.query(
      'SELECT * FROM events WHERE stream_id = ? AND version > ? ORDER BY version',
      [streamId, fromVersion]
    );

    return rows.map(row => ({
      aggregateId: row.stream_id, // Needed by User.fromEvents to rebuild the aggregate
      eventType: row.event_type,
      data: JSON.parse(row.event_data),
      metadata: JSON.parse(row.metadata),
      version: row.version,
      timestamp: row.timestamp
    }));
  }

  async getCurrentVersion(streamId) {
    const result = await this.db.query(
      'SELECT MAX(version) as version FROM events WHERE stream_id = ?',
      [streamId]
    );
    return result[0]?.version || 0;
  }

  async publishEvents(events) {
    // Publish to message bus for projections and other services
    for (const event of events) {
      await this.messageBus.publish(event.eventType, event);
    }
  }
}
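
The `ConcurrencyError` thrown by `appendEvents` is not defined anywhere in this article. A minimal sketch of it, together with an in-memory illustration of the version check, might look like the following. `checkExpectedVersion` is a hypothetical helper that only mirrors the comparison made inside `appendEvents`.

```javascript
// Assumed definition of the error type used by the event store above.
class ConcurrencyError extends Error {
  constructor(message) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

// Hypothetical helper mirroring the check in appendEvents:
// -1 means "append regardless of the current version".
function checkExpectedVersion(currentVersion, expectedVersion) {
  if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
    throw new ConcurrencyError(
      `Stream version mismatch: expected ${expectedVersion}, found ${currentVersion}`
    );
  }
}

// Two writers load the stream at version 3. The first append succeeds and
// moves the stream to version 4; the second still expects 3 and is rejected.
let streamVersion = 3;
checkExpectedVersion(streamVersion, 3);
streamVersion += 1;

let rejected = false;
try {
  checkExpectedVersion(streamVersion, 3);
} catch (error) {
  rejected = error instanceof ConcurrencyError;
}
console.log(rejected);
```

A rejected writer would typically reload the aggregate, re-run the command against the fresh state, and retry. Because the check and the insert are separate statements, a unique constraint on `(stream_id, version)` is a common additional safeguard against two writers passing the check at the same time.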

3. Aggregates

Aggregates are domain entities that encapsulate business logic and produce events:

class User {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.uncommittedEvents = [];
    this.email = null;
    this.name = null;
    this.isActive = false;
  }

  // Command methods
  static register(userId, email, name) {
    const user = new User(userId);
    user.applyEvent(new UserRegistered(userId, email, name));
    return user;
  }

  changeEmail(newEmail) {
    if (!this.isActive) {
      throw new Error('Cannot change email for inactive user');
    }
    
    if (this.email === newEmail) {
      return; // No change needed
    }

    this.applyEvent(new UserEmailChanged(this.id, this.email, newEmail));
  }

  deactivate() {
    if (!this.isActive) {
      throw new Error('User is already inactive');
    }

    this.applyEvent(new UserDeactivated(this.id));
  }

  // Event application
  applyEvent(event) {
    this.when(event);
    this.uncommittedEvents.push(event);
    this.version++;
  }

  when(event) {
    switch (event.eventType) {
      case 'UserRegistered':
        this.email = event.data.email;
        this.name = event.data.name;
        this.isActive = true;
        break;
      
      case 'UserEmailChanged':
        this.email = event.data.newEmail;
        break;
      
      case 'UserDeactivated':
        this.isActive = false;
        break;
    }
  }

  // Hydrate from events
  static fromEvents(events) {
    const user = new User(events[0].aggregateId);
    
    events.forEach(event => {
      user.when(event);
      user.version++;
    });
    
    return user;
  }

  getUncommittedEvents() {
    return this.uncommittedEvents;
  }

  markEventsAsCommitted() {
    this.uncommittedEvents = [];
  }
}

CQRS (Command Query Responsibility Segregation)

CQRS separates read and write operations into distinct models and is often used alongside event sourcing:

Command Side (Write Model)

class UserCommandHandler {
  constructor(eventStore, userRepository) {
    this.eventStore = eventStore;
    this.userRepository = userRepository;
  }

  async handle(command) {
    switch (command.type) {
      case 'RegisterUser':
        return await this.handleRegisterUser(command);
      
      case 'ChangeUserEmail':
        return await this.handleChangeUserEmail(command);
      
      default:
        throw new Error(`Unknown command type: ${command.type}`);
    }
  }

  async handleRegisterUser(command) {
    const { userId, email, name } = command.data;
    
    // Business logic validation
    const existingUser = await this.userRepository.findByEmail(email);
    if (existingUser) {
      throw new Error('Email already registered');
    }

    const user = User.register(userId, email, name);
    await this.userRepository.save(user);
    
    return { success: true, userId };
  }

  async handleChangeUserEmail(command) {
    const { userId, newEmail } = command.data;
    
    const user = await this.userRepository.findById(userId);
    if (!user) {
      throw new Error('User not found');
    }

    user.changeEmail(newEmail);
    await this.userRepository.save(user);
    
    return { success: true };
  }
}

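class UserRepository {
  constructor(eventStore, userQueryService = null) {
    this.eventStore = eventStore;
    this.userQueryService = userQueryService; // Optional read model used for email lookups
  }

  // The event store is keyed by aggregate ID, so email lookups go through the
  // read model. The result is eventually consistent and may miss a very recent registration.
  async findByEmail(email) {
    if (!this.userQueryService) {
      return null;
    }
    return this.userQueryService.findByEmail(email);
  }
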
  async findById(userId) {
    const events = await this.eventStore.getEvents(userId);
    if (events.length === 0) {
      return null;
    }
    return User.fromEvents(events);
  }

  async save(user) {
    const uncommittedEvents = user.getUncommittedEvents();
    if (uncommittedEvents.length === 0) {
      return;
    }

    await this.eventStore.appendEvents(
      user.id,
      uncommittedEvents,
      user.version - uncommittedEvents.length
    );
    
    user.markEventsAsCommitted();
  }
}

Query Side (Read Model)

class UserProjection {
  constructor(database) {
    this.db = database;
  }

  async handleEvent(event) {
    switch (event.eventType) {
      case 'UserRegistered':
        await this.handleUserRegistered(event);
        break;
      
      case 'UserEmailChanged':
        await this.handleUserEmailChanged(event);
        break;
      
      case 'UserDeactivated':
        await this.handleUserDeactivated(event);
        break;
    }
  }

  async handleUserRegistered(event) {
    await this.db.query(
      'INSERT INTO user_projections (id, email, name, is_active, created_at) VALUES (?, ?, ?, ?, ?)',
      [event.aggregateId, event.data.email, event.data.name, true, event.metadata.timestamp]
    );
  }

  async handleUserEmailChanged(event) {
    await this.db.query(
      'UPDATE user_projections SET email = ?, updated_at = ? WHERE id = ?',
      [event.data.newEmail, event.metadata.timestamp, event.aggregateId]
    );
  }

  async handleUserDeactivated(event) {
    await this.db.query(
      'UPDATE user_projections SET is_active = ?, updated_at = ? WHERE id = ?',
      [false, event.metadata.timestamp, event.aggregateId]
    );
  }
}

class UserQueryService {
  constructor(database) {
    this.db = database;
  }

  async findById(userId) {
    const result = await this.db.query(
      'SELECT * FROM user_projections WHERE id = ?',
      [userId]
    );
    return result[0] || null;
  }

  async findByEmail(email) {
    const result = await this.db.query(
      'SELECT * FROM user_projections WHERE email = ?',
      [email]
    );
    return result[0] || null;
  }

  async findActiveUsers(limit = 100, offset = 0) {
    return await this.db.query(
      'SELECT * FROM user_projections WHERE is_active = true LIMIT ? OFFSET ?',
      [limit, offset]
    );
  }
}
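
Because a read model is derived entirely from events, it can be dropped and rebuilt by replaying the stream. The following is a minimal in-memory sketch, assuming events shaped like the `DomainEvent` objects above. `InMemoryUserProjection` is a hypothetical name, and a `Map` stands in for the `user_projections` table.

```javascript
class InMemoryUserProjection {
  constructor() {
    this.rows = new Map(); // Keyed by aggregate ID
  }

  handleEvent(event) {
    const row = this.rows.get(event.aggregateId);
    switch (event.eventType) {
      case 'UserRegistered':
        this.rows.set(event.aggregateId, {
          id: event.aggregateId,
          email: event.data.email,
          name: event.data.name,
          isActive: true
        });
        break;
      case 'UserEmailChanged':
        if (row) row.email = event.data.newEmail;
        break;
      case 'UserDeactivated':
        if (row) row.isActive = false;
        break;
    }
  }

  // Discards the read model and replays every event in order.
  rebuild(allEvents) {
    this.rows.clear();
    allEvents.forEach(event => this.handleEvent(event));
  }
}

const projection = new InMemoryUserProjection();
projection.rebuild([
  { aggregateId: 'user-1', eventType: 'UserRegistered', data: { email: 'a@example.com', name: 'Ada' } },
  { aggregateId: 'user-2', eventType: 'UserRegistered', data: { email: 'b@example.com', name: 'Bob' } },
  { aggregateId: 'user-1', eventType: 'UserEmailChanged', data: { oldEmail: 'a@example.com', newEmail: 'ada@example.com' } },
  { aggregateId: 'user-2', eventType: 'UserDeactivated', data: {} }
]);

const activeUsers = [...projection.rows.values()].filter(row => row.isActive);
console.log(activeUsers.map(row => row.email));
```

The same replay approach applies when a new read model is introduced later: it can be populated from the full event history without touching the write side.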

Handling Complex Business Processes

Sagas for Long-Running Processes

class OrderProcessSaga {
  constructor(commandBus, eventBus) {
    this.commandBus = commandBus;
    this.eventBus = eventBus;
    // In-memory saga state is lost on restart; a production saga would persist it
    this.state = new Map();
  }

  async handleEvent(event) {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event);
        break;
      
      case 'PaymentProcessed':
        await this.handlePaymentProcessed(event);
        break;
      
      case 'PaymentFailed':
        await this.handlePaymentFailed(event);
        break;
      
      case 'InventoryReserved':
        await this.handleInventoryReserved(event);
        break;
      
      case 'InventoryReservationFailed':
        await this.handleInventoryReservationFailed(event);
        break;
    }
  }

  async handleOrderPlaced(event) {
    const { orderId, customerId, items, total } = event.data;
    
    // Store saga state
    this.state.set(orderId, {
      orderId,
      customerId,
      items,
      total,
      status: 'processing',
      steps: ['payment_pending', 'inventory_pending']
    });

    // Start payment process
    await this.commandBus.send({
      type: 'ProcessPayment',
      data: { orderId, customerId, amount: total }
    });

    // Reserve inventory
    await this.commandBus.send({
      type: 'ReserveInventory',
      data: { orderId, items }
    });
  }

  async handlePaymentProcessed(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    sagaState.steps = sagaState.steps.filter(step => step !== 'payment_pending');
    
    if (sagaState.steps.length === 0) {
      await this.completeOrder(event.data.orderId);
    }
  }

  async handlePaymentFailed(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    // Compensate - cancel inventory reservation
    await this.commandBus.send({
      type: 'CancelInventoryReservation',
      data: { orderId: event.data.orderId }
    });

    // Cancel order
    await this.commandBus.send({
      type: 'CancelOrder',
      data: { orderId: event.data.orderId, reason: 'Payment failed' }
    });

    this.state.delete(event.data.orderId);
  }

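  async handleInventoryReserved(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    sagaState.steps = sagaState.steps.filter(step => step !== 'inventory_pending');

    if (sagaState.steps.length === 0) {
      await this.completeOrder(event.data.orderId);
    }
  }

  async handleInventoryReservationFailed(event) {
    const sagaState = this.state.get(event.data.orderId);
    if (!sagaState) return;

    // Cancel order. A payment that has already been processed also needs a
    // compensating refund, which depends on the payment service and is omitted here.
    await this.commandBus.send({
      type: 'CancelOrder',
      data: { orderId: event.data.orderId, reason: 'Inventory reservation failed' }
    });

    this.state.delete(event.data.orderId);
  }

  async completeOrder(orderId) {
    await this.commandBus.send({
      type: 'CompleteOrder',
      data: { orderId }
    });

    this.state.delete(orderId);
  }
}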

Event Store Implementation Patterns

Snapshotting for Performance

class SnapshotStore {
  constructor(database) {
    this.db = database;
  }

  async saveSnapshot(aggregateId, snapshot, version) {
    await this.db.query(
      'INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot_data, version, created_at) VALUES (?, ?, ?, ?)',
      [aggregateId, JSON.stringify(snapshot), version, new Date()]
    );
  }

  async getSnapshot(aggregateId) {
    const result = await this.db.query(
      'SELECT * FROM snapshots WHERE aggregate_id = ? ORDER BY version DESC LIMIT 1',
      [aggregateId]
    );

    if (result.length === 0) {
      return null;
    }

    return {
      data: JSON.parse(result[0].snapshot_data),
      version: result[0].version
    };
  }
}

class OptimizedUserRepository {
  constructor(eventStore, snapshotStore) {
    this.eventStore = eventStore;
    this.snapshotStore = snapshotStore;
    this.snapshotFrequency = 10; // Snapshot every 10 events
  }

  async findById(userId) {
    // Try to get snapshot first
    const snapshot = await this.snapshotStore.getSnapshot(userId);
    
    let user;
    let fromVersion = 0;

    if (snapshot) {
      user = User.fromSnapshot(snapshot.data);
      fromVersion = snapshot.version;
    } else {
      user = new User(userId);
    }

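    // Get events since snapshot
    const events = await this.eventStore.getEvents(userId, fromVersion);

    // Without a snapshot or any events the aggregate does not exist
    if (!snapshot && events.length === 0) {
      return null;
    }
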
    events.forEach(event => {
      user.when(event);
      user.version++;
    });

    return user;
  }

  async save(user) {
    const uncommittedEvents = user.getUncommittedEvents();
    if (uncommittedEvents.length === 0) {
      return;
    }

    await this.eventStore.appendEvents(
      user.id,
      uncommittedEvents,
      user.version - uncommittedEvents.length
    );

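    // Create a snapshot when this save crossed a snapshotFrequency boundary,
    // even if several events were appended at once
    const previousVersion = user.version - uncommittedEvents.length;
    const crossedBoundary =
      Math.floor(user.version / this.snapshotFrequency) >
      Math.floor(previousVersion / this.snapshotFrequency);
    if (crossedBoundary) {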
      await this.snapshotStore.saveSnapshot(
        user.id,
        user.toSnapshot(),
        user.version
      );
    }

    user.markEventsAsCommitted();
  }
}
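
The repository above relies on `User.fromSnapshot` and `user.toSnapshot()`, which are not shown in this article. One possible shape is sketched below on a trimmed-down stand-in for the `User` aggregate. The class name `SnapshotableUser`, the field list, and the JSON round trip are assumptions made for illustration.

```javascript
// Trimmed-down stand-in for the User aggregate, limited to what snapshots need.
class SnapshotableUser {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.email = null;
    this.name = null;
    this.isActive = false;
  }

  // Captures only the state, never uncommitted events.
  toSnapshot() {
    return {
      id: this.id,
      version: this.version,
      email: this.email,
      name: this.name,
      isActive: this.isActive
    };
  }

  // Restores an aggregate at the version the snapshot was taken.
  static fromSnapshot(snapshot) {
    const user = new SnapshotableUser(snapshot.id);
    user.version = snapshot.version;
    user.email = snapshot.email;
    user.name = snapshot.name;
    user.isActive = snapshot.isActive;
    return user;
  }
}

const original = new SnapshotableUser('user-123');
original.email = 'test@example.com';
original.name = 'John Doe';
original.isActive = true;
original.version = 10;

// Round trip through JSON, as SnapshotStore does when persisting.
const restored = SnapshotableUser.fromSnapshot(JSON.parse(JSON.stringify(original.toSnapshot())));
console.log(restored.version, restored.email);
```

Restoring `version` matters here: `findById` continues counting from the snapshot version as it applies later events, and `save` derives the expected stream version from it.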

Handling Distributed Challenges

Event Ordering and Causality

class CausalityTracker {
  constructor() {
    this.vectorClock = new Map();
  }

  generateEventId(nodeId) {
    const currentTime = this.vectorClock.get(nodeId) || 0;
    const newTime = currentTime + 1;
    this.vectorClock.set(nodeId, newTime);
    
    return {
      nodeId,
      timestamp: newTime,
      vectorClock: new Map(this.vectorClock)
    };
  }

  updateClock(eventId) {
    // Update vector clock based on received event
    for (const [nodeId, timestamp] of eventId.vectorClock) {
      const currentTime = this.vectorClock.get(nodeId) || 0;
      this.vectorClock.set(nodeId, Math.max(currentTime, timestamp));
    }
  }

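  happensBefore(eventA, eventB) {
    // eventA happens before eventB when every entry of A's clock is <= the
    // matching entry of B's clock and at least one entry is strictly smaller.
    // Identical clocks and concurrent events both return false.
    let strictlySmaller = false;
    const nodeIds = new Set([...eventA.vectorClock.keys(), ...eventB.vectorClock.keys()]);

    for (const nodeId of nodeIds) {
      const timestampA = eventA.vectorClock.get(nodeId) || 0;
      const timestampB = eventB.vectorClock.get(nodeId) || 0;
      if (timestampA > timestampB) {
        return false;
      }
      if (timestampA < timestampB) {
        strictlySmaller = true;
      }
    }
    return strictlySmaller;
  }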
}
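
Vector clocks can also tell apart events that are causally ordered from events that are concurrent. The sketch below assumes clocks stored as `Map` objects of node ID to counter, as in `CausalityTracker`; `compareClocks` is a hypothetical helper name.

```javascript
// Hypothetical helper: compares two vector clocks stored as Maps of nodeId -> counter.
function compareClocks(clockA, clockB) {
  let aSmaller = false;
  let bSmaller = false;
  const nodeIds = new Set([...clockA.keys(), ...clockB.keys()]);

  for (const nodeId of nodeIds) {
    const a = clockA.get(nodeId) || 0; // A missing entry counts as 0
    const b = clockB.get(nodeId) || 0;
    if (a < b) aSmaller = true;
    if (a > b) bSmaller = true;
  }

  if (aSmaller && bSmaller) return 'concurrent';
  if (aSmaller) return 'before';
  if (bSmaller) return 'after';
  return 'equal';
}

const first = new Map([['node-a', 1]]);
const second = new Map([['node-a', 1], ['node-b', 1]]); // node-b has seen `first`
const third = new Map([['node-a', 2]]);                  // node-a moved on without seeing `second`

const firstVsSecond = compareClocks(first, second);
const secondVsThird = compareClocks(second, third);
console.log(firstVsSecond, secondVsThird);
```

Concurrent events have no causal order, so a consumer needs its own rule for them, such as a deterministic tie-break or a domain-specific merge.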

Event Deduplication

class EventDeduplicator {
  constructor(database) {
    this.db = database;
  }

  async isDuplicate(eventId) {
    const result = await this.db.query(
      'SELECT 1 FROM processed_events WHERE event_id = ?',
      [eventId]
    );
    return result.length > 0;
  }

  async markAsProcessed(eventId) {
    await this.db.query(
      'INSERT OR IGNORE INTO processed_events (event_id, processed_at) VALUES (?, ?)',
      [eventId, new Date()]
    );
  }

  async processEvent(event, handler) {
    if (await this.isDuplicate(event.metadata.eventId)) {
      return; // Already processed
    }

    await handler(event);
    await this.markAsProcessed(event.metadata.eventId);
  }
}
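
The effect of deduplication under redelivery can be seen in a small in-memory sketch. It assumes events carry `metadata.eventId` as in `DomainEvent`; `createIdempotentHandler` is a hypothetical name, and a `Set` stands in for the `processed_events` table.

```javascript
// Hypothetical wrapper: skips events whose eventId has already been handled.
function createIdempotentHandler(handler) {
  const processedEventIds = new Set(); // Stand-in for the processed_events table

  return function handleOnce(event) {
    const eventId = event.metadata.eventId;
    if (processedEventIds.has(eventId)) {
      return false; // Duplicate delivery, nothing to do
    }
    handler(event);
    processedEventIds.add(eventId); // Mark only after the handler succeeded
    return true;
  };
}

let emailsSent = 0;
const sendWelcomeEmail = createIdempotentHandler(() => { emailsSent += 1; });

const delivered = { eventType: 'UserRegistered', metadata: { eventId: 'evt-1' } };
const firstDelivery = sendWelcomeEmail(delivered);
const redelivery = sendWelcomeEmail(delivered); // Same event delivered again by the message bus
console.log(emailsSent, firstDelivery, redelivery);
```

Marking the event after the handler runs means a crash between the two steps leads to a retry rather than a lost event. With a database-backed store, one way to close that gap is to perform the handler's write and the `processed_events` insert in the same transaction.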

Testing Event Sourced Systems

Unit Testing Aggregates

describe('User Aggregate', () => {
  test('should register new user', () => {
    const user = User.register('user-123', 'test@example.com', 'John Doe');
    
    const events = user.getUncommittedEvents();
    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserRegistered');
    expect(events[0].data.email).toBe('test@example.com');
    expect(user.email).toBe('test@example.com');
    expect(user.isActive).toBe(true);
  });

  test('should change email for active user', () => {
    const user = User.register('user-123', 'old@example.com', 'John Doe');
    user.markEventsAsCommitted(); // Clear uncommitted events
    
    user.changeEmail('new@example.com');
    
    const events = user.getUncommittedEvents();
    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserEmailChanged');
    expect(events[0].data.newEmail).toBe('new@example.com');
    expect(user.email).toBe('new@example.com');
  });

  test('should not change email for inactive user', () => {
    const user = User.register('user-123', 'test@example.com', 'John Doe');
    user.deactivate();
    
    expect(() => {
      user.changeEmail('new@example.com');
    }).toThrow('Cannot change email for inactive user');
  });
});

Integration Testing

describe('User Command Handler Integration', () => {
  let eventStore;
  let userRepository;
  let commandHandler;

  beforeEach(() => {
    eventStore = new InMemoryEventStore();
    userRepository = new UserRepository(eventStore);
    commandHandler = new UserCommandHandler(eventStore, userRepository);
  });

  test('should handle register user command', async () => {
    const command = {
      type: 'RegisterUser',
      data: {
        userId: 'user-123',
        email: 'test@example.com',
        name: 'John Doe'
      }
    };

    const result = await commandHandler.handle(command);
    
    expect(result.success).toBe(true);
    expect(result.userId).toBe('user-123');

    // Verify events were stored
    const events = await eventStore.getEvents('user-123');
    expect(events).toHaveLength(1);
    expect(events[0].eventType).toBe('UserRegistered');
  });
});
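
The integration test uses an `InMemoryEventStore` that is not defined in this article. A minimal sketch of such a test double follows, assuming it only needs the same method names as `EventStore` and that a plain `Error` named `ConcurrencyError` is enough for tests.

```javascript
// Assumed test double with the same method names as the EventStore above.
class InMemoryEventStore {
  constructor() {
    this.streams = new Map(); // streamId -> array of stored events
  }

  async getCurrentVersion(streamId) {
    return (this.streams.get(streamId) || []).length;
  }

  async appendEvents(streamId, events, expectedVersion = -1) {
    const stream = this.streams.get(streamId) || [];
    const currentVersion = stream.length;

    if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
      const error = new Error('Stream version mismatch');
      error.name = 'ConcurrencyError';
      throw error;
    }

    events.forEach((event, index) => {
      stream.push({
        aggregateId: streamId,
        eventType: event.eventType,
        data: event.data,
        metadata: event.metadata,
        version: currentVersion + index + 1
      });
    });

    this.streams.set(streamId, stream);
    return stream.length;
  }

  async getEvents(streamId, fromVersion = 0) {
    return (this.streams.get(streamId) || []).filter(event => event.version > fromVersion);
  }
}

const store = new InMemoryEventStore();
store.appendEvents('user-123', [
  { eventType: 'UserRegistered', data: { email: 'test@example.com', name: 'John Doe' }, metadata: {} }
]).then(version => console.log(version));
```

The double skips publishing to a message bus, so projections are not updated in tests that use it; that keeps command-side tests focused on the events that were stored.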

Conclusion

Event sourcing provides a powerful foundation for building resilient distributed systems. By treating events as the source of truth, you gain complete auditability, temporal queries, and the ability to rebuild state from any point in time.

Key benefits include:

  • Complete audit trail of all changes
  • Temporal queries and time travel debugging
  • Scalable read models through CQRS
  • Resilient architecture with eventual consistency
  • Business insight through event analytics

However, event sourcing also introduces complexity in terms of eventual consistency, event schema evolution, and operational overhead. Consider your specific requirements and team expertise when deciding whether to adopt this pattern.

Start with simple aggregates and gradually build complexity as you gain experience with the patterns and tools.

Event sourcing transforms how we think about data and state management. By embracing events as first-class citizens, we can build systems that are more resilient, auditable, and aligned with business processes.
