Saltar al contenido principal
Version: Siguiente

Sistema de Eventos y Patrón Outbox

Este documento describe la arquitectura orientada a eventos implementada en Lana Bank, centrándose en el patrón de event sourcing usado dentro de las entidades de dominio y el patrón outbox usado para la publicación confiable de eventos.

Modelo Dual de Eventos

El sistema implementa un modelo dual que separa los eventos internos de entidad de los eventos de dominio externos. Esta separación permite una integración confiable entre contextos acotados manteniendo una estricta consistencia dentro de cada dominio.

Eventos de Entidad vs Eventos de Dominio

Tipo de EventoPropósitoAlcanceEjemplos
Eventos de EntidadCambios de estado internosUn único agregadoCreditFacilityEvent::Initialized, ObligationEvent::DueRecorded
Eventos de DominioEventos de negocio para consumo externoIntegración entre dominiosCoreCreditEvent::FacilityActivated, CoreCreditEvent::ObligationDue

Los eventos de entidad se persisten como fuente de la verdad mediante event sourcing. Los eventos de dominio se derivan y publican a través del patrón outbox.

Event Sourcing para Entidades

Arquitectura de Event Sourcing

┌──────────────────┐
│ Business Command │
└────────┬─────────┘


┌──────────────────┐ ┌──────────────────┐
│ Domain Entity │───▶│ Entity Events │
│ (execute logic) │ │ push(new_event) │
└────────┬─────────┘ └────────┬─────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Repository │ │ Domain Publisher│
│ update_in_op() │ │ publish() │
└────────┬─────────┘ └────────┬─────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ PostgreSQL │ │ Outbox Table │
│ (Event Store) │ │ │
└──────────────────┘ └──────────────────┘

Implementación con es-entity

Las entidades de dominio usan el framework es-entity:

use es_entity::*;

#[derive(EsEntity)]
pub struct CreditFacility {
events: EntityEvents<CreditFacilityEvent>,
// ... otros campos
}

#[derive(EsEvent)]
pub enum CreditFacilityEvent {
Initialized {
id: CreditFacilityId,
customer_id: CustomerId,
terms: TermsId,
},
Activated {
activated_at: DateTime<Utc>,
},
DisbursalInitiated {
disbursal_id: DisbursalId,
amount: Money,
},
// ... más variantes
}

Flujo de Procesamiento

  1. Comando de negocio: Se recibe una solicitud (ej. activar facilidad)
  2. Ejecución de lógica: La entidad valida y ejecuta la operación
  3. Emisión de evento: Se crea y agrega el evento al historial
  4. Persistencia: El repositorio guarda los nuevos eventos en transacción
  5. Publicación: El publisher convierte a eventos de dominio y publica al outbox
impl CreditFacility {
pub fn activate(&mut self) -> Result<(), CreditFacilityError> {
// Validar estado actual
if self.status != CreditFacilityStatus::Approved {
return Err(CreditFacilityError::InvalidStatus);
}

// Emitir evento
self.events.push(CreditFacilityEvent::Activated {
activated_at: Utc::now(),
});

Ok(())
}
}

Patrón Outbox

El patrón outbox garantiza la publicación confiable de eventos persistiendo los eventos de dominio en la misma transacción de base de datos que los datos de negocio.

Arquitectura del Outbox

┌─────────────────────────────────────────────────────────────────┐
│ PostgreSQL Transaction │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Entity Events │ │ Outbox Events │ │
│ │ Table │ │ Table │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

│ (async)

┌─────────────────────────────────────────────────────────────────┐
│ Event Consumers │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Background Jobs │ │ External Systems │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Esquema de la Tabla Outbox

ColumnaTipoPropósito
sequenceBIGSERIALOrden de eventos autoincremental
recorded_atTIMESTAMPTZMarca de tiempo de creación
event_typeTEXTDiscriminador para variantes de evento
payloadJSONBEvento de dominio serializado
trace_contextJSONBContexto de trazabilidad para observabilidad
CREATE TABLE outbox_events (
sequence BIGSERIAL PRIMARY KEY,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
trace_context JSONB
);

