
How to Build Event Sourcing and CQRS Systems in Ruby on Rails

Learn how to implement Event Sourcing and CQRS in Rails to build auditable, scalable apps that retain complete data history. Start building smarter systems today.

Let me walk you through a way to build Rails applications that can remember every single thing that ever happened to your data. It might sound complex, but the core idea is simple: instead of just saving the current state of a record, like an order total, you save every individual change that led to that total as a permanent, unchangeable fact.

Think of it like a bank statement. You don’t just see your current balance; you see a list of every deposit and withdrawal. This list is the source of truth. Your current balance is just a convenient view calculated from that list. This approach is called Event Sourcing. Combined with a related idea called Command Query Responsibility Segregation (CQRS), it helps you build systems that are robust, scalable, and give you a complete history of every action.
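The bank-statement analogy fits in a few lines of plain Ruby. This toy snippet (not the event store we build below) derives a balance by replaying an immutable list of events:

```ruby
# Toy example: the event list is the source of truth;
# the balance is just a view computed by replaying it.
events = [
  { type: :deposited, amount: 100 },
  { type: :withdrawn, amount: 30 },
  { type: :deposited, amount: 50 }
]

balance = events.reduce(0) do |sum, event|
  event[:type] == :deposited ? sum + event[:amount] : sum - event[:amount]
end

puts balance # => 120
```

To change the balance you append a new event; you never edit or delete an old one, so the full history survives.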

Let’s start with where we store these immutable facts, which we call events. We need an event store. I often use Redis for this because it’s fast and good at handling lists, but the concept works with any durable storage.

# A basic event store using Redis
class EventStore
  def self.current
    @current ||= new
  end

  def initialize(redis: Redis.new, namespace: 'es')
    @redis = redis
    @namespace = namespace
  end

  def append(stream_id, events, expected_version: nil)
    stream_key = "#{@namespace}:stream:#{stream_id}"

    # WATCH makes the MULTI below abort if another writer touches the
    # stream between our version check and our append. (A production
    # store would retry when the transaction is aborted.)
    @redis.watch(stream_key) do
      last_event_json = @redis.lindex(stream_key, -1)
      current_version = last_event_json ? JSON.parse(last_event_json)['version'] : 0

      # This prevents two people from changing the same thing at the same time
      if expected_version && current_version != expected_version
        @redis.unwatch
        raise "Version conflict. Expected #{expected_version}, found #{current_version}"
      end

      @redis.multi do |transaction|
        events.each_with_index do |event, index|
          version = current_version + index + 1
          event_data = event.merge(
            event_id: SecureRandom.uuid,
            stream_id: stream_id,
            version: version,
            timestamp: Time.current.iso8601
          )

          transaction.rpush(stream_key, event_data.to_json)
        end
      end
    end
  end

  # Versions are sequential, so list index (version - 1) holds that version.
  def read_stream(stream_id, from_version: 1)
    stream_key = "#{@namespace}:stream:#{stream_id}"
    events_json = @redis.lrange(stream_key, from_version - 1, -1)

    events_json.map do |json|
      JSON.parse(json, symbolize_names: true)
    end
  end
end

This store saves events to a stream (a list tied to an ID, like Order:123). The expected_version check is crucial: if I load an order at version 5 and try to save new events, it guarantees no one else has slipped in a version 6 in the meantime. This is a simple form of optimistic concurrency control.
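To see the version check in action without standing up Redis, here is a minimal in-memory store with the same append contract (the class name and structure are illustrative, not part of the Redis store above):

```ruby
# An in-memory stand-in for the Redis store, with the same optimistic check.
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, key| hash[key] = [] }
  end

  def append(stream_id, events, expected_version: nil)
    stream = @streams[stream_id]
    current_version = stream.size

    if expected_version && current_version != expected_version
      raise "Version conflict. Expected #{expected_version}, found #{current_version}"
    end

    events.each_with_index do |event, index|
      stream << event.merge(version: current_version + index + 1)
    end
  end

  def read_stream(stream_id)
    @streams[stream_id]
  end
end

store = InMemoryEventStore.new
store.append('Order:123', [{ type: 'order_created' }], expected_version: 0)

# A stale writer that still believes the stream is empty gets rejected:
begin
  store.append('Order:123', [{ type: 'item_added' }], expected_version: 0)
rescue RuntimeError => e
  puts e.message # => "Version conflict. Expected 0, found 1"
end

# Reloading first and passing the current version succeeds:
store.append('Order:123', [{ type: 'item_added' }], expected_version: 1)
puts store.read_stream('Order:123').map { |e| e[:version] } # => [1, 2]
```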

Now, where do these events come from? They come from your core business objects, which we call Aggregates. An Aggregate is a cluster of objects (like an Order and its LineItems) that we treat as a single unit for changes. It protects its own business rules.

class Order
  attr_reader :id, :version, :uncommitted_events
  
  def initialize(id)
    @id = id
    @state = :draft
    @items = []
    @total = 0.0
    @version = 0
    @uncommitted_events = []
  end
  
  def create(customer_id, items)
    # Business rule: Items are required
    raise "Must have items" if items.empty?
    
    event = {
      type: 'order_created',
      order_id: @id,
      customer_id: customer_id,
      items: items,
      total: items.sum { |i| i[:price] * i[:quantity] }
    }
    apply_event(event)
  end
  
  def add_item(product_id, quantity, price)
    # Business rule: Can't add items to a submitted order
    raise "Order is already submitted" unless @state == :draft
    
    event = {
      type: 'item_added',
      order_id: @id,
      product_id: product_id,
      quantity: quantity,
      price: price
    }
    apply_event(event)
  end
  
  def submit
    raise "Order is already submitted" unless @state == :draft
    apply_event(type: 'order_submitted', order_id: @id)
  end
  
  # This is the magic of Event Sourcing. We can rebuild state from scratch.
  # (Public so a repository can call it when loading an aggregate.)
  def load_from_history(events)
    events.each do |event|
      # Replay mutates state but must NOT record uncommitted events,
      # or old events would be appended to the stream a second time.
      mutate(event)
      # Keep track of the version from the stored event
      @version = event[:version] if event[:version]
    end
  end

  private

  def apply_event(event)
    # First, update the object's internal state based on the event
    mutate(event)

    # Then, remember the event so we can save it later
    @uncommitted_events << event
    @version += 1
  end

  # A pure state transition: the only place state changes,
  # shared by live commands and replay.
  def mutate(event)
    case event[:type]
    when 'order_created'
      @state = :draft
      @customer_id = event[:customer_id]
      @items = event[:items]
      @total = event[:total]
    when 'item_added'
      @items << { product_id: event[:product_id], quantity: event[:quantity], price: event[:price] }
      @total += event[:quantity] * event[:price]
    when 'order_submitted'
      @state = :submitted
    end
  end
end

The Aggregate is the boss. It decides if a command (like “add item”) is allowed. If it is, it produces an event (like item_added). The event is a fact: “Item X was added at this time.” The Aggregate’s state (its instance variables) is just a cache derived from applying all past events.
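To make the "state is just a cache of events" point concrete, here is a deliberately tiny aggregate (a toy Counter, not the Order above) showing that replaying history reproduces the same state, and that replay records nothing new:

```ruby
# A toy aggregate: same shape as Order, small enough to run standalone.
class Counter
  attr_reader :value, :version, :uncommitted_events

  def initialize
    @value = 0
    @version = 0
    @uncommitted_events = []
  end

  def increment(by)
    raise "Must be positive" unless by.positive?
    apply_event(type: 'incremented', by: by)
  end

  # Rebuild state from stored events; nothing lands in uncommitted_events.
  def load_from_history(events)
    events.each do |event|
      mutate(event)
      @version += 1
    end
  end

  private

  def apply_event(event)
    mutate(event)
    @uncommitted_events << event
    @version += 1
  end

  def mutate(event)
    @value += event[:by] if event[:type] == 'incremented'
  end
end

live = Counter.new
live.increment(2)
live.increment(3)

replayed = Counter.new
replayed.load_from_history(live.uncommitted_events)

puts replayed.value                   # => 5
puts replayed.uncommitted_events.size # => 0
```

The replayed copy ends at the same value and version as the live one, but with an empty uncommitted list, which is exactly what a repository wants when loading an aggregate.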

We need something to connect the command from a user to the Aggregate and then to the Event Store. That’s a Command Handler.

class CreateOrderHandler
  def initialize(event_store)
    @event_store = event_store
  end
  
  def handle(command)
    # 1. Create a new aggregate instance
    order = Order.new(command.order_id)
    
    # 2. Let the aggregate process the command. It will produce events.
    order.create(command.customer_id, command.items)
    
    # 3. Save the new events to the stream.
    @event_store.append(
      command.order_id,
      order.uncommitted_events,
      expected_version: 0 # It's new, so we expect no prior version.
    )
    
    # 4. Return a result
    { success: true, order_id: command.order_id }
  rescue => e
    { success: false, error: e.message }
  end
end

# A simple command object
CreateOrder = Struct.new(:order_id, :customer_id, :items, keyword_init: true)

So far, we’ve covered the “write side.” We’ve captured intent (Commands), enforced rules (Aggregates), and stored facts (Events). This gives us a perfect audit log. But how do we show data to users? Loading an aggregate from hundreds of events every time is slow. This is where CQRS comes in.

CQRS says: use one model for writing (Commands) and a completely separate, optimized model for reading (Queries). They don’t have to share a database schema. The read model is built by listening to the events and updating its own specialized tables.

We build these read models with Projections. A projection is a listener that says, “When event X happens, I will update my specific table or view.”

# A projection that maintains a quick-to-query summary of orders
class OrderSummaryProjection
  def handle_event(event)
    case event[:type]
    when 'order_created'
      OrderSummary.create!(
        id: event[:order_id],
        customer_id: event[:customer_id],
        status: 'draft',
        total_amount: event[:total],
        item_count: event[:items].size,
        version: event[:version]
      )
      
      # We could also populate a separate items table
      event[:items].each do |item|
        OrderItem.create!(
          order_id: event[:order_id],
          product_id: item[:product_id],
          quantity: item[:quantity],
          price: item[:price]
        )
      end
      
    when 'item_added'
      summary = OrderSummary.find(event[:order_id])
      summary.update!(
        total_amount: summary.total_amount + (event[:quantity] * event[:price]),
        item_count: summary.item_count + 1,
        version: event[:version]
      )
      OrderItem.create!(
        order_id: event[:order_id],
        product_id: event[:product_id],
        quantity: event[:quantity],
        price: event[:price]
      )
      
    when 'order_submitted'
      OrderSummary.where(id: event[:order_id]).update_all(status: 'submitted', version: event[:version])
    end
  end
end

# The ActiveRecord models for the read side
class OrderSummary < ApplicationRecord
  # Has columns: id, customer_id, status, total_amount, item_count, version
end

class OrderItem < ApplicationRecord
  # Has columns: order_id, product_id, quantity, price
end
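To exercise the projection idea without a database, here is the same event-handling logic written against a plain hash instead of ActiveRecord (the class name and hash layout are mine, for illustration only):

```ruby
# An in-memory variant of the projection: events in, a summary hash out.
class InMemoryOrderSummaryProjection
  attr_reader :summaries

  def initialize
    @summaries = {}
  end

  def handle_event(event)
    case event[:type]
    when 'order_created'
      @summaries[event[:order_id]] = {
        status: 'draft',
        total_amount: event[:total],
        item_count: event[:items].size
      }
    when 'item_added'
      summary = @summaries[event[:order_id]]
      summary[:total_amount] += event[:quantity] * event[:price]
      summary[:item_count] += 1
    when 'order_submitted'
      @summaries[event[:order_id]][:status] = 'submitted'
    end
  end
end

projection = InMemoryOrderSummaryProjection.new
projection.handle_event(type: 'order_created', order_id: 'o1',
                        items: [{ price: 10.0, quantity: 2 }], total: 20.0)
projection.handle_event(type: 'item_added', order_id: 'o1',
                        product_id: 'p9', quantity: 1, price: 5.0)
projection.handle_event(type: 'order_submitted', order_id: 'o1')

puts projection.summaries['o1']
```

The final summary is `status: 'submitted'`, `total_amount: 25.0`, `item_count: 2`, derived purely from the event stream.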

Now, our read side is just simple, fast SQL queries with no complex business logic.

class OrdersController < ApplicationController
  # Write side: Handle a command
  def create
    command = CreateOrder.new(
      order_id: SecureRandom.uuid,
      customer_id: current_user.id,
      items: params[:items]
    )
    
    handler = CreateOrderHandler.new(EventStore.current)
    result = handler.handle(command)
    
    if result[:success]
      render json: { id: result[:order_id] }, status: :created
    else
      render json: { error: result[:error] }, status: :unprocessable_entity
    end
  end
  
  # Read side: Query the projection
  def show
    # This is now a simple, fast lookup on a denormalized table.
    summary = OrderSummary.find(params[:id])
    items = OrderItem.where(order_id: params[:id])
    
    render json: {
      summary: summary,
      items: items
    }
  end
  
  def index
    # We can add any index we want for fast searching!
    summaries = OrderSummary.where(customer_id: current_user.id).order(created_at: :desc)
    render json: summaries
  end
end

See the separation? The create action deals with commands, aggregates, and events. The show and index actions just query regular Rails models that are kept up-to-date by projections. The write side ensures correctness. The read side ensures speed.

How do we keep the projections updated? We need a publisher-subscriber system. When an event is saved, we notify all interested projections.

class EventPublisher
  @@subscribers = []
  
  def self.subscribe(subscriber, event_types: nil)
    @@subscribers << { subscriber: subscriber, event_types: event_types }
  end
  
  def self.publish(event)
    @@subscribers.each do |sub|
      next if sub[:event_types] && !sub[:event_types].include?(event[:type])
      
      # In a real app, do this in a background job!
      sub[:subscriber].handle_event(event)
    end
  end
end

# We modify our EventStore to publish after appending
class EventStore
  def append(stream_id, events, expected_version: nil)
    # ... (same version check and transactional write as before) ...

    # After saving, tell everyone about the new events
    events.each { |event| EventPublisher.publish(event) }
  end
end

# During application startup, we register our projections
EventPublisher.subscribe(OrderSummaryProjection.new)
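Because the publisher is plain Ruby, the event_types filter can be checked standalone. The snippet below repeats the EventPublisher definition so it runs on its own, and uses a toy subscriber in place of a real projection:

```ruby
# EventPublisher repeated here so the snippet runs standalone.
class EventPublisher
  @@subscribers = []

  def self.subscribe(subscriber, event_types: nil)
    @@subscribers << { subscriber: subscriber, event_types: event_types }
  end

  def self.publish(event)
    @@subscribers.each do |sub|
      next if sub[:event_types] && !sub[:event_types].include?(event[:type])
      sub[:subscriber].handle_event(event)
    end
  end
end

# A toy subscriber standing in for a projection.
class RecordingSubscriber
  attr_reader :seen

  def initialize
    @seen = []
  end

  def handle_event(event)
    @seen << event[:type]
  end
end

everything = RecordingSubscriber.new
orders_only = RecordingSubscriber.new

EventPublisher.subscribe(everything)
EventPublisher.subscribe(orders_only, event_types: ['order_created'])

EventPublisher.publish(type: 'order_created', order_id: 'o1')
EventPublisher.publish(type: 'item_added', order_id: 'o1')

puts everything.seen.inspect  # => ["order_created", "item_added"]
puts orders_only.seen.inspect # => ["order_created"]
```

The unfiltered subscriber sees every event; the filtered one sees only the types it registered for.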

A powerful feature of this setup is event replay. If you discover a bug in your OrderSummaryProjection, you can fix the code, clear the order_summaries table, and replay every order_created and item_added event from the beginning of time to rebuild a correct read model. Your source of truth (the events) is immutable, so this is always possible.

class ProjectionRebuilder
  def rebuild(projection_class)
    projection = projection_class.new

    # Clear the projection's data. (clear_data is a method each projection
    # is assumed to implement, e.g. truncating its read-model tables.)
    projection.clear_data

    # Fetch every stream ID from the event store. (all_streams is assumed
    # here; with Redis it could SCAN for keys matching "es:stream:*".)
    all_stream_ids = EventStore.current.all_streams

    all_stream_ids.each do |stream_id|
      events = EventStore.current.read_stream(stream_id)
      events.each { |event| projection.handle_event(event) }
    end
  end
end

As your system grows, loading an aggregate from thousands of events becomes slow. We can optimize this with Snapshots. Periodically, we save the aggregate’s current state (its version and all its instance variables). Next time we load it, we start from the latest snapshot and only replay the events that happened after it.

class Snapshot
  attr_reader :aggregate_id, :aggregate_type, :version, :state

  def initialize(aggregate_id:, aggregate_type:, version:, state:)
    @aggregate_id = aggregate_id
    @aggregate_type = aggregate_type
    @version = version
    @state = state
  end

  def self.take(aggregate)
    new(
      aggregate_id: aggregate.id,
      aggregate_type: aggregate.class.name,
      version: aggregate.version,
      state: Marshal.dump(aggregate) # A simple way to save object state
    )
  end
end

class SnapshotRepository
  SNAPSHOT_INTERVAL = 50 # Save a snapshot every 50 events
  
  # SnapshotStore is an assumed persistence helper (e.g. a snapshots
  # table or a Redis hash) with load/save methods.
  def load(aggregate_id, aggregate_class)
    snapshot = SnapshotStore.load(aggregate_id)

    if snapshot
      # Start from the snapshotted state
      aggregate = Marshal.load(snapshot.state)
      # Load only events that happened after the snapshot
      events = EventStore.current.read_stream(aggregate_id, from_version: snapshot.version + 1)
      aggregate.load_from_history(events)
    else
      # No snapshot, load all events
      aggregate = aggregate_class.new(aggregate_id)
      events = EventStore.current.read_stream(aggregate_id)
      aggregate.load_from_history(events)
    end

    # Take a snapshot once we've reached a multiple of the interval
    # (but never at version 0, when there is nothing to snapshot)
    if aggregate.version.positive? && (aggregate.version % SNAPSHOT_INTERVAL).zero?
      SnapshotStore.save(Snapshot.take(aggregate))
    end
    
    aggregate
  end
end
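The arithmetic behind snapshots is easy to see in isolation. With the interval at 50 and a stream of 120 events, loading from the latest snapshot replays only a 20-event tail (the numbers and event shape here are illustrative):

```ruby
SNAPSHOT_INTERVAL = 50
events = (1..120).map { |n| { type: 'incremented', by: 1, version: n } }

# The latest snapshot sits at the highest multiple of the interval reached.
snapshot_version = (events.size / SNAPSHOT_INTERVAL) * SNAPSHOT_INTERVAL
snapshot = { version: snapshot_version, value: snapshot_version }

# Loading = start from the snapshot state, replay only the tail.
tail = events.select { |event| event[:version] > snapshot[:version] }
value = tail.reduce(snapshot[:value]) { |sum, event| sum + event[:by] }

puts snapshot[:version] # => 100
puts tail.size          # => 20
puts value              # => 120
```

Replaying 20 events instead of 120 is the whole trick; the saving grows with the stream's length.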

This is a lot to take in. Let me summarize the flow in plain steps:

  1. A user action (like “Add Item to Cart”) becomes a Command object.
  2. A Command Handler receives this command. It fetches the relevant Aggregate (e.g., the Order) from the Repository. The repository loads the Aggregate by reading its past Events from the Event Store.
  3. The Handler passes the command to the Aggregate. The Aggregate checks business rules (“Is the order still open?”). If the command is valid, the Aggregate produces a new Event (e.g., ItemAdded) and updates its own internal state. The event is added to a list of new, uncommitted events.
  4. The Handler saves these new events to the Event Store.
  5. After saving, an Event Publisher announces the new event.
  6. Various Projections are listening. They update their own specialized database tables (the Read Model) based on the event. One projection might update an order_summaries table. Another might update a table for customer analytics.
  7. When a user wants to view data, a Query simply fetches it from the optimized Read Model tables. No business logic runs here.

What do you gain from this complexity?

  • Complete History: You have a log of every single change.
  • Auditability: You know who did what and when.
  • Temporal Queries: You can ask, “What did this order look like last Tuesday?”
  • Debugging: You can replay events to reproduce bugs.
  • Read/Write Scalability: You can scale your read database separately from your write event store. The read model can be a completely different technology (like Elasticsearch for search).
  • Business Intelligence: Your events are a rich source of data for analytics.

It’s not without costs. You introduce eventual consistency between the write and read models. A user might submit an order and not see it in their list for a few hundred milliseconds until the projection updates. You must handle that in your user interface. You also have more moving parts: event stores, projections, publishers.

I find this pattern is not for every part of every application. Use it for the core, complex domains where audit trails, history, and complex business rules are critical. For simple CRUD screens, traditional Active Record is much simpler and perfectly adequate.

Start small. Try modeling a single bounded context, like Ordering or Inventory, with event sourcing. Use a simple in-memory event publisher and a single projection. Get a feel for the flow. You’ll discover a powerful new way to think about the lifetime of your application’s data, where every change tells a story.



