Saltar al contenido principal
Version: 0.52.0-rc.104

Sistema de Eventos y Patrón Outbox

Este documento describe el sistema de eventos de Lana, incluyendo event sourcing, el patrón outbox y la comunicación entre dominios.

Descripción General

Lana utiliza una arquitectura orientada a eventos con:

  • Event Sourcing: Los cambios de estado se capturan como eventos
  • Patrón Outbox: Publicación confiable de eventos
  • Eventos de Dominio: Comunicación entre contextos

Tipos de Eventos

Eventos de Entidad

Cambios de estado internos dentro de un agregado:

#[derive(EsEvent)]
pub enum CreditFacilityEvent {
Initialized {
id: CreditFacilityId,
customer_id: CustomerId,
amount: UsdCents,
},
CollateralPosted {
collateral_id: CollateralId,
amount: Satoshis,
},
Activated {
activated_at: DateTime<Utc>,
},
DisbursalInitiated {
disbursal_id: DisbursalId,
amount: UsdCents,
},
}

Patrón Outbox

El outbox garantiza la entrega confiable de eventos:

Estructura de la Tabla Outbox

CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL,
processed_at TIMESTAMP,
correlation_id UUID
);

Publicación de Eventos

impl CreditFacilityRepo {
pub async fn create(&self, facility: CreditFacility) -> Result<CreditFacility> {
let mut tx = self.pool.begin().await?;

// Save entity
facility.persist(&mut tx).await?;

// Publish to outbox (same transaction)
for event in facility.events() {
self.outbox.publish(&mut tx, event).await?;
}

tx.commit().await?;
Ok(facility)
}
}

Procesamiento de Eventos

Procesador de Eventos

pub struct EventProcessor {
handlers: Vec<Box<dyn EventHandler>>,
}

impl EventProcessor {
pub async fn process_pending(&self) -> Result<()> {
let events = self.outbox.fetch_pending(100).await?;

for event in events {
for handler in &self.handlers {
if handler.can_handle(&event) {
handler.handle(&event).await?;
}
}
self.outbox.mark_processed(event.id).await?;
}

Ok(())
}
}

Manejadores de Eventos

pub struct CustomerActivationHandler {
deposit_service: DepositService,
}

#[async_trait]
impl EventHandler for CustomerActivationHandler {
fn can_handle(&self, event: &OutboxEvent) -> bool {
event.event_type == "CustomerActivated"
}

async fn handle(&self, event: &OutboxEvent) -> Result<()> {
let payload: CustomerActivatedEvent = serde_json::from_value(event.payload)?;

// Create deposit account for new customer
self.deposit_service
.create_account(payload.customer_id)
.await?;

Ok(())
}
}

Comunicación entre Dominios

Correlación de Eventos

Los eventos pueden correlacionarse para el rastreo:

pub struct CorrelationContext {
correlation_id: Uuid,
causation_id: Option<Uuid>,
trace_id: String,
}

impl Outbox {
pub async fn publish_with_context(
&self,
tx: &mut Transaction,
event: impl Event,
context: CorrelationContext,
) -> Result<()> {
// Include correlation data in outbox record
}
}

Idempotencia

Los manejadores de eventos deben ser idempotentes:

impl DepositAccountCreationHandler {
async fn handle(&self, event: &CustomerActivatedEvent) -> Result<()> {
// Check if already processed
if self.repo.exists_for_customer(event.customer_id).await? {
return Ok(()); // Already created
}

// Create new account
self.repo.create_account(event.customer_id).await
}
}

Reproducción de Eventos

Los eventos se pueden reproducir para recuperación o pruebas:

pub async fn replay_events(
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<()> {
let events = outbox.fetch_range(from, to).await?;

for event in events {
processor.process(event).await?;
}

Ok(())
}

Monitoreo

Métricas

  • Eventos publicados por segundo
  • Latencia de procesamiento de eventos
  • Cantidad de eventos pendientes
  • Tasas de éxito/fallo de manejadores

Alertas

  • Cantidad alta de eventos pendientes
  • Fallos de procesamiento
  • Manejadores lentos