CREATE INDEX idx_outbox_events_sequence ON outbox_events(sequence);

Preservación del Contexto de Trazas

El outbox preserva el contexto de trazabilidad para correlacionar eventos:

pub struct OutboxEvent {
pub sequence: i64,
pub recorded_at: DateTime<Utc>,
pub event_type: String,
pub payload: serde_json::Value,
pub trace_context: Option<TraceContext>,
}

impl OutboxEvent {
pub fn new<E: Serialize>(event: &E, trace_context: Option<TraceContext>) -> Self {
Self {
sequence: 0, // Asignado por la BD
recorded_at: Utc::now(),
event_type: std::any::type_name::<E>().to_string(),
payload: serde_json::to_value(event).unwrap(),
trace_context,
}
}
}

Eventos de Dominio Publicados

Tipos de Eventos del Dominio Core

// lana/events/src/lib.rs
pub enum CoreCreditEvent {
FacilityCreated {
id: CreditFacilityId,
customer_id: CustomerId,
},
FacilityActivated {
id: CreditFacilityId,
},
DisbursalCompleted {
facility_id: CreditFacilityId,
disbursal_id: DisbursalId,
amount: Money,
},
ObligationDue {
facility_id: CreditFacilityId,
obligation_id: ObligationId,
due_date: NaiveDate,
},
PaymentReceived {
facility_id: CreditFacilityId,
payment_id: PaymentId,
amount: Money,
},
}

pub enum CoreDepositEvent {
AccountCreated {
id: DepositAccountId,
customer_id: CustomerId,
},
DepositRecorded {
account_id: DepositAccountId,
amount: Money,
},
WithdrawalInitiated {
account_id: DepositAccountId,
withdrawal_id: WithdrawalId,
amount: Money,
},
}

pub enum CoreCustomerEvent {
CustomerCreated {
id: CustomerId,
},
KycCompleted {
id: CustomerId,
status: KycStatus,
},
}

Patrón Publisher

Implementación del Publisher

// core/credit/src/publisher.rs
pub struct CreditFacilityPublisher {
outbox: OutboxPublisher,
}

impl CreditFacilityPublisher {
pub async fn publish(
&self,
events: &[CreditFacilityEvent],
db_op: &mut DbOp<'_>,
) -> Result<(), PublisherError> {
for event in events {
if let Some(domain_event) = self.to_domain_event(event) {
self.outbox.publish(&domain_event, db_op).await?;
}
}
Ok(())
}

fn to_domain_event(&self, event: &CreditFacilityEvent) -> Option<CoreCreditEvent> {
match event {
CreditFacilityEvent::Activated { .. } => {
Some(CoreCreditEvent::FacilityActivated {
id: self.facility_id
})
}
// No todos los eventos internos se publican externamente
CreditFacilityEvent::InternalStateChange { .. } => None,
// ... más mappings
}
}
}

Publicación Selectiva

No todos los eventos de entidad se publican como eventos de dominio:

fn to_domain_event(&self, event: &CreditFacilityEvent) -> Option<CoreCreditEvent> {
match event {
// Eventos que SÍ se publican
CreditFacilityEvent::Activated { .. } => Some(CoreCreditEvent::FacilityActivated { .. }),
CreditFacilityEvent::DisbursalCompleted { .. } => Some(CoreCreditEvent::DisbursalCompleted { .. }),

// Eventos internos que NO se publican
CreditFacilityEvent::TermsUpdated { .. } => None,
CreditFacilityEvent::CollateralRevalued { .. } => None,
}
}

Integración con Jobs

Consumo de Eventos

// Job que consume eventos del outbox
pub struct CollateralSyncJob {
outbox_consumer: OutboxConsumer,
}

impl Job for CollateralSyncJob {
async fn run(&self) -> Result<(), JobError> {
let events = self.outbox_consumer
.poll::<CoreCreditEvent>()
.await?;

for event in events {
match event.payload {
CoreCreditEvent::FacilityActivated { id } => {
self.sync_collateral_for_facility(id).await?;
}
_ => {}
}
self.outbox_consumer.ack(event.sequence).await?;
}
Ok(())
}
}