Building Event-Sourced Ruby Systems: Complete Guide with PostgreSQL and Command Patterns

Event sourcing captures state changes as immutable records, providing an audit trail and enabling temporal analysis. I’ve found this approach particularly valuable for financial systems where transaction history matters. Let me share practical Ruby techniques I’ve implemented across multiple projects.

Event Store Foundations
A reliable event store needs atomic writes and strict version tracking. Here's how I handle stream persistence:

require "pg"
require "json"
require "securerandom"

ConcurrencyError = Class.new(StandardError)

class PostgresEventStore
  def initialize(connection)
    @connection = connection
  end

  def append_to_stream(stream_name, events, expected_version: nil)
    connection.transaction do
      current_version = get_stream_version(stream_name)
      # Optimistic check: only enforced when the caller supplies a version
      if expected_version && expected_version != current_version
        raise ConcurrencyError, "expected v#{expected_version}, stream at v#{current_version}"
      end

      events.each_with_index do |event, index|
        insert_event(
          stream_name: stream_name,
          event_type: event.class.name,
          data: event.to_h,
          metadata: { causation_id: SecureRandom.uuid },
          version: current_version + index + 1
        )
      end
    end
  end

  private

  attr_reader :connection

  def get_stream_version(stream_name)
    result = connection.exec_params(
      "SELECT COALESCE(MAX(version), 0) AS version FROM events WHERE stream = $1",
      [stream_name]
    )
    result[0]["version"].to_i
  end

  def insert_event(stream_name:, event_type:, data:, metadata:, version:)
    sql = <<~SQL
      INSERT INTO events (stream, type, data, metadata, version)
      VALUES ($1, $2, $3, $4, $5)
    SQL
    connection.exec_params(sql, [stream_name, event_type, data.to_json, metadata.to_json, version])
  end
end

# Usage (assumes a local Postgres database with the events table sketched below)
store = PostgresEventStore.new(PG.connect(dbname: "app_events"))
events = [InvoiceCreated.new(invoice_id: "INV-001", amount: 500)]
store.append_to_stream("Invoice-INV-001", events, expected_version: 0)

This pattern ensures sequential event ordering through a single database transaction. The expected_version check prevents concurrent writers from interleaving events. I've used this approach with PostgreSQL and Redis backends; both work well when you enforce strict versioning.
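
The store above presumes a plain events table. Here is a minimal schema sketch; the table and column names match the insert statement, global_position is my assumption for a global ordering column, and the unique index is what lets the database itself reject any concurrent writer that slips past the in-process check:

# Minimal schema sketch for the events table assumed above
connection = PG.connect(dbname: "app_events")
connection.exec(<<~SQL)
  CREATE TABLE IF NOT EXISTS events (
    global_position BIGSERIAL PRIMARY KEY,
    stream          TEXT        NOT NULL,
    type            TEXT        NOT NULL,
    data            JSONB       NOT NULL,
    metadata        JSONB       NOT NULL,
    version         BIGINT      NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (stream, version)
  )
SQL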

Command Validation Pipelines
Before a command ever reaches an aggregate, I run it through a validation pipeline:

require "active_model"

class CreateOrderCommand
  include ActiveModel::Validations

  attr_accessor :user_id, :items, :total

  validates :user_id, presence: true
  validates :items, length: { minimum: 1 }
  validate :total_matches_items

  def initialize(params)
    @user_id = params[:user_id]
    @items = params[:items]
    @total = params[:total]
  end

  private

  def total_matches_items
    return if items.nil? || items.empty? # the length validation covers this case

    calculated = items.sum(&:price)
    errors.add(:total, "doesn't match items") if total != calculated
  end
end

class CommandHandler
  # Success/Failure are Result helpers (dry-monads exposes them via
  # Dry::Monads[:result]); Retry is an application-defined signal.
  def handle(command)
    return Failure(:invalid_command) unless command.valid?

    aggregate = load_aggregate(command.aggregate_id)
    events = aggregate.process(command)

    event_store.append(aggregate.id, events, aggregate.version)
    Success(:created)
  rescue ConcurrencyError
    Retry.new(delay: 0.2) # back off 200 ms before retrying
  end
end

I layer validations: basic checks at command level, business rules in aggregates. This separation keeps core domain logic clean while catching issues early.
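
As a sketch of the aggregate half of that split, here is a hypothetical Order aggregate enforcing a rule the command cannot check on its own; the credit limit, the OrderCreated event, and DomainError are illustrative assumptions, not part of the original code:

class Order
  attr_reader :id, :version

  def initialize(id)
    @id = id
    @version = 0
  end

  # Business rules belong here, after command-level validation has passed
  def process(command)
    # Hypothetical domain rule: a hard ceiling on order size
    raise DomainError, "order exceeds credit limit" if command.total > 10_000

    [OrderCreated.new(order_id: id, items: command.items, total: command.total)]
  end
end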

Projection Rebuilding
For fast state reconstruction, I implement memory-efficient projections:

class AccountBalanceProjection
  attr_reader :checkpoint

  def initialize
    @balances = {}
    @checkpoint = 0
  end

  def apply(event)
    case event
    when FundsDeposited
      @balances[event.account_id] ||= 0
      @balances[event.account_id] += event.amount
    when FundsWithdrawn
      @balances[event.account_id] ||= 0
      @balances[event.account_id] -= event.amount
    end
    @checkpoint = event.global_position # remember how far we have replayed
  end

  def current_balance(account_id)
    @balances.fetch(account_id, 0)
  end
end

# Rebuild from scratch
projection = AccountBalanceProjection.new
event_store.read_all_events.each { |e| projection.apply(e) }

# Partial rebuild since checkpoint
last_seq = projection.checkpoint
events = event_store.read_events_after(last_seq)
events.each { |e| projection.apply(e) }

I include checkpoint markers to resume from specific positions. For large datasets, I parallelize processing across event partitions.
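
To make those checkpoints survive restarts, I persist the position after each batch. A minimal sketch, assuming a checkpoints table keyed by projection name (table and column names are my assumption):

# Upsert the projection's last processed position (assumed table/columns)
def save_checkpoint(connection, projection_name, position)
  connection.exec_params(<<~SQL, [projection_name, position])
    INSERT INTO checkpoints (projection, position)
    VALUES ($1, $2)
    ON CONFLICT (projection) DO UPDATE SET position = EXCLUDED.position
  SQL
end

save_checkpoint(connection, "account_balances", projection.checkpoint)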

Concurrency Management
Optimistic locking prevents race conditions:

class InventoryItem
  attr_reader :version

  def initialize(events)
    @version = 0
    @stock = 0
    events.each { |e| apply(e) }
  end

  def restock(quantity)
    raise ArgumentError, "quantity must be positive" if quantity <= 0

    [ItemRestocked.new(quantity: quantity)]
  end

  def apply(event)
    case event
    when ItemRestocked
      @stock += event.quantity
    end
    @version += 1 # every applied event advances the version
  end
end

# Handler usage
def handle_restock(cmd)
  events = event_store.load_stream(cmd.sku)
  item = InventoryItem.new(events)
  
  new_events = item.restock(cmd.quantity)
  event_store.append(cmd.sku, new_events, item.version)
end

The version counter increments with each applied event. When persisting, we verify the aggregate’s version matches the stream’s head position.
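
When a writer does lose the race, the safe response is to rerun the whole load-decide-append cycle against the fresh stream. A sketch with bounded retries, reusing handle_restock and the ConcurrencyError from the store above; the attempt count and backoff are arbitrary:

# Reload, re-decide, and re-append on version conflicts (bounded retries)
def handle_restock_with_retry(cmd, attempts: 3)
  attempts.times do
    begin
      return handle_restock(cmd)
    rescue ConcurrencyError
      sleep 0.05 # brief backoff before rereading the stream
    end
  end
  raise ConcurrencyError, "gave up after #{attempts} conflicting writes"
end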

Event Version Migration
Schemas evolve; here's how I handle legacy formats:

class AddressUpdated
  # Upcast a v1 payload (flat fields, user_id) into the v2 shape
  def self.upcast(old_data)
    {
      customer_id: old_data[:user_id],
      address: {
        street: old_data[:street],
        city: old_data[:city],
        # country was added in v2; default it for legacy events
        country: old_data[:country] || "USA"
      }
    }
  end
end

class EventStore
  def load_stream(stream_id)
    raw_events = adapter.fetch_events(stream_id)

    raw_events.map do |record|
      event_class = Object.const_get(record.type)
      data = record.data

      # Transform legacy payloads before instantiation
      data = event_class.upcast(data) if record.schema_version < 2

      event_class.new(data)
    end
  end
end

I attach schema versions during persistence. When loading, outdated events pass through transformation methods before instantiation.
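
On the write side this just means stamping each record. A hedged sketch, assuming a schema_version column added to the events table from earlier:

# Persist an event together with the schema version it was written under
def insert_versioned_event(connection, stream_name, event, version, schema_version)
  sql = <<~SQL
    INSERT INTO events (stream, type, data, metadata, version, schema_version)
    VALUES ($1, $2, $3, $4, $5, $6)
  SQL
  connection.exec_params(
    sql,
    [stream_name, event.class.name, event.to_h.to_json, "{}", version, schema_version]
  )
end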

Snapshot Optimization
For aggregates with long histories, I periodically snapshot state:

class AccountSnapshot
  def self.from_aggregate(account)
    new(
      account_id: account.id,
      balance: account.balance,
      version: account.version
    )
  end
end

class AccountRepository
  def load(account_id)
    snapshot = snapshot_store.latest_snapshot(account_id)
    from_version = snapshot ? snapshot.version + 1 : 1
    events = event_store.load_stream(account_id, from_version: from_version)

    account = Account.new(account_id)
    account.restore_snapshot(snapshot) if snapshot # new streams have none yet
    account.apply_events(events)
    account
  end

  def save(account)
    events = account.unpublished_events
    # expected version = the version before these new events were applied
    event_store.append(account.id, events, account.version - events.size)

    if (account.version % 50).zero?
      snapshot_store.save(AccountSnapshot.from_aggregate(account))
    end
  end
end

Snapshots reduce replay overhead significantly. I typically trigger them after every 50 events or during low-traffic periods.
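
For completeness, here is a sketch of the aggregate hooks the repository calls: restore_snapshot jumps state and version forward so apply_events only replays the tail. The balance field mirrors AccountSnapshot above; the event handling is assumed:

class Account
  attr_reader :id, :balance, :version

  def initialize(id)
    @id = id
    @balance = 0
    @version = 0
  end

  # Jump state and version forward to the snapshot, skipping replay
  def restore_snapshot(snapshot)
    @balance = snapshot.balance
    @version = snapshot.version
  end

  # Replay only the events recorded after the snapshot
  def apply_events(events)
    events.each { |e| apply(e) }
  end

  def apply(event)
    case event
    when FundsDeposited then @balance += event.amount
    when FundsWithdrawn then @balance -= event.amount
    end
    @version += 1
  end
end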

Read Model Synchronization
I use a polling subscriber to keep query-optimized views in sync:

class ProjectionSubscriber
  def initialize(event_store, projections)
    @event_store = event_store
    @projections = projections
  end

  def start
    @thread = Thread.new do
      last_position = 0
      loop do
        events = @event_store.read_after(last_position, batch_size: 100)

        if events.empty?
          sleep 0.1 # caught up; idle briefly, then poll again
          next
        end

        events.each do |event|
          @projections.each { |p| p.process(event) }
          last_position = event.global_position
        end
      end
    end
  end
end

# Elasticsearch projection example
class ProductCatalogProjection
  def process(event)
    case event
    when ProductAdded
      elasticsearch.index(
        index: 'products',
        id: event.product_id,
        body: { name: event.name, price: event.price }
      )
    when ProductPriceChanged
      elasticsearch.update(
        index: 'products',
        id: event.product_id,
        body: { doc: { price: event.new_price } }
      )
    end
  end
end

Separate subscriptions allow independent update cadences. I’ve achieved sub-second read model updates using this pattern.
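
Wiring this up means one subscriber per cadence. A hypothetical setup assuming the classes above; DailyRevenueProjection is a stand-in for any slower, batch-oriented view:

# Each subscriber tracks its own position, so a slow reporting view
# never holds back the low-latency catalog index.
catalog = ProductCatalogProjection.new
ProjectionSubscriber.new(event_store, [catalog]).start

ProjectionSubscriber.new(event_store, [DailyRevenueProjection.new]).start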

These techniques form a robust foundation for event-sourced systems. The immutability of events provides natural audit trails, while replay capabilities enable powerful diagnostics. I recommend starting with core business domains where history matters: financial operations and inventory management are excellent candidates. The initial complexity pays dividends in traceability and flexibility as requirements evolve.



