Tags: elixir, event-sourcing, architecture, cqrs

Most developers reach for event sourcing when they shouldn't. They read about it solving problems at scale---Kafka at LinkedIn, event-driven architectures at Netflix---and assume it will magically fix their CRUD application's complexity. It won't.

But for the right problems? Event sourcing changes how you think about data entirely. Instead of storing the current state of your system, you store every change that led to that state; your database becomes a log of facts, immutable and complete. And Elixir, with its functional paradigm and pattern matching, is a surprisingly natural fit for this approach.

What Event Sourcing Actually Is

Traditional databases store state. You have a users table with a row for each user; when a user changes their email, you update that row. The old email is gone---overwritten, forgotten, as if it never existed.

Event sourcing inverts this. You store events: UserRegistered, EmailChanged, AccountSuspended. The current state is derived by replaying these events. Want to know a user's email? Walk through every event that affected that user and compute it.
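
In code, "replay" is just a fold. A minimal sketch, using hypothetical UserRegistered and EmailChanged event structs:

events = [
  %UserRegistered{user_id: "u-1", email: "old@example.com"},
  %EmailChanged{user_id: "u-1", email: "new@example.com"}
]

current_email =
  Enum.reduce(events, nil, fn
    %UserRegistered{email: email}, _acc -> email
    %EmailChanged{email: email}, _acc -> email
    _other, acc -> acc
  end)
# => "new@example.com"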

This sounds inefficient; for a single read, it is. But consider what you gain:

  • Complete audit trail. Every change is preserved. Regulatory compliance becomes trivial.
  • Time travel. Want to know what the system looked like on March 15th at 3 PM? Replay events up to that point.
  • Bug investigation. Reproduce any state by replaying events up to the moment before the bug occurred.
  • Decoupled read models. Build multiple projections optimized for different query patterns.

The trade-off is complexity. You're building a distributed system even when you don't think you are; events need to be versioned, projections need to be rebuilt, snapshots need to be managed. Not free.

Building an Event Store with Ecto

A minimal event store doesn't need much---just Ecto and a few migrations. No event-sourcing libraries, no frameworks.

First, the events table:

defmodule MyApp.Repo.Migrations.CreateEvents do
  use Ecto.Migration

  def change do
    create table(:events, primary_key: false) do
      add :id, :uuid, primary_key: true
      add :stream_id, :string, null: false
      add :stream_version, :integer, null: false
      add :event_type, :string, null: false
      add :data, :map, null: false
      add :metadata, :map, default: %{}

      timestamps(updated_at: false)
    end

    create unique_index(:events, [:stream_id, :stream_version])
    create index(:events, [:stream_id])
    create index(:events, [:event_type])
  end
end

The stream_id groups related events---typically an aggregate ID. The stream_version ensures ordering and enables optimistic concurrency control; that unique index prevents two processes from appending conflicting events to the same stream.

Now the event store module:

defmodule MyApp.EventStore do
  alias MyApp.Repo
  alias MyApp.Events.StoredEvent
  import Ecto.Query

  def append_events(stream_id, expected_version, events) do
    Repo.transaction(fn ->
      current_version = get_stream_version(stream_id)

      if current_version != expected_version do
        Repo.rollback(:concurrency_conflict)
      end

      events
      |> Enum.with_index(expected_version + 1)
      |> Enum.each(fn {event, version} ->
        %StoredEvent{
          id: Ecto.UUID.generate(),
          stream_id: stream_id,
          stream_version: version,
          event_type: event.__struct__ |> Module.split() |> List.last(),
          data: Map.from_struct(event),
          metadata: %{correlation_id: get_correlation_id()}
        }
        |> Repo.insert!()
      end)
    end)
  end

  def read_stream(stream_id, from_version \\ 0) do
    StoredEvent
    |> where([e], e.stream_id == ^stream_id)
    |> where([e], e.stream_version > ^from_version)
    |> order_by([e], asc: e.stream_version)
    |> Repo.all()
    |> Enum.map(&deserialize_event/1)
  end

  defp get_stream_version(stream_id) do
    StoredEvent
    |> where([e], e.stream_id == ^stream_id)
    |> select([e], max(e.stream_version))
    |> Repo.one() || 0
  end

  defp deserialize_event(%StoredEvent{event_type: type, data: data}) do
    module = Module.concat([MyApp.Events, type])
    struct(module, atomize_keys(data))
  end

  defp atomize_keys(map) do
    # to_existing_atom avoids minting new atoms from untrusted input.
    Map.new(map, fn {k, v} -> {String.to_existing_atom(k), v} end)
  end

  # Correlation id flows in via Logger metadata when the caller sets one.
  defp get_correlation_id do
    Logger.metadata()[:correlation_id]
  end
end

The expected_version parameter is where optimistic concurrency lives. Before appending, we verify that no other process has written to this stream since we last read it; if someone has, we fail fast rather than corrupt the event log.
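
From the caller's side the flow is read, decide, append with the version you read. A quick usage sketch (MoneyWithdrawn is an assumed event struct under MyApp.Events):

alias MyApp.{EventStore, Events}

account_id = "account-123"

# The version we read becomes the version we expect when appending.
events = EventStore.read_stream(account_id)
expected_version = length(events)

case EventStore.append_events(account_id, expected_version, [%Events.MoneyWithdrawn{amount: 50}]) do
  {:ok, _} ->
    :ok

  {:error, :concurrency_conflict} ->
    # Another process appended first: reload the stream and retry.
    :retry
end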

Projections: Rebuilding State from Events

Events are useless without projections. A projection is a read model---a representation of state derived by folding over events.

defmodule MyApp.Projections.AccountBalance do
  use GenServer
  alias MyApp.EventStore

  defstruct [:account_id, balance: 0, version: 0]

  def start_link(account_id) do
    GenServer.start_link(__MODULE__, account_id, name: via_tuple(account_id))
  end

  def get_balance(account_id) do
    GenServer.call(via_tuple(account_id), :get_balance)
  end

  @impl true
  def init(account_id) do
    state = rebuild_from_events(account_id)
    schedule_sync()
    {:ok, state}
  end

  @impl true
  def handle_call(:get_balance, _from, state) do
    {:reply, state.balance, state}
  end

  @impl true
  def handle_info(:sync, state) do
    new_events = EventStore.read_stream(state.account_id, state.version)
    new_state = apply_events(state, new_events)
    schedule_sync()
    {:noreply, new_state}
  end

  defp rebuild_from_events(account_id) do
    events = EventStore.read_stream(account_id)
    apply_events(%__MODULE__{account_id: account_id}, events)
  end

  defp apply_events(state, events) do
    Enum.reduce(events, state, &apply_event/2)
  end

  # read_stream/2 hands back deserialized event structs, so we match on those.
  defp apply_event(%MyApp.Events.MoneyDeposited{amount: amount}, state) do
    %{state | balance: state.balance + amount, version: state.version + 1}
  end

  defp apply_event(%MyApp.Events.MoneyWithdrawn{amount: amount}, state) do
    %{state | balance: state.balance - amount, version: state.version + 1}
  end

  # Unknown events still advance the version cursor so they aren't re-read.
  defp apply_event(_unknown_event, state), do: %{state | version: state.version + 1}

  defp schedule_sync, do: Process.send_after(self(), :sync, 1000)

  defp via_tuple(account_id), do: {:via, Registry, {MyApp.Registry, account_id}}
end

Notice the pattern matching in apply_event/2. Each event type gets its own clause; unknown events fall through harmlessly---forward compatibility baked in. The projection polls for new events periodically; in production you'd likely use PostgreSQL LISTEN/NOTIFY or a message broker for push-based updates.
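
Here's what the push-based version can look like. This is a sketch only, assuming a Postgres trigger on the events table that runs PERFORM pg_notify('events', NEW.stream_id) after each insert; it replaces the :sync polling above with Postgrex.Notifications:

@impl true
def init(account_id) do
  state = rebuild_from_events(account_id)

  # Reuse the repo's connection options for the notification listener.
  {:ok, pid} = Postgrex.Notifications.start_link(MyApp.Repo.config())
  {:ok, _ref} = Postgrex.Notifications.listen(pid, "events")

  {:ok, state}
end

@impl true
def handle_info({:notification, _pid, _ref, "events", stream_id}, %{account_id: stream_id} = state) do
  # Only react to notifications for our own stream.
  new_events = EventStore.read_stream(stream_id, state.version)
  {:noreply, apply_events(state, new_events)}
end

def handle_info({:notification, _pid, _ref, _channel, _payload}, state), do: {:noreply, state}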

Snapshots: Taming the Replay Problem

Event streams grow. An account with ten years of transactions might have thousands of events; replaying all of them on every read isn't practical.

Snapshots solve this. Periodically, you serialize the current state and store it alongside the events; on load, you restore from the latest snapshot and replay only subsequent events.

defmodule MyApp.Snapshots do
  alias MyApp.Repo
  alias MyApp.Snapshots.Snapshot
  import Ecto.Query

  def save_snapshot(stream_id, version, state) do
    %Snapshot{
      stream_id: stream_id,
      version: version,
      data: :erlang.term_to_binary(state)
    }
    |> Repo.insert!(
      on_conflict: {:replace, [:version, :data, :updated_at]},
      conflict_target: :stream_id
    )
  end

  def load_snapshot(stream_id) do
    Snapshot
    |> where([s], s.stream_id == ^stream_id)
    |> Repo.one()
    |> case do
      nil -> nil
      snapshot ->
        {:ok, snapshot.version, :erlang.binary_to_term(snapshot.data)}
    end
  end
end

Integrating snapshots into our projection:

defp rebuild_from_events(account_id) do
  {snapshot_version, initial_state} =
    case Snapshots.load_snapshot(account_id) do
      {:ok, version, state} -> {version, state}
      nil -> {0, %__MODULE__{account_id: account_id}}
    end

  events = EventStore.read_stream(account_id, snapshot_version)
  state = apply_events(initial_state, events)

  # Snapshot once we've replayed 100 or more events since the last snapshot
  if state.version - snapshot_version >= 100 do
    Snapshots.save_snapshot(account_id, state.version, state)
  end

  state
end

The snapshotting frequency is a tuning parameter; too frequent and you're wasting storage, too infrequent and replays crawl. I've seen 100-event intervals work well for most applications, but the right number depends entirely on your event size and projection complexity.
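
Rebuild time isn't the only place to write snapshots; the projection's periodic sync is a natural spot too. A sketch of the :sync handler with that check added:

@impl true
def handle_info(:sync, state) do
  new_events = EventStore.read_stream(state.account_id, state.version)
  new_state = apply_events(state, new_events)

  # Write a snapshot whenever this batch crosses a 100-event boundary.
  if new_events != [] and div(new_state.version, 100) > div(state.version, 100) do
    Snapshots.save_snapshot(state.account_id, new_state.version, new_state)
  end

  schedule_sync()
  {:noreply, new_state}
end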

Schema Evolution: The Hard Problem

Events are immutable. You can't go back and change them. But your domain model evolves; new requirements emerge, and the event you designed in 2023 doesn't capture what you need in 2026.

Three strategies worth knowing:

Upcasting: Transform old events to new formats at read time.

defmodule MyApp.EventUpcasters do
  # v1 had separate first_name and last_name
  # v2 combines them into full_name
  # Stored payloads come back from Postgres with string keys, so match on those.
  def upcast(%{event_type: "UserRegistered", data: data, metadata: %{"version" => 1} = meta} = event) do
    full_name = "#{data["first_name"]} #{data["last_name"]}"

    %{event |
      data: Map.put(data, "full_name", full_name),
      metadata: Map.put(meta, "version", 2)
    }
  end

  def upcast(event), do: event
end

Apply upcasters when deserializing:

defp deserialize_event(%StoredEvent{} = stored_event) do
  stored_event
  |> Map.from_struct()
  |> EventUpcasters.upcast()
  |> build_domain_event()
end

defp build_domain_event(%{event_type: type, data: data}) do
  module = Module.concat([MyApp.Events, type])
  struct(module, atomize_keys(data))
end

Event versioning: Include version in the event type itself.

defmodule MyApp.Events.UserRegisteredV1 do
  defstruct [:user_id, :first_name, :last_name, :email]
end

defmodule MyApp.Events.UserRegisteredV2 do
  defstruct [:user_id, :full_name, :email, :marketing_consent]
end

Copy-transform: Create a new event stream with transformed events. Use this sparingly---it's operationally expensive and risks data inconsistency.
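
For completeness, a copy-transform run with the event store from earlier might look like this (transform_event/1 is a hypothetical function mapping old domain events to their new shape):

# Copy every event from the old stream into a new one, transforming on the way.
def copy_transform(old_stream_id, new_stream_id) do
  transformed =
    old_stream_id
    |> MyApp.EventStore.read_stream()
    |> Enum.map(&transform_event/1)

  MyApp.EventStore.append_events(new_stream_id, 0, transformed)
end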

My preference is upcasting for minor changes---adding fields, renaming---and new event types for semantic shifts. If OrderPlaced fundamentally changes what it means, create OrderPlacedV2 rather than pretending they're the same thing.

Commanded: When to Use a Framework

The code above works. But once you move beyond toy examples, the list of things you need grows fast: command validation, aggregate lifecycle management, process managers for sagas, subscription handling.

Commanded is the de facto event sourcing library for Elixir. It provides those abstractions out of the box. Here's a bank account aggregate:

defmodule MyApp.BankAccount do
  # A Commanded aggregate is a plain module: a state struct plus execute/2 and apply/2.
  defstruct [:account_id, :balance]

  def execute(%__MODULE__{account_id: nil}, %OpenAccount{} = cmd) do
    %AccountOpened{account_id: cmd.account_id, initial_balance: 0}
  end

  def execute(%__MODULE__{balance: balance}, %WithdrawMoney{amount: amount})
      when balance >= amount do
    %MoneyWithdrawn{amount: amount}
  end

  def execute(%__MODULE__{}, %WithdrawMoney{}) do
    {:error, :insufficient_funds}
  end

  def apply(%__MODULE__{} = account, %AccountOpened{} = event) do
    %{account | account_id: event.account_id, balance: event.initial_balance}
  end

  def apply(%__MODULE__{} = account, %MoneyWithdrawn{} = event) do
    %{account | balance: account.balance - event.amount}
  end
end

Commanded handles concurrency, event storage, and projection subscriptions; it gives you a clear aggregate pattern with established conventions. The trade-off is abstraction---you're working within Commanded's model of the world, and when your needs diverge from that model, you feel it.
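
Wiring that aggregate up takes a router and an application module. Roughly, and with module names assumed (the event store here is the one from the eventstore library, not the hand-rolled store above):

defmodule MyApp.Router do
  use Commanded.Commands.Router

  dispatch [OpenAccount, WithdrawMoney],
    to: MyApp.BankAccount,
    identity: :account_id
end

defmodule MyApp.App do
  use Commanded.Application,
    otp_app: :my_app,
    event_store: [
      adapter: Commanded.EventStore.Adapters.EventStore,
      event_store: MyApp.CommandedEventStore
    ]

  router MyApp.Router
end

# Dispatching a command runs it through the aggregate:
:ok = MyApp.App.dispatch(%WithdrawMoney{account_id: "acc-1", amount: 50})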

When to roll your own versus using Commanded:

Roll your own when you need maximum control, have unusual storage requirements, or when learning the underlying concepts. Building from scratch teaches you what the framework hides.

Use Commanded when you're building a production system, need process managers, want solid subscription handling, or your team benefits from established conventions.

I've done both. For applications where event sourcing is central to the domain---financial systems, audit-heavy workflows---Commanded's conventions pay for themselves. For simpler cases where you're adding event sourcing to a specific bounded context, a lightweight custom implementation keeps things honest.

When NOT to Use Event Sourcing

Event sourcing is not a default architecture. It's a specialized tool.

Simple CRUD doesn't need it. If your domain is "user updates profile, we save it," a regular PostgreSQL table does the job; you're adding ceremony for zero benefit. Eventual consistency is the other trap---projections lag behind writes, and if your application requires read-your-writes consistency everywhere, you'll fight the model constantly.

GDPR complicates things. The right to erasure doesn't play nicely with immutable event logs; crypto-shredding---encrypting events with per-user keys, then deleting the key---is the workaround, but it adds real complexity. And then there's operational maturity. Event stores need monitoring, projection lag tracking, schema migration tooling. Small team shipping fast? This overhead might slow you down more than it helps.
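
A minimal sketch of the crypto-shredding idea: payloads are encrypted with a 32-byte per-user key before storage, and erasure means deleting that key. Key storage, lookup, and deletion are assumed to live elsewhere.

defmodule MyApp.CryptoShredding do
  @aad "event-data"

  # Encrypt an event payload with this user's key before it goes into the store.
  def encrypt(key, payload) do
    iv = :crypto.strong_rand_bytes(12)

    {ciphertext, tag} =
      :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, :erlang.term_to_binary(payload), @aad, true)

    %{iv: iv, ciphertext: ciphertext, tag: tag}
  end

  # Once the user's key has been deleted there is nothing to pass in here,
  # and the stored ciphertext is unrecoverable; that deletion is the "erasure".
  def decrypt(key, %{iv: iv, ciphertext: ciphertext, tag: tag}) do
    case :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, ciphertext, @aad, tag, false) do
      :error -> {:error, :decryption_failed}
      plaintext -> {:ok, :erlang.binary_to_term(plaintext)}
    end
  end
end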

One antipattern I see often: teams adopting event sourcing as a debugging crutch. "We can replay events to find bugs" is true, but if you need to do this regularly, you have a testing problem. Not a persistence problem.

The use cases where event sourcing genuinely earns its keep: financial systems where audit trails are mandatory, collaborative editing where conflict resolution needs event history, systems with business rules that evolve over time, and applications where multiple teams need different read models of the same underlying data.

Where This Leaves You

Event sourcing trades immediate simplicity for long-term flexibility. You're building infrastructure that pays dividends when requirements change---and requirements always change. But that infrastructure has a carrying cost; every projection is a process to monitor, every event schema a contract to maintain.

Elixir makes this trade-off more palatable than most languages. Pattern matching turns event handlers into clean, declarative code; GenServers provide natural homes for projections; the immutability of Elixir's data structures aligns with the immutability of event logs. The BEAM's concurrency model handles thousands of projection processes comfortably---this is, after all, what it was designed for.

I keep coming back to something Greg Young said years ago: event sourcing should be a tactical choice within a bounded context, not a system-wide religion. Most of the failures I've seen came from teams who made it an architectural identity rather than a tool. The event store doesn't care about your convictions; it cares about your event schema, your projection performance, and whether you've thought through what happens when you need to delete a user's data three years from now.


What do you think of what I said?

Share your thoughts with me. You can tweet me at @allanmacgregor.