Skip to main content
Version: 0.45.0

Event System and Outbox Pattern

This document describes Lana's event system, including event sourcing, the outbox pattern, and cross-domain communication.

Overview

Lana uses an event-driven architecture with:

  • Event Sourcing: State changes captured as events
  • Outbox Pattern: Reliable event publishing
  • Domain Events: Cross-context communication

Event Types

Entity Events

Internal state changes within an aggregate:

#[derive(EsEvent)]
pub enum CreditFacilityEvent {
Initialized {
id: CreditFacilityId,
customer_id: CustomerId,
amount: UsdCents,
},
CollateralPosted {
collateral_id: CollateralId,
amount: Satoshis,
},
Activated {
activated_at: DateTime<Utc>,
},
DisbursalInitiated {
disbursal_id: DisbursalId,
amount: UsdCents,
},
}

Outbox Pattern

The outbox ensures reliable event delivery:

Outbox Table Structure

CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL,
processed_at TIMESTAMP,
correlation_id UUID
);

Publishing Events

impl CreditFacilityRepo {
pub async fn create(&self, facility: CreditFacility) -> Result<CreditFacility> {
let mut tx = self.pool.begin().await?;

// Save entity
facility.persist(&mut tx).await?;

// Publish to outbox (same transaction)
for event in facility.events() {
self.outbox.publish(&mut tx, event).await?;
}

tx.commit().await?;
Ok(facility)
}
}

Event Processing

Event Processor

pub struct EventProcessor {
handlers: Vec<Box<dyn EventHandler>>,
}

impl EventProcessor {
pub async fn process_pending(&self) -> Result<()> {
let events = self.outbox.fetch_pending(100).await?;

for event in events {
for handler in &self.handlers {
if handler.can_handle(&event) {
handler.handle(&event).await?;
}
}
self.outbox.mark_processed(event.id).await?;
}

Ok(())
}
}

Event Handlers

pub struct CustomerActivationHandler {
deposit_service: DepositService,
}

#[async_trait]
impl EventHandler for CustomerActivationHandler {
fn can_handle(&self, event: &OutboxEvent) -> bool {
event.event_type == "CustomerActivated"
}

async fn handle(&self, event: &OutboxEvent) -> Result<()> {
let payload: CustomerActivatedEvent = serde_json::from_value(event.payload)?;

// Create deposit account for new customer
self.deposit_service
.create_account(payload.customer_id)
.await?;

Ok(())
}
}

Cross-Domain Communication

Event Correlation

Events can be correlated for tracing:

pub struct CorrelationContext {
correlation_id: Uuid,
causation_id: Option<Uuid>,
trace_id: String,
}

impl Outbox {
pub async fn publish_with_context(
&self,
tx: &mut Transaction,
event: impl Event,
context: CorrelationContext,
) -> Result<()> {
// Include correlation data in outbox record
}
}

Idempotency

Event handlers must be idempotent:

impl DepositAccountCreationHandler {
async fn handle(&self, event: &CustomerActivatedEvent) -> Result<()> {
// Check if already processed
if self.repo.exists_for_customer(event.customer_id).await? {
return Ok(()); // Already created
}

// Create new account
self.repo.create_account(event.customer_id).await
}
}

Event Replay

Events can be replayed for recovery or testing:

pub async fn replay_events(
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<()> {
let events = outbox.fetch_range(from, to).await?;

for event in events {
processor.process(event).await?;
}

Ok(())
}

Monitoring

Metrics

  • Events published per second
  • Event processing latency
  • Pending event count
  • Handler success/failure rates

Alerts

  • High pending event count
  • Processing failures
  • Slow handlers