Saltar al contenido principal
Version: Siguiente

Sistema de Trabajos en Segundo Plano

Este documento describe el sistema de procesamiento de trabajos en segundo plano en Lana Bank, que proporciona ejecución confiable de tareas asíncronas con lógica de reintentos, control de concurrencia y compatibilidad con trazado distribuido.

Propósito

El sistema de trabajos habilita:

  • Programación basada en tiempo (cron-like)
  • Procesamiento impulsado por eventos
  • Operaciones de larga duración independientes del ciclo solicitud-respuesta

Arquitectura del Sistema

El sistema sigue una arquitectura basada en 'pull', donde un despachador central consulta trabajos pendientes en la tabla job_executions de PostgreSQL.

┌─────────────────────────────────────────────────────────────────┐
│ Fuentes de Creación de Trabajos │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Trabajos │ │ Trabajos │ │ Trabajos │ │
│ │ Programados │ │ por Eventos │ │ de Sondeo │ │
│ │ (tipo cron) │ │ (outbox) │ │ (auto-rep.) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ Infraestructura de Trabajos │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ JobTracker │ │
│ │ (Control de Concurrencia) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ JobDispatcher │ │
│ │ (Gestor de Ejecución) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ job_executions │ │
│ │ (Tabla PostgreSQL) │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Componentes Principales

JobTracker - Control de Concurrencia

El JobTracker gestiona la concurrencia rastreando los trabajos en ejecución y determinando tamaños de lote para consultas.

ComponenteTipoResponsabilidad
running_jobsAtomicUsizeContador thread-safe de ejecuciones activas
notifyNotifyNotificación asíncrona para cambios de estado
next_batch_size()métodoCalcula cuántos trabajos consultar
dispatch_job()métodoIncrementa contador al iniciar trabajo
job_completed()métodoDecrementa contador y notifica
pub fn next_batch_size(&self) -> Option<usize> {
let n_running = self.running_jobs.load(Ordering::SeqCst);
if n_running < self.min_jobs {
Some(self.max_jobs - n_running)
} else {
None
}
}

JobDispatcher - Gestor de Ejecución

El JobDispatcher maneja el ciclo de vida completo de un trabajo: inicialización, ejecución, reintentos y finalización.

TransiciónMétodoOperación DB
Pending → Runningexecute_job()SELECT + UPDATE state='running'
Running → Heartbeatkeep_job_alive()UPDATE alive_at
Running → Completecomplete_job()DELETE de job_executions
Running → Failedfail_job()UPDATE con reintento o DELETE
Running → Rescheduledreschedule_job()UPDATE execute_at

Ciclo de Vida del Trabajo

        ┌─────────┐
│ Pending │
└────┬────┘
│ consultar

┌─────────┐
│ Running │◄──────────────┐
└────┬────┘ │
│ │ heartbeat
├─────────┬──────────┘
│ │
éxito │ │ error
▼ ▼
┌─────────┐ ┌─────────┐
│Complete │ │ Failed │
└─────────┘ └────┬────┘
│ reintento

┌─────────────┐
│ Rescheduled │
└─────────────┘

Tipos de Trabajos

Trabajos Impulsados por Eventos

Activados por eventos del outbox:

pub struct UserOnboardingJob {
customers: Customers,
outbox_consumer: OutboxConsumer,
}

impl Job for UserOnboardingJob {
const NAME: &'static str = "user-onboarding";

async fn run(&self, _: CurrentJob) -> Result<JobCompletion, JobError> {
let events = self.outbox_consumer
.poll::<CoreCustomerEvent>()
.await?;

for event in events {
if let CoreCustomerEvent::KycCompleted { id, .. } = event.payload {
self.customers.provision_user(id).await?;
}
self.outbox_consumer.ack(event.sequence).await?;
}

Ok(JobCompletion::Complete)
}
}

Trabajos Programados

Ejecución periódica tipo cron:

pub struct InterestAccrualJob {
credit_facilities: CreditFacilities,
}

impl Job for InterestAccrualJob {
const NAME: &'static str = "interest-accrual";

async fn run(&self, _: CurrentJob) -> Result<JobCompletion, JobError> {
self.credit_facilities.accrue_interest().await?;

// Reprogramar para mañana
Ok(JobCompletion::RescheduleAt(
Utc::now() + Duration::days(1)
))
}
}

Trabajos de Sondeo

Auto-reprogramación para sincronización continua:

