Event Sourcing in Go: From Zero to Production

Original link: https://skoredin.pro/blog/golang/event-sourcing-go

Summary

Event sourcing fundamentally changes how data is stored: instead of keeping only the *current state*, the system records an immutable sequence of events, one for every change. Rather than issuing UPDATE statements, it appends events, facts about *what happened*, which provides a complete audit trail and enables "time travel" debugging.

The main benefits are auditability, scalability (via CQRS, which separates read and write concerns), and debuggability. Performance over large event streams comes from snapshots (periodic saves of state) and projections (read models updated asynchronously from events, often through Kafka for real-time integration).

Successful adoption demands careful attention to event versioning, migration strategy, and robust concurrency control. Production systems use append-only databases with indexing and partitioning, plus per-event metadata for auditing.

Event sourcing brings significant advantages but is not universally applicable: it adds complexity and storage cost. It fits best in domains that need strong audit trails, complex business logic, or debugging against historical data, such as financial systems, and is usually a poor fit for simple CRUD applications.

Key Takeaways

  • Event sourcing provides complete audit trail and time-travel debugging capabilities
  • CQRS separation enables independent scaling of reads and writes
  • Snapshots are essential for performance with large event streams
  • Proper event versioning and migration strategies prevent production disasters
  • Event streaming with Kafka enables real-time projections and system integration

Why Event Sourcing?

Your database shows current state. But how did it get there? Who changed what? When? Why?

-- Traditional: Current state only
SELECT balance FROM accounts WHERE id = 123;
-- Result: 1000

-- Event sourced: Complete history
SELECT * FROM events WHERE aggregate_id = 123;
-- Shows every deposit, withdrawal, fee, interest

We needed an audit trail for financial compliance. Event sourcing gave us that, plus time-travel debugging and independent scaling of reads and writes.

Core Concepts in 5 Minutes

Event sourcing stores state changes as a sequence of events rather than overwriting data. Instead of UPDATE statements that destroy history, we append immutable events that tell the complete story.

Traditional systems show what IS. Event sourcing shows what HAPPENED. This distinction transforms debugging, auditing, and analytics. When a bug corrupts data, we can replay events to find exactly when and how it occurred.

Events are facts about the past - they cannot be changed or deleted. This immutability provides natural audit logging and enables powerful patterns like temporal queries and retroactive fixes.

State becomes a left-fold over events. Current balance isn't stored; it's calculated by replaying all deposits and withdrawals. This sounds slow but with snapshots and projections, it's actually faster than traditional systems for many use cases.

// Events capture business intent
type AccountOpened struct {
    AccountID string
    Currency  string
    Timestamp time.Time
}

type MoneyDeposited struct {
    AccountID string
    Amount    decimal.Decimal
    Timestamp time.Time
}

// Used later in the post but never declared there; shape inferred
type MoneyWithdrawn struct {
    AccountID string
    Amount    decimal.Decimal
    Timestamp time.Time
}

// State derived from event history
func (a *Account) Apply(event Event) {
    // Rebuild state by replaying events
}
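
The snippets that follow call EventType() and OccurredAt() on events, but the Event interface itself never appears in the post. A minimal sketch of what it presumably looks like, with value receivers so the type switches in Apply keep matching:

type Event interface {
    EventType() string
    OccurredAt() time.Time
}

func (e AccountOpened) EventType() string     { return "AccountOpened" }
func (e AccountOpened) OccurredAt() time.Time { return e.Timestamp }

func (e MoneyDeposited) EventType() string     { return "MoneyDeposited" }
func (e MoneyDeposited) OccurredAt() time.Time { return e.Timestamp }

func (e MoneyWithdrawn) EventType() string     { return "MoneyWithdrawn" }
func (e MoneyWithdrawn) OccurredAt() time.Time { return e.Timestamp }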

Production Event Store

A production event store needs to handle millions of events efficiently. Our PostgreSQL-based implementation processes 10K events/second with proper indexing and partitioning. The append-only nature makes it extremely fast - no updates, no deletes, just inserts.

Event ordering is critical for consistency. We use database sequences per aggregate to ensure events are applied in the correct order. This prevents race conditions where concurrent operations might corrupt state.

The schema design balances normalization with query performance. Event data is stored as JSON for flexibility, while frequently queried fields (aggregate_id, event_type) are indexed columns. This hybrid approach enables both fast queries and schema evolution.

Metadata tracks important context: user ID, correlation ID, causation ID. This audit trail proves invaluable for debugging and compliance. Every state change is traceable to its origin.

type EventStore struct {
    db *sql.DB
}

type StoredEvent struct {
    ID            uuid.UUID
    AggregateID   string
    AggregateType string
    EventType     string
    EventVersion  int
    EventData     json.RawMessage
    Metadata      json.RawMessage
    OccurredAt    time.Time
    RecordedAt    time.Time
}

// Append-only schema with proper indexes
const schema = `
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    occurred_at TIMESTAMP NOT NULL,
    recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
    
    -- Ensure events are ordered per aggregate
    -- (the constraint also serves as the (aggregate_id, event_version) index)
    UNIQUE(aggregate_id, event_version)
);

-- PostgreSQL creates secondary indexes separately, not inline in CREATE TABLE
CREATE INDEX idx_event_type ON events(event_type);
CREATE INDEX idx_occurred_at ON events(occurred_at);

-- Global event sequence for ordering
CREATE SEQUENCE IF NOT EXISTS global_event_sequence;
ALTER TABLE events ADD COLUMN global_sequence BIGINT DEFAULT nextval('global_event_sequence');
CREATE INDEX idx_global_sequence ON events(global_sequence);
`

func (es *EventStore) SaveEvents(ctx context.Context, aggregateID, aggregateType string, events []Event, expectedVersion int) error {
    tx, err := es.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // Check optimistic concurrency; the UNIQUE(aggregate_id, event_version)
    // constraint backstops this read against concurrent writers
    var currentVersion int
    err = tx.QueryRow(`
        SELECT COALESCE(MAX(event_version), 0) 
        FROM events 
        WHERE aggregate_id = $1`,
        aggregateID,
    ).Scan(&currentVersion)
    
    if err != nil {
        return err
    }
    
    if currentVersion != expectedVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, got %d", 
            expectedVersion, currentVersion)
    }
    
    // Save events
    version := expectedVersion
    for _, event := range events {
        version++
        
        eventData, err := json.Marshal(event)
        if err != nil {
            return err
        }
        
        metadata := map[string]interface{}{
            "user_id":     ctx.Value("user_id"),
            "trace_id":    ctx.Value("trace_id"),
            "source":      ctx.Value("source"),
        }
        metadataJSON, err := json.Marshal(metadata)
        if err != nil {
            return err
        }
        
        _, err = tx.Exec(`
            INSERT INTO events (
                aggregate_id, aggregate_type, event_type, 
                event_version, event_data, metadata, occurred_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
            aggregateID,
            aggregateType,
            event.EventType(),
            version,
            eventData,
            metadataJSON,
            event.OccurredAt(),
        )
        
        if err != nil {
            return err
        }
    }
    
    return tx.Commit()
}

func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT 
            id, aggregate_id, aggregate_type, event_type,
            event_version, event_data, metadata, 
            occurred_at, recorded_at
        FROM events
        WHERE aggregate_id = $1 AND event_version > $2
        ORDER BY event_version`,
        aggregateID, fromVersion,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var events []StoredEvent
    for rows.Next() {
        var e StoredEvent
        err := rows.Scan(
            &e.ID, &e.AggregateID, &e.AggregateType,
            &e.EventType, &e.EventVersion, &e.EventData,
            &e.Metadata, &e.OccurredAt, &e.RecordedAt,
        )
        if err != nil {
            return nil, err
        }
        events = append(events, e)
    }
    
    return events, nil
}
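
GetEvents returns raw StoredEvents, so rebuilding an aggregate needs a way to turn event_type plus JSON back into concrete Go events. The post doesn't show this step; a minimal hand-rolled mapping (a hypothetical helper, where a production system would use a registry) might look like:

// deserializeEvent maps a stored event back to a domain event.
// Hypothetical helper, not from the original post.
func deserializeEvent(eventType string, data json.RawMessage) (Event, error) {
    switch eventType {
    case "AccountOpened":
        var e AccountOpened
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    case "MoneyDeposited":
        var e MoneyDeposited
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    case "MoneyWithdrawn":
        var e MoneyWithdrawn
        if err := json.Unmarshal(data, &e); err != nil {
            return nil, err
        }
        return e, nil
    }
    return nil, fmt.Errorf("unknown event type: %s", eventType)
}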

Aggregate Root Pattern

type AggregateRoot struct {
    ID               string
    Version          int
    uncommittedEvents []Event
}

func (a *AggregateRoot) RecordEvent(event Event) {
    a.uncommittedEvents = append(a.uncommittedEvents, event)
    a.Version++
}

func (a *AggregateRoot) GetUncommittedEvents() []Event {
    return a.uncommittedEvents
}

func (a *AggregateRoot) MarkEventsAsCommitted() {
    a.uncommittedEvents = []Event{}
}

// Example: Account aggregate
type Account struct {
    AggregateRoot
    Balance  decimal.Decimal
    Currency string
    Status   string
}

func (a *Account) Deposit(amount decimal.Decimal) error {
    if amount.LessThanOrEqual(decimal.Zero) {
        return fmt.Errorf("invalid deposit amount: %v must be positive", amount)
    }
    
    event := MoneyDeposited{
        AccountID: a.ID,
        Amount:    amount,
        Timestamp: time.Now(),
    }
    
    a.Apply(event)
    a.RecordEvent(event)
    return nil
}

func (a *Account) Withdraw(amount decimal.Decimal) error {
    if amount.LessThanOrEqual(decimal.Zero) {
        return fmt.Errorf("invalid withdrawal amount: %v must be positive", amount)
    }
    if amount.GreaterThan(a.Balance) {
        return fmt.Errorf("insufficient funds: attempting to withdraw %v from balance %v", amount, a.Balance)
    }
    
    event := MoneyWithdrawn{
        AccountID: a.ID,
        Amount:    amount,
        Timestamp: time.Now(),
    }
    
    a.Apply(event)
    a.RecordEvent(event)
    return nil
}

func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case AccountOpened:
        a.ID = e.AccountID
        a.Currency = e.Currency
        a.Status = "open" // assumed initial status
    case MoneyDeposited:
        a.Balance = a.Balance.Add(e.Amount)
    case MoneyWithdrawn:
        a.Balance = a.Balance.Sub(e.Amount)
    }
}

CQRS: Command and Query Separation

// Write side: Commands modify aggregates
type CommandHandler struct {
    eventStore *EventStore
    eventBus   *EventBus
}

func (h *CommandHandler) Handle(ctx context.Context, cmd Command) error {
    switch c := cmd.(type) {
    case DepositMoney:
        return h.handleDeposit(ctx, c)
    case WithdrawMoney:
        return h.handleWithdraw(ctx, c)
    }
    return errors.New("unknown command")
}

func (h *CommandHandler) handleDeposit(ctx context.Context, cmd DepositMoney) error {
    // Load aggregate from events
    account := &Account{}
    events, err := h.eventStore.GetEvents(ctx, cmd.AccountID, 0)
    if err != nil {
        return err
    }
    
    loadedVersion := 0
    for _, stored := range events {
        // Stored events are raw JSON; map them back to domain events
        // (see the deserializeEvent sketch above)
        domainEvent, err := deserializeEvent(stored.EventType, stored.EventData)
        if err != nil {
            return err
        }
        account.Apply(domainEvent)
        loadedVersion = stored.EventVersion
    }
    account.Version = loadedVersion
    
    // Execute business logic
    if err := account.Deposit(cmd.Amount); err != nil {
        return err
    }
    
    // Save new events; the pre-load version is the optimistic-concurrency check
    newEvents := account.GetUncommittedEvents()
    err = h.eventStore.SaveEvents(ctx, account.ID, "Account", newEvents, loadedVersion)
    if err != nil {
        return err
    }
    
    // Publish for projections
    for _, event := range newEvents {
        h.eventBus.Publish(event)
    }
    account.MarkEventsAsCommitted()
    
    return nil
}

// Read side: Projections for queries
type AccountProjection struct {
    db *sql.DB
}

func (p *AccountProjection) Handle(event Event) error {
    switch e := event.(type) {
    case MoneyDeposited:
        _, err := p.db.Exec(`
            UPDATE account_projections 
            SET balance = balance + $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
        
    case MoneyWithdrawn:
        _, err := p.db.Exec(`
            UPDATE account_projections 
            SET balance = balance - $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
    }
    return nil
}

// Query handler reads from projections
type QueryHandler struct {
    db *sql.DB
}

func (q *QueryHandler) GetAccountBalance(accountID string) (decimal.Decimal, error) {
    var balance decimal.Decimal
    err := q.db.QueryRow(`
        SELECT balance FROM account_projections WHERE account_id = $1`,
        accountID,
    ).Scan(&balance)
    return balance, err
}

⚠️ Eventual Consistency Tradeoff

CQRS introduces eventual consistency between write and read models:

  • Events are written immediately to the event store
  • Projections update asynchronously (typically milliseconds to seconds)
  • Queries may return stale data until projections catch up
  • Design your UX to handle this: optimistic UI updates, "processing" states, or read-your-writes guarantees where critical (the checkpoint sketch below is one way to make the lag measurable)
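
One way to make that lag bounded and observable is a per-projection checkpoint: each batch of events and the checkpoint row are committed in one transaction. This is an assumed design, consistent with the projection_checkpoints table that the monitoring section queries later:

// Hypothetical checkpoint-tracking apply. applyOne (not shown) would run
// the same per-event UPDATEs as Handle above, against tx. lastSeq is the
// global_sequence of the final event in the batch.
func (p *AccountProjection) ApplyBatch(ctx context.Context, name string, events []Event, lastSeq int64) error {
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    for _, event := range events {
        if err := applyOne(tx, event); err != nil {
            return err
        }
    }
    
    // Advancing the checkpoint in the same transaction keeps replays idempotent
    _, err = tx.ExecContext(ctx, `
        INSERT INTO projection_checkpoints (projection, last_sequence, updated_at)
        VALUES ($1, $2, NOW())
        ON CONFLICT (projection)
        DO UPDATE SET last_sequence = EXCLUDED.last_sequence, updated_at = NOW()`,
        name, lastSeq,
    )
    if err != nil {
        return err
    }
    return tx.Commit()
}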

Snapshots for Performance

type Snapshot struct {
    AggregateID string
    Version     int
    Data        []byte
    CreatedAt   time.Time
}

func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
    _, err := es.db.ExecContext(ctx, `
        INSERT INTO snapshots (aggregate_id, version, data, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id) 
        DO UPDATE SET version = $2, data = $3, created_at = $4`,
        snapshot.AggregateID,
        snapshot.Version,
        snapshot.Data,
        snapshot.CreatedAt,
    )
    return err
}

func (es *EventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
    var s Snapshot
    err := es.db.QueryRowContext(ctx, `
        SELECT aggregate_id, version, data, created_at
        FROM snapshots
        WHERE aggregate_id = $1`,
        aggregateID,
    ).Scan(&s.AggregateID, &s.Version, &s.Data, &s.CreatedAt)
    
    if err == sql.ErrNoRows {
        return nil, nil
    }
    return &s, err
}

// Load aggregate with snapshot optimization
func LoadAccount(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
    account := &Account{}
    
    // Try to load snapshot
    snapshot, err := es.GetSnapshot(ctx, accountID)
    if err != nil {
        return nil, err
    }
    
    fromVersion := 0
    if snapshot != nil {
        // Restore from snapshot
        if err := json.Unmarshal(snapshot.Data, account); err != nil {
            return nil, err
        }
        fromVersion = snapshot.Version
    }
    
    // Apply events recorded after the snapshot
    events, err := es.GetEvents(ctx, accountID, fromVersion)
    if err != nil {
        return nil, err
    }
    
    for _, stored := range events {
        domainEvent, err := deserializeEvent(stored.EventType, stored.EventData)
        if err != nil {
            return nil, err
        }
        account.Apply(domainEvent)
        account.Version = stored.EventVersion
    }
    
    // Create a new snapshot roughly every 100 events
    if len(events) > 100 {
        snapshotData, err := json.Marshal(account)
        if err == nil {
            // Snapshot failures are non-fatal; the aggregate is already loaded
            _ = es.SaveSnapshot(ctx, Snapshot{
                AggregateID: accountID,
                Version:     account.Version,
                Data:        snapshotData,
                CreatedAt:   time.Now(),
            })
        }
    }
    
    return account, nil
}
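
SaveSnapshot's upsert relies on aggregate_id being unique in the snapshots table, whose DDL the post never shows. A plausible definition, assumed to match the columns the Go code reads and writes:

const snapshotSchema = `
CREATE TABLE snapshots (
    aggregate_id VARCHAR(255) PRIMARY KEY,
    version      INT NOT NULL,
    data         JSONB NOT NULL,  -- snapshots are marshaled JSON
    created_at   TIMESTAMP NOT NULL
);
`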

Event Streaming with Kafka

type EventStreamer struct {
    eventStore *EventStore
    producer   *kafka.Writer
    lastSeq    int64
}

func (s *EventStreamer) StreamEvents(ctx context.Context) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            s.publishBatch(ctx)
        }
    }
}

// publishBatch relays any events recorded past the last published global sequence
func (s *EventStreamer) publishBatch(ctx context.Context) {
    rows, err := s.eventStore.db.QueryContext(ctx, `
        SELECT global_sequence, aggregate_id, event_type, event_data, occurred_at
        FROM events
        WHERE global_sequence > $1
        ORDER BY global_sequence
        LIMIT 1000`,
        s.lastSeq,
    )
    if err != nil {
        return
    }
    defer rows.Close()
    
    var messages []kafka.Message
    var maxSeq int64
    
    for rows.Next() {
        var seq int64
        var aggregateID, eventType string
        var eventData json.RawMessage
        var occurredAt time.Time
        
        rows.Scan(&seq, &aggregateID, &eventType, &eventData, &occurredAt)
        
        messages = append(messages, kafka.Message{
            Topic: fmt.Sprintf("events.%s", eventType),
            Key:   []byte(aggregateID),
            Value: eventData,
            Headers: []kafka.Header{
                {Key: "event_type", Value: []byte(eventType)},
                {Key: "occurred_at", Value: []byte(occurredAt.Format(time.RFC3339))},
            },
        })
        
        maxSeq = seq
    }
    
    if len(messages) > 0 {
        err := s.producer.WriteMessages(ctx, messages...)
        if err == nil {
            s.lastSeq = maxSeq
        }
    }
}
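
On the consuming side, a projection service reads these topics and applies events to its read model. A minimal consumer sketch using the same segmentio/kafka-go library implied by kafka.Writer (broker address and group name are placeholders):

func ConsumeDeposits(ctx context.Context, projection *AccountProjection) error {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"}, // placeholder broker
        GroupID: "account-projections",
        Topic:   "events.MoneyDeposited",
    })
    defer reader.Close()
    
    for {
        // In consumer-group mode, ReadMessage commits offsets automatically
        msg, err := reader.ReadMessage(ctx)
        if err != nil {
            return err // context cancellation also lands here
        }
        
        var event MoneyDeposited
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            return err
        }
        if err := projection.Handle(event); err != nil {
            return err
        }
    }
}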

Temporal Queries (Time Travel)

// Get account state at specific point in time
func (es *EventStore) GetAggregateAtTime(ctx context.Context, aggregateID string, pointInTime time.Time) (*Account, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT event_type, event_data
        FROM events
        WHERE aggregate_id = $1 AND occurred_at <= $2
        ORDER BY event_version`,
        aggregateID, pointInTime,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    // Replay only the events that existed at that moment
    account := &Account{}
    for rows.Next() {
        var eventType string
        var eventData json.RawMessage
        if err := rows.Scan(&eventType, &eventData); err != nil {
            return nil, err
        }
        domainEvent, err := deserializeEvent(eventType, eventData)
        if err != nil {
            return nil, err
        }
        account.Apply(domainEvent)
    }
    return account, rows.Err()
}
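
Usage is then a one-liner; for example, inspecting an account as of some past cutoff (account ID and date invented for illustration):

asOf := time.Date(2024, time.March, 31, 23, 59, 59, 0, time.UTC)
account, err := es.GetAggregateAtTime(ctx, "acc-123", asOf)
if err != nil {
    return err
}
fmt.Println("balance on March 31:", account.Balance)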

Saga Pattern for Distributed Transactions

type TransferSaga struct {
    ID          string
    FromAccount string
    ToAccount   string
    Amount      decimal.Decimal
    State       string
    CompletedSteps []string
}

func (s *TransferSaga) Handle(event Event) ([]Command, error) {
    switch e := event.(type) {
    case TransferInitiated:
        return []Command{
            WithdrawMoney{AccountID: e.FromAccount, Amount: e.Amount},
        }, nil
        
    case MoneyWithdrawn:
        if e.AccountID == s.FromAccount {
            s.CompletedSteps = append(s.CompletedSteps, "withdrawn")
            return []Command{
                DepositMoney{AccountID: s.ToAccount, Amount: s.Amount},
            }, nil
        }
        
    case MoneyDeposited:
        if e.AccountID == s.ToAccount {
            s.State = "completed"
            return []Command{
                MarkTransferComplete{TransferID: s.ID},
            }, nil
        }
        
    case WithdrawFailed:
        s.State = "failed"
        return nil, nil
        
    case DepositFailed:
        // Compensate - refund the withdrawal
        return []Command{
            DepositMoney{AccountID: s.FromAccount, Amount: s.Amount},
        }, nil
    }
    
    return nil, nil
}
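
The post doesn't show how a saga is driven. Assuming events reach it over a channel (none of these names come from the original), the glue can be as small as:

// DriveSaga feeds each incoming event through the saga and dispatches
// whatever commands it decides on. Hypothetical wiring, sketched for clarity.
func DriveSaga(ctx context.Context, saga *TransferSaga, handler *CommandHandler, events <-chan Event) error {
    for event := range events {
        cmds, err := saga.Handle(event)
        if err != nil {
            return err
        }
        for _, cmd := range cmds {
            if err := handler.Handle(ctx, cmd); err != nil {
                return err
            }
        }
    }
    return nil
}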

Event Store Consistency Warning

Event stores require careful attention to:

  • Optimistic concurrency control to prevent data corruption
  • Event ordering guarantees within aggregates
  • Backup and recovery procedures for event streams
  • Event schema evolution and versioning strategies

Security Considerations

Event Sourcing Security Best Practices

  • Event Encryption: Encrypt sensitive data in event payloads
  • Access Control: Role-based access to event streams and projections
  • Audit Trail: Include user context and authorization in event metadata
  • Data Privacy: Implement "right to be forgotten" through cryptographic erasure
  • Replay Security: Ensure event replay doesn't bypass current security rules

// Secure event with encryption
type SecureEvent struct {
    BaseEvent
    EncryptedPayload []byte
    KeyID           string
    Nonce           []byte
}

// GDPR-compliant cryptographic erasure
type GDPREventStore struct {
    *EventStore
    keyManager *KeyManager
}

func (ges *GDPREventStore) ForgetUser(ctx context.Context, userID string) error {
    events, err := ges.GetEventsByUser(ctx, userID)
    if err != nil {
        return fmt.Errorf("failed to find user events: %w", err)
    }
    
    for _, event := range events {
        if err := ges.keyManager.RevokeKey(event.KeyID); err != nil {
            return fmt.Errorf("failed to revoke key %s: %w", event.KeyID, err)
        }
    }
    
    return ges.MarkUserForgotten(ctx, userID)
}
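
How the EncryptedPayload gets produced is left open; a sketch using the standard library's AES-GCM (key sourcing via the KeyManager is elided, and the function name is assumed):

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
)

// EncryptPayload seals a plaintext event body under a per-user key
func EncryptPayload(key []byte, keyID string, plaintext []byte) (*SecureEvent, error) {
    block, err := aes.NewCipher(key) // key must be 16, 24, or 32 bytes
    if err != nil {
        return nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    nonce := make([]byte, gcm.NonceSize())
    if _, err := rand.Read(nonce); err != nil {
        return nil, err
    }
    return &SecureEvent{
        KeyID:            keyID,
        Nonce:            nonce,
        EncryptedPayload: gcm.Seal(nil, nonce, plaintext, nil),
    }, nil
}

Once ForgetUser revokes a user's key, every payload sealed under it becomes permanently unreadable, which is the cryptographic erasure the bullet list above refers to.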

Testing Strategy

📊 Event Sourcing Testing Framework

Comprehensive testing approach for event-sourced systems:

  • Event Store Tests: Test consistency, concurrency, and durability
  • Aggregate Tests: Unit test business logic and invariants
  • Projection Tests: Verify read model consistency
  • Integration Tests: End-to-end command/query flows
  • Event Schema Tests: Test event evolution and migration

// Event store integration test
func TestEventStore(t *testing.T) {
    es := setupTestEventStore(t)
    defer es.Close()
    
    t.Run("ConcurrencyControl", func(t *testing.T) {
        aggregateID := uuid.New().String()
        
        // First save succeeds
        err := es.SaveEvents(context.Background(), aggregateID, "Account", 
            []Event{&AccountOpened{AccountID: aggregateID}}, 0)
        require.NoError(t, err)
        
        // Second save with wrong version fails
        err = es.SaveEvents(context.Background(), aggregateID, "Account", 
            []Event{&MoneyDeposited{AccountID: aggregateID}}, 0)
        require.Error(t, err)
        require.Contains(t, err.Error(), "concurrency conflict")
    })
}
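
Aggregate tests need no database at all, since business rules are pure functions over state. A small sketch (amounts invented):

// Aggregate unit test: business logic without any infrastructure
func TestAccountWithdraw(t *testing.T) {
    account := &Account{Balance: decimal.NewFromInt(100)}
    
    // Overdraft must be rejected
    err := account.Withdraw(decimal.NewFromInt(150))
    require.Error(t, err)
    
    // A valid withdrawal updates state and records one event
    err = account.Withdraw(decimal.NewFromInt(40))
    require.NoError(t, err)
    require.True(t, account.Balance.Equal(decimal.NewFromInt(60)))
    require.Len(t, account.GetUncommittedEvents(), 1)
}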

Production Monitoring

// Event store metrics
type Metrics struct {
    EventsWritten   prometheus.Counter
    EventsRead      prometheus.Counter
    SnapshotCreated prometheus.Counter
    WriteLatency    prometheus.Histogram
    ReadLatency     prometheus.Histogram
}

// Health checks
func (es *EventStore) HealthCheck(ctx context.Context) error {
    // A fresh aggregate ID each run keeps expectedVersion 0 valid
    aggregateID := "health-" + uuid.New().String()
    testEvent := HealthCheckEvent{
        ID:        uuid.New().String(),
        Timestamp: time.Now(),
    }
    
    // Check write capability
    err := es.SaveEvents(ctx, aggregateID, "HealthCheck", []Event{testEvent}, 0)
    if err != nil {
        return fmt.Errorf("write check failed: %w", err)
    }
    
    // Check read capability
    events, err := es.GetEvents(ctx, aggregateID, 0)
    if err != nil {
        return fmt.Errorf("read check failed: %w", err)
    }
    
    if len(events) == 0 {
        return errors.New("no events found")
    }
    
    return nil
}

// Lag monitoring. projectionLag (a Prometheus gauge) and alert are
// assumed to be defined elsewhere.
func MonitorProjectionLag(db *sql.DB) {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        // Scan lag as seconds; database drivers don't map Postgres
        // intervals onto time.Duration directly
        var lagSeconds float64
        err := db.QueryRow(`
            SELECT COALESCE(EXTRACT(EPOCH FROM MAX(NOW() - updated_at)), 0)
            FROM projection_checkpoints`,
        ).Scan(&lagSeconds)
        if err != nil {
            continue
        }
        
        projectionLag.Set(lagSeconds)
        
        if lagSeconds > (5 * time.Minute).Seconds() {
            alert("Projection lag exceeds 5 minutes")
        }
    }
}

Performance Optimizations

// 1. Batch event writes
func (es *EventStore) SaveEventsBatch(events []EventWithAggregate) error {
    // lib/pq's COPY support requires a transaction
    tx, err := es.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    stmt, err := tx.Prepare(pq.CopyIn("events",
        "aggregate_id", "aggregate_type", "event_type",
        "event_version", "event_data", "occurred_at"))
    if err != nil {
        return err
    }
    
    for _, e := range events {
        _, err = stmt.Exec(e.AggregateID, e.AggregateType,
            e.EventType, e.Version, e.Data, e.OccurredAt)
        if err != nil {
            return err
        }
    }
    
    // A final Exec with no arguments flushes the COPY buffer
    if _, err = stmt.Exec(); err != nil {
        return err
    }
    if err = stmt.Close(); err != nil {
        return err
    }
    return tx.Commit()
}

// 2. Parallel projection updates
func UpdateProjectionsParallel(events []Event) {
    var wg sync.WaitGroup
    ch := make(chan Event, 100)
    
    // Start workers (worker count is illustrative; note that parallelism
    // must not reorder events within a single aggregate)
    const numWorkers = 10
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for event := range ch {
                updateProjection(event) // hypothetical per-event update, as above
            }
        }()
    }
    
    // Fan events out to the workers
    for _, event := range events {
        ch <- event
    }
    close(ch)
    wg.Wait()
}

Migration from Traditional System

// Generate events from existing state
func MigrateToEventSourcing(ctx context.Context, db *sql.DB, es *EventStore) error {
    rows, err := db.QueryContext(ctx, `
        SELECT id, balance, created_at, updated_at
        FROM accounts`)
    if err != nil {
        return err
    }
    defer rows.Close()
    
    for rows.Next() {
        var id string
        var balance decimal.Decimal
        var createdAt, updatedAt time.Time
        
        if err := rows.Scan(&id, &balance, &createdAt, &updatedAt); err != nil {
            return err
        }
        
        // Create initial event
        events := []Event{
            AccountOpened{
                AccountID: id,
                Timestamp: createdAt,
            },
        }
        
        // Infer a deposit event from the current balance
        if balance.GreaterThan(decimal.Zero) {
            events = append(events, MoneyDeposited{
                AccountID: id,
                Amount:    balance,
                Timestamp: updatedAt,
            })
        }
        
        if err := es.SaveEvents(ctx, id, "Account", events, 0); err != nil {
            return err
        }
    }
    
    return rows.Err()
}
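
A sanity check worth running after such a migration: replay the generated events and compare against the legacy row. This function is a sketch, not from the original post:

func VerifyMigration(ctx context.Context, db *sql.DB, es *EventStore, accountID string) error {
    var legacy decimal.Decimal
    err := db.QueryRowContext(ctx,
        `SELECT balance FROM accounts WHERE id = $1`, accountID,
    ).Scan(&legacy)
    if err != nil {
        return err
    }
    
    // Rebuild state from the freshly written events
    account, err := LoadAccount(ctx, es, accountID)
    if err != nil {
        return err
    }
    
    if !account.Balance.Equal(legacy) {
        return fmt.Errorf("balance mismatch for %s: legacy %s, replayed %s",
            accountID, legacy, account.Balance)
    }
    return nil
}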

Lessons from Production

Metric               Before (CRUD)   After (Event Sourcing)
Write throughput     1K/sec          10K/sec
Read latency p99     5ms             2ms (projections)
Audit completeness   60%             100%
Debug time           Hours           Minutes (replay)
Storage cost         $1K/month       $3-5K/month

When NOT to Use Event Sourcing

  • CRUD is sufficient (most apps)
  • No audit requirements
  • Simple domain logic
  • Team unfamiliar with the pattern
  • Storage cost is critical

The Verdict

Event sourcing isn't free. 3-5x storage cost (events + projections + snapshots). Complex to implement. Mental model shift.

But for financial systems, audit-heavy domains, or complex business logic? It's transformative. Complete history, perfect audit trail, time travel debugging, and horizontal scalability.

Start small: Event source one aggregate. See the benefits. Then expand. Don't go all-in immediately.
