Key Takeaways
- Event sourcing provides complete audit trail and time-travel debugging capabilities
- CQRS separation enables independent scaling of reads and writes
- Snapshots are essential for performance with large event streams
- Proper event versioning and migration strategies prevent production disasters
- Event streaming with Kafka enables real-time projections and system integration
Why Event Sourcing?
Your database shows current state. But how did it get there? Who changed what? When? Why?
-- Traditional: Current state only
SELECT balance FROM accounts WHERE id = 123;
-- Result: 1000
-- Event sourced: Complete history
SELECT * FROM events WHERE aggregate_id = 123;
-- Shows every deposit, withdrawal, fee, interest
We needed an audit trail for financial compliance. Event sourcing gave us that, plus time-travel debugging and independent scaling of reads and writes.
Core Concepts in 5 Minutes
Event sourcing stores state changes as a sequence of events rather than overwriting data. Instead of UPDATE statements that destroy history, we append immutable events that tell the complete story.
Traditional systems show what IS. Event sourcing shows what HAPPENED. This distinction transforms debugging, auditing, and analytics. When a bug corrupts data, we can replay events to find exactly when and how it occurred.
Events are facts about the past - they cannot be changed or deleted. This immutability provides natural audit logging and enables powerful patterns like temporal queries and retroactive fixes.
State becomes a left-fold over events. Current balance isn't stored; it's calculated by replaying all deposits and withdrawals. This sounds slow but with snapshots and projections, it's actually faster than traditional systems for many use cases.
// Events capture business intent
type AccountOpened struct {
    AccountID string
    Currency  string
    Timestamp time.Time
}

type MoneyDeposited struct {
    AccountID string
    Amount    decimal.Decimal
    Timestamp time.Time
}

// MoneyWithdrawn is used by the aggregate below; same shape as a deposit
type MoneyWithdrawn struct {
    AccountID string
    Amount    decimal.Decimal
    Timestamp time.Time
}

// State derived from event history
func (a *Account) Apply(event Event) {
    // Rebuild state by replaying events (full implementation below)
}
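These snippets lean on an Event interface that never gets spelled out, but that SaveEvents below relies on when it calls EventType() and OccurredAt(). A minimal sketch of that contract, one example implementation, and the left-fold itself (the ReplayAccount helper is illustrative, not part of the original design):

// Minimal event contract assumed by the store: a type name for routing
// and a business timestamp
type Event interface {
    EventType() string
    OccurredAt() time.Time
}

// Each concrete event satisfies it with value receivers, e.g.:
func (e MoneyDeposited) EventType() string     { return "MoneyDeposited" }
func (e MoneyDeposited) OccurredAt() time.Time { return e.Timestamp }

// State as a left-fold: start from zero state, fold in each event
func ReplayAccount(events []Event) *Account {
    account := &Account{}
    for _, e := range events {
        account.Apply(e)
    }
    return account
}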
Production Event Store
A production event store needs to handle millions of events efficiently. Our PostgreSQL-based implementation processes 10K events/second with proper indexing and partitioning. The append-only nature makes it extremely fast - no updates, no deletes, just inserts.
Event ordering is critical for consistency. We use database sequences per aggregate to ensure events are applied in the correct order. This prevents race conditions where concurrent operations might corrupt state.
The schema design balances normalization with query performance. Event data is stored as JSON for flexibility, while frequently queried fields (aggregate_id, event_type) are indexed columns. This hybrid approach enables both fast queries and schema evolution.
Metadata tracks important context: user ID, correlation ID, causation ID. This audit trail proves invaluable for debugging and compliance. Every state change is traceable to its origin.
type EventStore struct {
db *sql.DB
}
type StoredEvent struct {
    ID            uuid.UUID
    AggregateID   string
    AggregateType string
    EventType     string
    EventVersion  int
    EventData     json.RawMessage
    Metadata      json.RawMessage
    OccurredAt    time.Time
    RecordedAt    time.Time
}
// Append-only schema with proper indexes
const schema = `
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- PostgreSQL 13+, or enable pgcrypto
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    occurred_at TIMESTAMP NOT NULL,
    recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- Ensure events are ordered per aggregate; the unique index this
    -- creates also serves lookups by (aggregate_id, event_version)
    UNIQUE (aggregate_id, event_version)
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; create indexes separately
CREATE INDEX idx_event_type ON events (event_type);
CREATE INDEX idx_occurred_at ON events (occurred_at);

-- Global event sequence for ordering across aggregates
CREATE SEQUENCE IF NOT EXISTS global_event_sequence;
ALTER TABLE events ADD COLUMN global_sequence BIGINT DEFAULT nextval('global_event_sequence');
CREATE INDEX idx_global_sequence ON events (global_sequence);
`
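The intro to this section mentions partitioning, which the schema above doesn't show. One option is hash partitioning by aggregate_id (PostgreSQL 11+); a sketch with four partitions (an arbitrary number). The per-aggregate unique constraint stays valid because it includes the partition key, which is also why there is no primary key on id alone here, as unique constraints on partitioned tables must contain the partition key:

const partitionedSchema = `
CREATE TABLE events (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    occurred_at TIMESTAMP NOT NULL,
    recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
    UNIQUE (aggregate_id, event_version)
) PARTITION BY HASH (aggregate_id);

CREATE TABLE events_p0 PARTITION OF events FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE events_p1 PARTITION OF events FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE events_p2 PARTITION OF events FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE events_p3 PARTITION OF events FOR VALUES WITH (MODULUS 4, REMAINDER 3);
`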
func (es *EventStore) SaveEvents(ctx context.Context, aggregateID, aggregateType string, events []Event, expectedVersion int) error {
tx, err := es.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Check optimistic concurrency
var currentVersion int
err = tx.QueryRow(`
SELECT COALESCE(MAX(event_version), 0)
FROM events
WHERE aggregate_id = $1`,
aggregateID,
).Scan(&currentVersion)
if err != nil {
return err
}
if currentVersion != expectedVersion {
return fmt.Errorf("concurrency conflict: expected version %d, got %d",
expectedVersion, currentVersion)
}
// Save events
version := expectedVersion
for _, event := range events {
version++
eventData, err := json.Marshal(event)
if err != nil {
return err
}
metadata := map[string]interface{}{
"user_id": ctx.Value("user_id"),
"trace_id": ctx.Value("trace_id"),
"source": ctx.Value("source"),
}
metadataJSON, err := json.Marshal(metadata)
if err != nil {
    return err
}
_, err = tx.Exec(`
INSERT INTO events (
aggregate_id, aggregate_type, event_type,
event_version, event_data, metadata, occurred_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
aggregateID,
aggregateType,
event.EventType(),
version,
eventData,
metadataJSON,
event.OccurredAt(),
)
if err != nil {
return err
}
}
return tx.Commit()
}
func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
rows, err := es.db.QueryContext(ctx, `
SELECT
id, aggregate_id, aggregate_type, event_type,
event_version, event_data, metadata,
occurred_at, recorded_at
FROM events
WHERE aggregate_id = $1 AND event_version > $2
ORDER BY event_version`,
aggregateID, fromVersion,
)
if err != nil {
return nil, err
}
defer rows.Close()
var events []StoredEvent
for rows.Next() {
var e StoredEvent
err := rows.Scan(
&e.ID, &e.AggregateID, &e.AggregateType,
&e.EventType, &e.EventVersion, &e.EventData,
&e.Metadata, &e.OccurredAt, &e.RecordedAt,
)
if err != nil {
return nil, err
}
events = append(events, e)
}
return events, nil
}
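GetEvents returns raw JSON, so replay sites need a way to map event_type back to a concrete Go type. A minimal switch-based deserializer, used by the loading code later in this post (a registry keyed by type name scales better; this hypothetical helper returns value types to match the Apply switch in the aggregate section):

func DeserializeEvent(eventType string, data json.RawMessage) (Event, error) {
    switch eventType {
    case "AccountOpened":
        var e AccountOpened
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    case "MoneyDeposited":
        var e MoneyDeposited
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    case "MoneyWithdrawn":
        var e MoneyWithdrawn
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    default:
        return nil, fmt.Errorf("unknown event type: %s", eventType)
    }
}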
Aggregate Root Pattern
type AggregateRoot struct {
ID string
Version int
uncommittedEvents []Event
}
func (a *AggregateRoot) RecordEvent(event Event) {
a.uncommittedEvents = append(a.uncommittedEvents, event)
a.Version++
}
func (a *AggregateRoot) GetUncommittedEvents() []Event {
return a.uncommittedEvents
}
func (a *AggregateRoot) MarkEventsAsCommitted() {
a.uncommittedEvents = []Event{}
}
// Example: Account aggregate
type Account struct {
AggregateRoot
Balance decimal.Decimal
Currency string
Status string
}
func (a *Account) Deposit(amount decimal.Decimal) error {
if amount.LessThanOrEqual(decimal.Zero) {
return fmt.Errorf("invalid deposit amount: %v must be positive", amount)
}
event := MoneyDeposited{
AccountID: a.ID,
Amount: amount,
Timestamp: time.Now(),
}
a.Apply(event)
a.RecordEvent(event)
return nil
}
func (a *Account) Withdraw(amount decimal.Decimal) error {
    if amount.LessThanOrEqual(decimal.Zero) {
        return fmt.Errorf("invalid withdrawal amount: %v must be positive", amount)
    }
    if amount.GreaterThan(a.Balance) {
        return fmt.Errorf("insufficient funds: attempting to withdraw %v from balance %v", amount, a.Balance)
    }
event := MoneyWithdrawn{
AccountID: a.ID,
Amount: amount,
Timestamp: time.Now(),
}
a.Apply(event)
a.RecordEvent(event)
return nil
}
func (a *Account) Apply(event Event) {
switch e := event.(type) {
case MoneyDeposited:
a.Balance = a.Balance.Add(e.Amount)
case MoneyWithdrawn:
a.Balance = a.Balance.Sub(e.Amount)
}
}
CQRS: Command and Query Separation
// Write side: Commands modify aggregates
type CommandHandler struct {
eventStore *EventStore
eventBus *EventBus
}
func (h *CommandHandler) Handle(ctx context.Context, cmd Command) error {
    switch c := cmd.(type) {
    case DepositMoney:
        return h.handleDeposit(ctx, c)
    case WithdrawMoney:
        return h.handleWithdraw(ctx, c)
    }
    return errors.New("unknown command")
}
func (h *CommandHandler) handleDeposit(ctx context.Context, cmd DepositMoney) error {
    // Load aggregate by replaying its event history
    account := &Account{}
    account.ID = cmd.AccountID
    stored, err := h.eventStore.GetEvents(ctx, cmd.AccountID, 0)
    if err != nil {
        return err
    }
    for _, se := range stored {
        event, err := DeserializeEvent(se.EventType, se.EventData)
        if err != nil {
            return err
        }
        account.Apply(event)
    }
    // Version of the stream at load time, used for the optimistic-concurrency check
    loadedVersion := len(stored)
    account.Version = loadedVersion
    // Execute business logic
    if err := account.Deposit(cmd.Amount); err != nil {
        return err
    }
    // Save new events against the version we loaded
    err = h.eventStore.SaveEvents(
        ctx,
        account.ID,
        "Account",
        account.GetUncommittedEvents(),
        loadedVersion,
    )
    if err != nil {
        return err
    }
    // Publish for projections, then clear the uncommitted list
    for _, event := range account.GetUncommittedEvents() {
        h.eventBus.Publish(event)
    }
    account.MarkEventsAsCommitted()
    return nil
}
// Read side: Projections for queries
type AccountProjection struct {
db *sql.DB
}
func (p *AccountProjection) Handle(event Event) error {
    switch e := event.(type) {
    case AccountOpened:
        // The balance updates below assume the row exists, so create it
        // on open (the account_projections column set is assumed here)
        _, err := p.db.Exec(`
            INSERT INTO account_projections (account_id, balance, currency, updated_at)
            VALUES ($1, 0, $2, NOW())
            ON CONFLICT (account_id) DO NOTHING`,
            e.AccountID, e.Currency,
        )
        return err
    case MoneyDeposited:
        _, err := p.db.Exec(`
            UPDATE account_projections
            SET balance = balance + $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
    case MoneyWithdrawn:
        _, err := p.db.Exec(`
            UPDATE account_projections
            SET balance = balance - $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
    }
    return nil
}
// Query handler reads from projections
type QueryHandler struct {
db *sql.DB
}
func (q *QueryHandler) GetAccountBalance(accountID string) (decimal.Decimal, error) {
var balance decimal.Decimal
err := q.db.QueryRow(`
SELECT balance FROM account_projections WHERE account_id = $1`,
accountID,
).Scan(&balance)
return balance, err
}
⚠️ Eventual Consistency Tradeoff
CQRS introduces eventual consistency between write and read models:
- Events are written immediately to the event store
- Projections update asynchronously (typically milliseconds to seconds)
- Queries may return stale data until projections catch up
- Design your UX to handle this: optimistic UI updates, "processing" states, or read-your-writes guarantees where critical (sketch below)
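Where read-your-writes matters, one option is to have the command return the new stream version and let the query side poll until the projection catches up. A minimal sketch, assuming the projection row also tracks a last_event_version column (not shown in the projection code above):

func (q *QueryHandler) GetBalanceAtVersion(ctx context.Context, accountID string, minVersion int) (decimal.Decimal, error) {
    for {
        var balance decimal.Decimal
        var version int
        err := q.db.QueryRowContext(ctx, `
            SELECT balance, last_event_version
            FROM account_projections
            WHERE account_id = $1`,
            accountID,
        ).Scan(&balance, &version)
        if err != nil {
            return decimal.Decimal{}, err
        }
        if version >= minVersion {
            return balance, nil // projection has caught up
        }
        select {
        case <-ctx.Done():
            return decimal.Decimal{}, ctx.Err()
        case <-time.After(50 * time.Millisecond):
            // poll again; callers should bound this with a context timeout
        }
    }
}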
Snapshots for Performance
type Snapshot struct {
AggregateID string
Version int
Data []byte
CreatedAt time.Time
}
func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
_, err := es.db.ExecContext(ctx, `
INSERT INTO snapshots (aggregate_id, version, data, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id)
DO UPDATE SET version = $2, data = $3, created_at = $4`,
snapshot.AggregateID,
snapshot.Version,
snapshot.Data,
snapshot.CreatedAt,
)
return err
}
func (es *EventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
var s Snapshot
err := es.db.QueryRowContext(ctx, `
SELECT aggregate_id, version, data, created_at
FROM snapshots
WHERE aggregate_id = $1`,
aggregateID,
).Scan(&s.AggregateID, &s.Version, &s.Data, &s.CreatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
return &s, err
}
// Load aggregate with snapshot optimization
func LoadAccount(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
    account := &Account{}
    // Try to load snapshot
    snapshot, err := es.GetSnapshot(ctx, accountID)
    if err != nil {
        return nil, err
    }
    fromVersion := 0
    if snapshot != nil {
        // Restore from snapshot
        if err := json.Unmarshal(snapshot.Data, account); err != nil {
            return nil, err
        }
        fromVersion = snapshot.Version
    }
    // Apply events after snapshot
    stored, err := es.GetEvents(ctx, accountID, fromVersion)
    if err != nil {
        return nil, err
    }
    for _, se := range stored {
        event, err := DeserializeEvent(se.EventType, se.EventData)
        if err != nil {
            return nil, err
        }
        account.Apply(event)
    }
    account.Version = fromVersion + len(stored)
    // Create a new snapshot once 100+ events accumulated since the last one
    if len(stored) >= 100 {
        snapshotData, err := json.Marshal(account)
        if err == nil {
            es.SaveSnapshot(ctx, Snapshot{
                AggregateID: accountID,
                Version:     account.Version,
                Data:        snapshotData,
                CreatedAt:   time.Now(),
            })
        }
    }
    return account, nil
}
Event Streaming with Kafka
type EventStreamer struct {
eventStore *EventStore
producer *kafka.Writer
lastSeq int64
}
func (s *EventStreamer) StreamEvents(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
    select {
    case <-ctx.Done():
        return
    case <-ticker.C:
        rows, err := s.eventStore.db.QueryContext(ctx, `
            SELECT global_sequence, aggregate_id, event_type, event_data, occurred_at
            FROM events
            WHERE global_sequence > $1
            ORDER BY global_sequence
            LIMIT 1000`,
            s.lastSeq,
        )
        if err != nil {
            continue
        }
        var messages []kafka.Message
        var maxSeq int64
        for rows.Next() {
            var seq int64
            var aggregateID, eventType string
            var eventData json.RawMessage
            var occurredAt time.Time
            if err := rows.Scan(&seq, &aggregateID, &eventType, &eventData, &occurredAt); err != nil {
                continue
            }
            messages = append(messages, kafka.Message{
                Topic: fmt.Sprintf("events.%s", eventType),
                Key:   []byte(aggregateID),
                Value: eventData,
                Headers: []kafka.Header{
                    {Key: "event_type", Value: []byte(eventType)},
                    {Key: "occurred_at", Value: []byte(occurredAt.Format(time.RFC3339))},
                },
            })
            maxSeq = seq
        }
        // Close explicitly: a defer would pile up inside the loop
        rows.Close()
        if len(messages) > 0 {
            // Only advance the checkpoint once Kafka has the batch
            if err := s.producer.WriteMessages(ctx, messages...); err == nil {
                s.lastSeq = maxSeq
            }
        }
    }
}
}
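On the consuming side, a projection service subscribes to these topics and applies each event to its read model. A sketch using the same segmentio/kafka-go package the writer above appears to use (broker address, topic, and group ID are illustrative):

func ConsumeDeposits(ctx context.Context, projection *AccountProjection) error {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "events.MoneyDeposited",
        GroupID: "account-projection",
    })
    defer r.Close()
    for {
        // ReadMessage blocks until a message arrives or ctx is cancelled
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            return err
        }
        var e MoneyDeposited
        if err := json.Unmarshal(msg.Value, &e); err != nil {
            continue // skip malformed payloads rather than stall the group
        }
        if err := projection.Handle(e); err != nil {
            return err
        }
    }
}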
Temporal Queries (Time Travel)
// Get account state at specific point in time
func (es *EventStore) GetAggregateAtTime(ctx context.Context, aggregateID string, pointInTime time.Time) (*Account, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT event_type, event_data
        FROM events
        WHERE aggregate_id = $1 AND occurred_at <= $2
        ORDER BY event_version`,
        aggregateID, pointInTime,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    // Replay only the events that existed at that point in time
    account := &Account{}
    for rows.Next() {
        var eventType string
        var eventData json.RawMessage
        if err := rows.Scan(&eventType, &eventData); err != nil {
            return nil, err
        }
        event, err := DeserializeEvent(eventType, eventData)
        if err != nil {
            return nil, err
        }
        account.Apply(event)
    }
    return account, rows.Err()
}
Saga Pattern for Distributed Transactions
type TransferSaga struct {
ID string
FromAccount string
ToAccount string
Amount decimal.Decimal
State string
CompletedSteps []string
}
func (s *TransferSaga) Handle(event Event) ([]Command, error) {
switch e := event.(type) {
case TransferInitiated:
return []Command{
WithdrawMoney{AccountID: e.FromAccount, Amount: e.Amount},
}, nil
case MoneyWithdrawn:
if e.AccountID == s.FromAccount {
s.CompletedSteps = append(s.CompletedSteps, "withdrawn")
return []Command{
DepositMoney{AccountID: s.ToAccount, Amount: s.Amount},
}, nil
}
case MoneyDeposited:
if e.AccountID == s.ToAccount {
s.State = "completed"
return []Command{
MarkTransferComplete{TransferID: s.ID},
}, nil
}
case WithdrawFailed:
s.State = "failed"
return nil, nil
case DepositFailed:
// Compensate - refund the withdrawal
return []Command{
DepositMoney{AccountID: s.FromAccount, Amount: s.Amount},
}, nil
}
return nil, nil
}
Event Store Consistency Warning
Event stores require careful attention to:
- Optimistic concurrency control to prevent data corruption (retry sketch after this list)
- Event ordering guarantees within aggregates
- Backup and recovery procedures for event streams
- Event schema evolution and versioning strategies
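Conflicts from the optimistic check are expected under contention; the standard response is to reload the aggregate and retry the command. A minimal sketch using the standard strings package (matching the error string produced by SaveEvents above is crude; a sentinel error type would be cleaner):

// fn should load the aggregate, apply the command, and save; reloading on
// each attempt ensures it sees the latest stream version
func WithConcurrencyRetry(attempts int, fn func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        err = fn()
        if err == nil || !strings.Contains(err.Error(), "concurrency conflict") {
            return err
        }
    }
    return err
}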
Security Considerations
Event Sourcing Security Best Practices
- Event Encryption: Encrypt sensitive data in event payloads
- Access Control: Role-based access to event streams and projections
- Audit Trail: Include user context and authorization in event metadata
- Data Privacy: Implement "right to be forgotten" through cryptographic erasure
- Replay Security: Ensure event replay doesn't bypass current security rules
// Secure event with encryption
type SecureEvent struct {
BaseEvent
EncryptedPayload []byte
KeyID string
Nonce []byte
}
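Sealing the payload is standard AES-GCM; a minimal sketch using the standard library's crypto/aes, crypto/cipher, crypto/rand, and io packages (key sourcing and rotation via the KeyManager below are out of scope here):

func EncryptPayload(key []byte, keyID string, plaintext []byte) (*SecureEvent, error) {
    block, err := aes.NewCipher(key) // key must be 16, 24, or 32 bytes
    if err != nil {
        return nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    // A fresh random nonce per event; stored alongside the ciphertext
    nonce := make([]byte, gcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }
    return &SecureEvent{
        KeyID:            keyID,
        Nonce:            nonce,
        EncryptedPayload: gcm.Seal(nil, nonce, plaintext, nil),
    }, nil
}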
// GDPR-compliant cryptographic erasure
type GDPREventStore struct {
*EventStore
keyManager *KeyManager
}
func (ges *GDPREventStore) ForgetUser(ctx context.Context, userID string) error {
events, err := ges.GetEventsByUser(ctx, userID)
if err != nil {
return fmt.Errorf("failed to find user events: %w", err)
}
for _, event := range events {
if err := ges.keyManager.RevokeKey(event.KeyID); err != nil {
return fmt.Errorf("failed to revoke key %s: %w", event.KeyID, err)
}
}
return ges.MarkUserForgotten(ctx, userID)
}
Testing Strategy
📊 Event Sourcing Testing Framework
Comprehensive testing approach for event-sourced systems:
- Event Store Tests: Test consistency, concurrency, and durability
- Aggregate Tests: Unit test business logic and invariants (example after the store test below)
- Projection Tests: Verify read model consistency
- Integration Tests: End-to-end command/query flows
- Event Schema Tests: Test event evolution and migration
// Event store integration test
func TestEventStore(t *testing.T) {
es := setupTestEventStore(t)
defer es.Close()
t.Run("ConcurrencyControl", func(t *testing.T) {
aggregateID := uuid.New().String()
// First save succeeds
err := es.SaveEvents(context.Background(), aggregateID, "Account",
[]Event{&AccountOpened{AccountID: aggregateID}}, 0)
require.NoError(t, err)
// Second save with wrong version fails
err = es.SaveEvents(context.Background(), aggregateID, "Account",
[]Event{&MoneyDeposited{AccountID: aggregateID}}, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "concurrency conflict")
})
}
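Aggregate tests need no database at all: apply a history, run a command, assert on the recorded events. A Given-When-Then sketch against the Account aggregate from earlier:

func TestAccountWithdraw_InsufficientFunds(t *testing.T) {
    account := &Account{}
    // Given: an account that has received a 50 deposit
    account.Apply(MoneyDeposited{AccountID: "acc-1", Amount: decimal.NewFromInt(50)})
    // When: withdrawing more than the balance
    err := account.Withdraw(decimal.NewFromInt(100))
    // Then: the command is rejected and no new event is recorded
    require.Error(t, err)
    require.Empty(t, account.GetUncommittedEvents())
}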
Production Monitoring
// Event store metrics
type Metrics struct {
EventsWritten prometheus.Counter
EventsRead prometheus.Counter
SnapshotCreated prometheus.Counter
WriteLatency prometheus.Histogram
ReadLatency prometheus.Histogram
}
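Wiring these in is mostly decoration around the store calls. A sketch for the write path (assumes the fields above are registered with a Prometheus registry):

func (es *EventStore) SaveEventsInstrumented(ctx context.Context, m *Metrics, aggregateID, aggregateType string, events []Event, expectedVersion int) error {
    start := time.Now()
    err := es.SaveEvents(ctx, aggregateID, aggregateType, events, expectedVersion)
    m.WriteLatency.Observe(time.Since(start).Seconds())
    if err == nil {
        m.EventsWritten.Add(float64(len(events)))
    }
    return err
}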
// Health checks
func (es *EventStore) HealthCheck() error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    // Use a fresh aggregate per check; a fixed "health" aggregate would
    // fail the expectedVersion=0 concurrency check on every run after the first
    checkID := "health-" + uuid.New().String()
    testEvent := HealthCheckEvent{
        ID:        checkID,
        Timestamp: time.Now(),
    }
    // Check write capability
    if err := es.SaveEvents(ctx, checkID, "HealthCheck", []Event{testEvent}, 0); err != nil {
        return fmt.Errorf("write check failed: %w", err)
    }
    // Check read capability
    events, err := es.GetEvents(ctx, checkID, 0)
    if err != nil {
        return fmt.Errorf("read check failed: %w", err)
    }
    if len(events) == 0 {
        return errors.New("no events found")
    }
    return nil
}
// Lag monitoring
func MonitorProjectionLag(db *sql.DB) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        // Scan the interval as epoch seconds; database/sql cannot scan a
        // Postgres interval into a time.Duration directly
        var lagSeconds float64
        if err := db.QueryRow(`
            SELECT COALESCE(EXTRACT(EPOCH FROM MAX(NOW() - updated_at)), 0)
            FROM projection_checkpoints`,
        ).Scan(&lagSeconds); err != nil {
            continue
        }
        projectionLag.Set(lagSeconds) // assumed package-level prometheus.Gauge
        if lagSeconds > (5 * time.Minute).Seconds() {
            alert("Projection lag exceeds 5 minutes")
        }
    }
}
Performance Optimizations
// 1. Batch event writes
func (es *EventStore) SaveEventsBatch(events []EventWithAggregate) error {
    // pq.CopyIn must be prepared inside a transaction
    tx, err := es.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    stmt, err := tx.Prepare(pq.CopyIn("events",
        "aggregate_id", "aggregate_type", "event_type",
        "event_version", "event_data", "occurred_at"))
    if err != nil {
        return err
    }
    for _, e := range events {
        if _, err = stmt.Exec(e.AggregateID, e.AggregateType,
            e.EventType, e.Version, e.Data, e.OccurredAt); err != nil {
            return err
        }
    }
    // A final Exec with no arguments flushes the COPY buffer
    if _, err = stmt.Exec(); err != nil {
        return err
    }
    if err = stmt.Close(); err != nil {
        return err
    }
    return tx.Commit()
}
// 2. Parallel projection updates
func UpdateProjectionsParallel(projection *AccountProjection, events []Event) {
    var wg sync.WaitGroup
    ch := make(chan Event, 100)
    // Start workers (four is illustrative); note that naive fan-out loses
    // per-aggregate ordering - real implementations shard by aggregate ID
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for event := range ch {
                projection.Handle(event) // error handling elided in this sketch
            }
        }()
    }
    // Feed events and wait for the workers to drain the channel
    for _, e := range events {
        ch <- e
    }
    close(ch)
    wg.Wait()
}
Migration from Traditional System
// Generate events from existing state
func MigrateToEventSourcing(ctx context.Context, db *sql.DB, es *EventStore) error {
    rows, err := db.Query(`
        SELECT id, balance, created_at, updated_at
        FROM accounts`)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        var id string
        var balance decimal.Decimal
        var createdAt, updatedAt time.Time
        if err := rows.Scan(&id, &balance, &createdAt, &updatedAt); err != nil {
            return err
        }
        // Create initial event
        events := []Event{
            AccountOpened{
                AccountID: id,
                Timestamp: createdAt,
            },
        }
        // Infer deposit event from balance
        if balance.GreaterThan(decimal.Zero) {
            events = append(events, MoneyDeposited{
                AccountID: id,
                Amount:    balance,
                Timestamp: updatedAt,
            })
        }
        if err := es.SaveEvents(ctx, id, "Account", events, 0); err != nil {
            return err
        }
    }
    return rows.Err()
}
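Before cutting over, it's worth replaying every migrated stream and diffing the result against the legacy table. A minimal verification sketch (reuses LoadAccount from the snapshot section):

func VerifyMigration(ctx context.Context, db *sql.DB, es *EventStore) error {
    rows, err := db.Query(`SELECT id, balance FROM accounts`)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        var id string
        var legacyBalance decimal.Decimal
        if err := rows.Scan(&id, &legacyBalance); err != nil {
            return err
        }
        // Replay the event stream and compare against the legacy balance
        account, err := LoadAccount(ctx, es, id)
        if err != nil {
            return err
        }
        if !account.Balance.Equal(legacyBalance) {
            return fmt.Errorf("balance mismatch for %s: replayed %v, legacy %v",
                id, account.Balance, legacyBalance)
        }
    }
    return rows.Err()
}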
Lessons from Production
| Metric | Before (CRUD) | After (Event Sourcing) |
|---|---|---|
| Write throughput | 1K/sec | 10K/sec |
| Read latency p99 | 5ms | 2ms (projections) |
| Audit completeness | 60% | 100% |
| Debug time | Hours | Minutes (replay) |
| Storage cost | $1K/month | $3-5K/month |
When NOT to Use Event Sourcing
- CRUD is sufficient (most apps)
- No audit requirements
- Simple domain logic
- Team unfamiliar with the pattern
- Storage cost is critical
The Verdict
Event sourcing isn't free. 3-5x storage cost (events + projections + snapshots). Complex to implement. Mental model shift.
But for financial systems, audit-heavy domains, or complex business logic? It's transformative. Complete history, perfect audit trail, time travel debugging, and horizontal scalability.
Start small: Event source one aggregate. See the benefits. Then expand. Don't go all-in immediately.