pub struct CollateralSyncJob {
custody: Custody,
price_service: PriceService,
}

impl Job for CollateralSyncJob {
const NAME: &'static str = "collateral-sync";

async fn run(&self, _: CurrentJob) -> Result<JobCompletion, JobError> {
let wallets = self.custody.list_active_wallets().await?;

for wallet in wallets {
let balance = self.custody.sync_balance(&wallet).await?;
let price = self.price_service.get_btc_price().await?;
self.custody.update_collateral_value(&wallet, balance, price).await?;
}

// Reprogramar en 5 minutos
Ok(JobCompletion::RescheduleIn(Duration::minutes(5)))
}
}

Esquema de Base de Datos

CREATE TABLE job_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
job_type TEXT NOT NULL,
execute_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
state TEXT NOT NULL DEFAULT 'pending',
attempt_index INT NOT NULL DEFAULT 0,
alive_at TIMESTAMPTZ,
payload JSONB,
trace_context JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_job_executions_pending
ON job_executions(execute_at)
WHERE state = 'pending';

CREATE INDEX idx_job_executions_stale
ON job_executions(alive_at)
WHERE state = 'running';

Lógica de Reintentos

Configuración de RetrySettings

pub struct RetrySettings {
pub max_attempts: u32,
pub initial_interval: Duration,
pub max_interval: Duration,
pub multiplier: f64,
}

impl Default for RetrySettings {
fn default() -> Self {
Self {
max_attempts: 5,
initial_interval: Duration::seconds(5),
max_interval: Duration::hours(1),
multiplier: 2.0,
}
}
}

Backoff Exponencial

fn calculate_next_attempt(&self, attempt: u32) -> Duration {
let interval = self.initial_interval.as_secs_f64()
* self.multiplier.powi(attempt as i32);

Duration::seconds(
interval.min(self.max_interval.as_secs_f64()) as i64
)
}

Mecanismo Keep-Alive

El sistema mantiene latidos para detectar trabajos que se han quedado colgados:

async fn keep_alive_loop(&self, job_id: Uuid, interval: Duration) {
let mut ticker = tokio::time::interval(interval / 4);

loop {
ticker.tick().await;

if let Err(e) = self.update_alive_at(job_id).await {
tracing::warn!("Failed to update keep-alive: {}", e);
break;
}
}
}

Los trabajos sin latido reciente se consideran fallidos y se reprograman.

Observabilidad y Trazado

Preservación del Contexto de Trazas

Los trabajos preservan el contexto de trazabilidad:

impl JobDispatcher {
async fn execute_with_tracing(&self, job: CurrentJob) {
// Restaurar contexto de traza del trabajo
if let Some(trace_ctx) = &job.trace_context {
let parent = trace_ctx.extract();
let span = tracing::info_span!(
"job.execute",
job.type = job.job_type,
job.id = %job.id
);
span.set_parent(parent);

span.in_scope(|| {
self.runner.run(job)
}).await
}
}
}

Puntos de Instrumentación

SpanPropósito
job.dispatchCiclo de vida completo del trabajo
job.executeEjecución de la lógica del trabajo
job.retryIntentos de reintento
job.completeFinalización exitosa
job.failFallo del trabajo

Integración con la Aplicación

Registro de Trabajos

// lana/app/src/lib.rs
impl LanaApp {
pub async fn register_jobs(&self, registry: &mut JobRegistry) {
registry.register(InterestAccrualJob::new(
self.credit_facilities.clone()
));

registry.register(CollateralSyncJob::new(
self.custody.clone(),
self.price_service.clone()
));

registry.register(UserOnboardingJob::new(
self.customers.clone(),
self.outbox_consumer.clone()
));
}
}

Configuración

# config/jobs.yaml
jobs:
min_concurrent: 2
max_concurrent: 10
poll_interval_ms: 1000
keep_alive_interval_ms: 5000
stale_threshold_ms: 30000

retry:
max_attempts: 5
initial_interval_ms: 5000
max_interval_ms: 3600000
multiplier: 2.0

Desarrollo Local

En desarrollo con Tilt, los trabajos se pueden monitorear en la UI de Tilt. Los logs de trabajos se envían al colector OTEL configurado.

# Ver trabajos pendientes
psql -c "SELECT * FROM job_executions WHERE state = 'pending'"

# Ver trabajos fallidos
psql -c "SELECT * FROM job_executions WHERE state = 'failed'"