Introduction
Welcome to the ES Entity documentation!
ES Entity is an opinionated rust library for persisting Event Sourced entities to PostgreSQL.
It promotes decoupling your domain code from persistence details by putting all the mapping logic onto Repository structs.
Almost all the generated queries are verified at compile time by sqlx under the hood to give strong type-safe guarantees.
The main traits that must to be derived are EsEvent and EsEntity so that they can be used by the EsRepo macro that generates all the persistence and query fns.
This book will explain how to use this library effectively as well as provide a general introduction on how to use Event Sourcing to persist the state of your domain entities.
Quickstart
This is the - just show me the damn code - section. Its an end to end working example without any background. On first reading its probably best to skip this and dive into the concepts first. When you are ready to start trying things out in your own implementation you can use this as a template with all the pieces in one place.
Example
Let’s assume there is a User entity in your domain that you wish to persist using EsEntity.
The first thing you will need is 2 tables in postgres. These are referred to as the ‘index table’ and the ‘events table’.
By convention they look like this:
$ cargo sqlx migrate add users
$ cat migrations/*_users.sql
-- The 'index' table that holds the latest values of some selected attributes.
CREATE TABLE users (
-- Mandatory id column
id UUID PRIMARY KEY,
-- Mandatory created_at column
created_at TIMESTAMPTZ NOT NULL,
-- Any other columns you want a quick 'index-based' lookup
name VARCHAR UNIQUE
);
-- The table that actually stores the events sequenced per entity.
-- This table has the same columns for every entity you create
-- by convention named `<entity>_events`.
CREATE TABLE user_events (
id UUID NOT NULL REFERENCES users(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
To persist the entity we need to setup a pattern with 5 components:
- The
EntityId - The
EntityEvent - The
NewEntity - The
Entityitself - And finally the
Repositorythat encodes the mapping.
Here’s a complete working example:
[dependencies]
es-entity = "0.9"
sqlx = "0.8" # Needs to be in scope for entity_id! macro
serde = { version = "1.0.219", features = ["derive"] } # To serialize the `EntityEvent`
derive_builder = "0.20.1" # For hydrating and building the entity state (optional)
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
extern crate derive_builder;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use es_entity::*;
// Will create a uuid::Uuid wrapper type.
// But any type can act as the ID that fulfills:
// Clone + PartialEq + Eq + std::hash::Hash + Send + Sync
// + sqlx::Type<sqlx::Postgres>
es_entity::entity_id!{ UserId }
// The `EsEvent` must have `serde(tag = "type")` annotation.
#[derive(EsEvent, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// Tell the macro what the `id` type is
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
}
// The `EsEntity` - using derive_builder is optional
// but useful for hydrating in the `TryFromEvents` trait.
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct User {
pub id: UserId,
pub name: String,
// The `events` container - mandatory field.
// Basically its a `Vec` wrapper with some ES specific augmentation.
events: EntityEvents<UserEvent>,
}
impl User {
// Mutation to update the name of a user.
pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
let new_name = new_name.into();
// The idempotency_guard macro is a helper to return quickly
// if a mutation has already been applied.
// It is not mandatory but very useful in the context of distributed / multi-thread
// systems to protect against replays.
idempotency_guard!(
self.events.iter_all().rev(),
// If this pattern matches return Idempotent::AlreadyApplied
UserEvent::NameUpdated { name } if name == &new_name,
// Stop searching here
=> UserEvent::NameUpdated { .. }
);
self.name = new_name.clone();
self.events.push(UserEvent::NameUpdated { name: new_name });
Idempotent::Executed(())
}
}
// Any EsEntity must implement `TryFromEvents`.
// This trait is what hydrates entities after loading the events from the database
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
let mut builder = UserBuilder::default();
for event in events.iter_all() {
match event {
UserEvent::Initialized { id, name } => {
builder = builder.id(*id).name(name.clone());
}
UserEvent::NameUpdated { name } => {
builder = builder.name(name.clone());
}
}
}
builder.events(events).build()
}
}
// The `NewEntity` - this represents the data of an entity in a pre-persisted state.
// Using derive_builder is not mandatory - any type can be used for the `NewEntity` state.
#[derive(Debug, Builder)]
pub struct NewUser {
#[builder(setter(into))]
pub id: UserId,
#[builder(setter(into))]
pub name: String,
}
impl NewUser {
pub fn builder() -> NewUserBuilder {
NewUserBuilder::default()
}
}
// The `NewEntity` type must implement `IntoEvents` to get the initial events that require persisting.
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
// The `EsRepo` that will host all the persistence operations.
#[derive(EsRepo, Debug)]
#[es_repo(
entity = "User",
// Configure the columns that need populating in the index table
columns(
// The 'name' column
name(
// The rust type of the name attribute
ty = "String"
)))]
pub struct Users {
// Mandatory field so that the Repository can begin transactions
pool: sqlx::PgPool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Connect to postgres
let pg_con = format!("postgres://user:password@localhost:5432/pg");
let pool = sqlx::PgPool::connect(&pg_con).await?;
let users = Users { pool };
let new_user = NewUser::builder()
.id(UserId::new())
.name("Frank")
.build()
.unwrap();
// The returned type is the hydrated Entity
let mut user = users.create(new_user).await?;
// Using the Idempotency::did_execute() to check if we need a DB roundtrip
if user.update_name("Dweezil").did_execute() {
users.update(&mut user).await?;
}
let loaded_user = users.find_by_id(user.id).await?;
assert_eq!(user.name, loaded_user.name);
Ok(())
}
Event Sourcing
Event sourcing is a software design pattern where state changes are stored as a sequence of events rather than as snapshots that get updated in place.
Instead of updating a database record directly, every change is recorded as an immutable event (e.g., UserCreated, EmailChanged, AccountDeactivated) which gets inserted as a row in a table. The current state is rebuilt by replaying these events in order.
One thing to note is that in es-entity the events are scoped to a specific type of Entity.
Thus they are (by convention) not all written to the same global events table like in some Event Sourcing approaches.
Rather each Entity-type gets its own events table - though it is possible to use a global table if desired.
Further the events are strictly ordered on a per-entity basis - there are no ordering guarantees across entities.
This can be interpreted as the Entity-type representing a topic and the EntityId playing the role of the PartitionKey that exists in some pub-sub / event-store systems.
Entity Pattern
In the Software Engineering community the term Entity can refer to many different things.
In the context of es-entity it is generally meant in the sense put forward by Domain Driven Design.
Strict adherence to DDD is not mandatory to use es-entity but there are a lot of benefits to be had by following these principles.
In DDD entities serve the following purpose:
- execute commands that
- execute business logic
- enforce domain invariants
- mutate state
- record events (in the context of Event Sourcing)
- supply queries that expose some of the entities state
They often host the most critical code in your application where correctness is of upmost importance.
Ideally they are unit-testable and thus should not be overly coupled to the persistence layer (as they generally are when using just about any ORM library / framework).
The design of es-entity is very deliberate in not getting in the way of testability of your Entities.
Each Entity type used in es-entity must have:
- an
EntityIdtype - an
EntityEventtype - a
NewEntitytype - the
Entitytype itself
Entity Event
In es-entity it is assumed that an Entity has an associated EntityEvent enum that represents all of the state changes (ie. types of events) that can originate from mutations of said Entity.
This enum must be serializable and is stored as a JSON-blob in the associated events table.
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
use serde::{Deserialize, Serialize};
use es_entity::*;
// Entities must always have an associated id type
type UserId = String;
// es_entity::entity_id! { UserId }; Can be used to create a Uuid wrapper struct.
#[derive(EsEvent, Debug, Serialize, Deserialize)]
// The `EsEvent` must have `serde(tag = "type")` annotation.
#[serde(tag = "type", rename_all = "snake_case")]
// Tell the macro what the `id` type is
#[es_event(id = "UserId")]
pub enum UserEvent {
// Typically there is a 'first' event that records the initial state of an `Entity`.
Initialized { id: UserId, name: String },
// Every mutation should result in an `Event` that represents the
// change that happened.
// This event represents that the `name` attribute of a user was updated.
NameUpdated { name: String },
}
}
New Entity
The NewEntity type represents the data of the Entity in a pre-persisted state.
It gets passed to the Repository::create function where the IntoEvents trait emits the initial EntityEvents which are then persisted and used to hydrate the actual Entity.
It is also recommended that any validation of the initial attributes is performed on this type to ensure that the Entity will be in a legal state once it gets hydrated for the first time.
Using the derive_builder crate is recommended for this purpose.
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
use serde::{Deserialize, Serialize};
use es_entity::*;
use derive_builder::Builder;
es_entity::entity_id! { UserId };
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
const MAX_NAME_LENGTH: usize = 100;
#[derive(Debug, Builder)]
// Specify the `validation` fn
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewUser {
#[builder(setter(into))]
pub id: UserId,
#[builder(setter(into))]
pub name: String,
}
impl NewUser {
pub fn builder() -> NewUserBuilder {
NewUserBuilder::default()
}
}
impl NewUserBuilder {
// Execute some validation that ensures the initial `Entity` is legal
fn validate(&self) -> Result<(), String> {
if self.name.as_ref().expect("name wasn't set").len() > MAX_NAME_LENGTH {
return Err("Name length exceeded".to_string());
}
Ok(())
}
}
impl IntoEvents<UserEvent> for NewUser {
// This `fn` returns the first `Events` of the `Entity`
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
#[cfg(test)]
mod tests {
#[test]
fn user_creation() {
let new_user = NewUser::builder().id("user-id").name("Steven").build();
assert!(new_user.is_ok());
}
}
}
Entity Type
The Entity type is a struct that holds the events: EntityEvents<EntityEvent> field.
Mutations of the entity append events to this collection.
The Entity is (re-)constructed from events via the TryFromEvents trait.
Other than the events field additional fields can be exposed and populated during hydration.
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use es_entity::*;
es_entity::entity_id! { UserId };
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
// Using derive_builder is optional but useful for hydrating
// in the `TryFromEvents` trait.
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct User {
pub id: UserId,
pub name: String,
// The `events` container - mandatory field.
// Basically its a `Vec` wrapper with some ES specific augmentation.
events: EntityEvents<UserEvent>,
// Marker if you use a name other than `events`.
// #[es_entity(events)]
// different_name_for_events_field: EntityEvents<UserEvent>
}
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
let new_name = new_name.into();
// The idempotency_guard macro is a helper to return quickly
// if a mutation has already been applied.
// It is not mandatory but very useful in the context of distributed / multi-thread
// systems to protect against replays.
idempotency_guard!(
self.events.iter_all().rev(),
// If this pattern matches return Idempotent::AlreadyApplied
UserEvent::NameUpdated { name } if name == &new_name,
// Stop searching here
=> UserEvent::NameUpdated { .. }
);
self.name = new_name.clone();
self.events.push(UserEvent::NameUpdated { name: new_name });
Idempotent::Executed(())
}
}
// Any EsEntity must implement `TryFromEvents`.
// This trait is what hydrates entities after loading the events from the database
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
let mut builder = UserBuilder::default();
for event in events.iter_all() {
match event {
UserEvent::Initialized { id, name } => {
builder = builder.id(*id).name(name.clone());
}
UserEvent::NameUpdated { name } => {
builder = builder.name(name.clone());
}
}
}
builder.events(events).build()
}
}
#[cfg(test)]
mod tests {
fn fresh_user() -> User {
let id = UserId::new();
let initial_events = EntityEvents::init(
id,
[UserEvent::Initialized {
id,
name: "Willson"
}],
);
User::try_from_events(initial_events).expect("Could not create user");
}
#[test]
fn update_name() {
let mut user = fresh_user();
// There are no new events to persist after hydrating
assert!(!user.events.any_new());
let new_name = "Gavin".to_string();
user.update_name(new_name.clone()).unwrap();
assert_eq!(user.name, new_name);
// Mutations are expected to append events
assert!(user.events.any_new());
}
}
}
Idempotency
Idempotency means that performing the same operation multiple times has the same effect as doing it once.
It’s used to ensure that retrying a request doesn’t cause unintended side effects, such as duplicated Events being persisted.
It is particularly useful in the context of a distributed system where operations could be triggered from an asynchronous event queue (ie pub-sub).
Whenever you would like to have an exactly-once processing guarantee - you can easily achieve an effectively-once processing by ensuring your mutations are all idempotent.
Making your Entity mutations idempotent is very simple when doing Event Sourcing as you can easily check if the event you are about to append already exists in the history.
Example
To see the issue in action - lets consider the update_name mutation without an idempotency check.
#![allow(unused)]
fn main() {
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) {
let name = new_name.into();
self.events.push(UserEvent::NameUpdated { name });
}
}
}
In the above code we could easily record redundant events by calling the update_name mutation multiple times with the same input.
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) {
let name = new_name.into();
self.events.push(UserEvent::NameUpdated { name });
}
}
fn main() {
let mut user = User { events: vec![] };
user.update_name("Harrison");
// Causes a redundant event to be appended
user.update_name("Harrison");
assert_eq!(user.events.len(), 2);
}
To prevent this we can iterate through the events to check if it has already been applied:
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) {
let name = new_name.into();
for event in self.events.iter().rev() {
match event {
UserEvent::NameUpdated { name: existing_name } if existing_name == &name => {
return;
}
_ => ()
}
}
self.events.push(UserEvent::NameUpdated { name });
}
}
fn main() {
let mut user = User { events: vec![] };
user.update_name("Harrison");
// This update will be ignored
user.update_name("Harrison");
assert_eq!(user.events.len(), 1);
}
But now we just silently ignore the operation.
Better would be to signal back to the caller whether or not the operation was applied.
For that we use the Idempotent type:
extern crate es_entity;
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
use es_entity::Idempotent;
// #[must_use]
// pub enum Idempotent<T> {
// Executed(T),
// AlreadyApplied,
// }
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()>{
let name = new_name.into();
for event in self.events.iter().rev() {
match event {
UserEvent::NameUpdated { name: existing_name } if existing_name == &name => {
return Idempotent::AlreadyApplied;
}
_ => ()
}
}
self.events.push(UserEvent::NameUpdated { name });
Idempotent::Executed(())
}
}
fn main() {
let mut user = User { events: vec![] };
assert!(user.update_name("Harrison").did_execute());
assert!(user.update_name("Harrison").was_already_applied());
}
To cut down on boilerplate this pattern of iterating the events to check if an event was already applied has been encoded into the idempotency_guard! macro:
extern crate es_entity;
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
use es_entity::{idempotency_guard, Idempotent};
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()>{
let name = new_name.into();
idempotency_guard!(
// The iterator of events
self.events.iter().rev(),
// The pattern match to check whether an operation was already applied
UserEvent::NameUpdated { name: existing_name } if existing_name == &name
);
self.events.push(UserEvent::NameUpdated { name });
Idempotent::Executed(())
}
}
fn main() {
let mut user = User { events: vec![] };
assert!(user.update_name("Harrison").did_execute());
assert!(user.update_name("Harrison").was_already_applied());
}
Finally there is the case where an operation was applied in the past - but it is still legal to re-apply it. Like changing a name back to what it originally was:
extern crate es_entity;
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
}
pub struct User {
events: Vec<UserEvent>
}
use es_entity::{idempotency_guard, Idempotent};
impl User {
pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()>{
let name = new_name.into();
idempotency_guard!(
self.events.iter().rev(),
UserEvent::NameUpdated { name: existing_name } if existing_name == &name,
// The `=>` signifies the pattern where to stop the iteration.
=> UserEvent::NameUpdated { .. }
);
self.events.push(UserEvent::NameUpdated { name });
Idempotent::Executed(())
}
}
fn main() {
let mut user = User { events: vec![] };
assert!(user.update_name("Harrison").did_execute());
assert!(user.update_name("Colin").did_execute());
assert!(user.update_name("Harrison").did_execute());
}
Without the => argument the second call of assert!(user.update_name("Harrison").did_execute()); would fail.
Repository
In the context of es-entity a Repository is a struct that hosts all operations performed with the database.
Thus it is responsible for all CRUD style interactions with the persistence layer.
The EsRepo macro generates functions such as:
createupdatefind_by_idlist_by_id- etc.
that hide away the complexity of querying and hydrating entities who’s state is represented in an Event Sourced way.
Under the hood the es_query! helper macro (which only works within fns inside EsRepo structs) handles loading the events while enabling you to write a ‘normal’ looking SQL query against the index table.
The following sections will take a deep dive in how this design came to be.
Database Tables
In es-entity every Entity gets 2 tables - the index and the events table.
This section will explain the rational behind that.
The Events Table
The events table always has the same fields:
CREATE TABLE user_events (
-- The entity id
id UUID NOT NULL REFERENCES users(id),
-- The sequence number of the event
-- Starting at 1 and incrementing by 1 each event
sequence INT NOT NULL,
-- The 'type' of the event (corresponding to the enum variant)
event_type VARCHAR NOT NULL,
-- The event data serialized as a JSON blob
event JSONB NOT NULL,
-- The 'event context'
-- additional metadata that can be collected out of band
-- only populated if 'event_context' attribute is set on the EsEvent
context JSONB DEFAULT NULL,
-- The time the event was recorded
recorded_at TIMESTAMPTZ NOT NULL,
-- Unique constraint to ensure there are no duplicate sequence numbers
UNIQUE(id, sequence)
);
In fact we could persist all events to a global table with that schema but partitioning the events per Entity gives us some benefits when querying (like read performance and referential integrity).
Intuitively you might think this is all we need as we can very easily query all the events for a specific Entity:
SELECT * FROM user_events WHERE id = $1 ORDER BY sequence
This is correct if we know the id of the Entity we are looking up.
But it becomes a lot more tricky when we want to do a lookup on a non-id field.
Assuming the Event-enum looks like this:
#![allow(unused)]
fn main() {
pub enum UserEvent {
Initialized { id: u64, name: String },
NameUpdated { name: String },
EmailUpdated { email: String },
}
}
and we want to lookup a user by email, the query would quickly become a lot more complicated. Lets consider the naive query:
SELECT * FROM user_events WHERE event->>'email' = $1;
This doesn’t work as it only gets a single event - but we want all events for that Entity.
SELECT *
FROM user_events
WHERE id = (
SELECT id
FROM user_events
WHERE event->>'email' = $1
LIMIT 1
)
ORDER BY sequence;
This also doesn’t work because perhaps the event that was found wasn’t the latest EmailUpdated event in the Users history.
But we want to get the user who’s email is currently $1.
So it could find some false positives.
When iterating with ChatGPT the next suggestion is:
WITH latest_email_updates AS (
SELECT id, MAX(sequence) AS max_sequence
FROM user_events
WHERE event_type = 'email_updated'
GROUP BY id
),
latest_emails AS (
SELECT e.id, e.event->>'email' AS email
FROM user_events e
JOIN latest_email_updates leu
ON e.id = leu.id AND e.sequence = leu.max_sequence
WHERE e.event_type = 'email_updated'
),
target_user AS (
SELECT id
FROM latest_emails
WHERE email = $1
)
SELECT *
FROM user_events
WHERE id = (SELECT id FROM target_user)
ORDER BY sequence;
This query might execute what we want but it still has issues.
The worst one being that we are leaking a lot of domain knowledge into the query.
Specifically the presence and shape of the EmailUpdated event is encoded into the query.
Preferably the specifics of the Event-schemas would only need to be known on the domain side encoded in the EntityEvent enum.
Also the whole query is quite inefficient.
Sure we could add an index on the event->>'email' field but that would introduce more implicit coupling.
Also what if we wanted something like a UNIQUE constraint on the email - but still allow emails swapped multiple times.
The Index Table
Enter the index table.
The index-table is a table that hosts 1 row per Entity with the columns populated by the latest values.
In that sense it looks very similar to a table that might hold the entire state of the Entity in a typical update-in-place persistence strategy.
The difference is that we only include columns that we want to index for fast lookup or some kind of constraint like UNIQUE or REFERENCES.
In that sense it is purely an optimization and does not represent the entire state of the Entity - for that you must load all the events.
CREATE TABLE users (
id UUID PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
email VARCHAR UNIQUE
);
Now the query simplifies to:
WITH target_entity AS (
SELECT id
FROM users
WHERE email = $1
)
SELECT e.*
FROM user_events e
JOIN target_entity te ON e.id = te.id
ORDER BY e.sequence;
As a result the query is much simpler and we are no longer leaking any domain information. We just have to ensure the index table gets updated atomically as we append the events to the events table.
es_query
The es_query! macro is a helper that allows you only to query the index table without needing to join with the events table.
The expansion of es_query! results in a call to the sqlx::query_as! macro - which means that you still get typesafety and compile time column validation.
Given the query we arrived at in the previous section - this is what a find_by_name fn could look like:
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
use es_entity::*;
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
use sqlx::PgPool;
use es_entity::*;
pub struct Users {
pool: PgPool
}
impl Users {
pub async fn find_by_name(&self, name: String) -> Result<User, EsRepoError> {
let rows = sqlx::query_as!(
GenericEvent::<UserId>,
r#"
WITH target_entity AS (
SELECT id
FROM users
WHERE name = $1
)
SELECT e.id as entity_id, e.sequence, e.event, e.context as "context: ContextData", e.recorded_at
FROM user_events e
JOIN target_entity te ON e.id = te.id
ORDER BY e.sequence;
"#,
name,
)
.fetch_all(&self.pool)
.await?;
Ok(EntityEvents::load_first(rows)?)
}
}
The es_query! macro removes the boilerplate of fetching the events and lets you just write the part that queries the index table:
SELECT id FROM users WHERE name = $1
On expansion it constructs the complete query (adding the JOIN with the events table) and hydrates the entities from the events.
This simplifies the above implementation into:
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
use es_entity::*;
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
use sqlx::PgPool;
use es_entity::*;
#[derive(EsRepo)]
#[es_repo(entity = "User")]
pub struct Users {
pool: PgPool
}
impl Users {
pub async fn find_by_name(&self, name: String) -> Result<User, EsRepoError> {
es_query!(
"SELECT id FROM users WHERE name = $1",
name
).fetch_one(&self.pool).await
}
}
The es_query! macro only works within fns defined on structs with EsRepo derived.
The functions intend to mimic the sqlx interface but instead of returning rows they return fully hydrated entities:
async fn fetch_one(<executor>) -> Result<Entity, Repo::Err>
async fn fetch_optional(<executor) -> Result<Option<Entity>, Repo::Err>
// The `(_, bool)` signifies whether or not the query could have fetched more or the list is exhausted:
async fn fetch_n(<executor>, n) -> Result<(Vec<Entity>, bool), Repo::Err>
EsRepo
Deriving the EsRepo macro on a struct will generate a bunch of CRUD fns (and some additional supporting structs) to interact with the persistence layer.
For this to work the Entity you intend to persist / load must be setup as described in the Entity section (with an Event, Id, NewEntity and Entity) type.
As a minimum you must specify the entity attribute and have a field that holds a PgPool type.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
use es_entity::*;
#[derive(EsRepo)]
#[es_repo(
entity = "User",
// Defaults that get derived if not explicitly configured:
// id = "UserId", // The type of the `id`
// new = "NewUser", // The type of the `NewEntity`
// event = "UserEvent", // The type of the `Event` enum
// err = "EsRepoError", // The Error type that should be returned from all fns.
// tbl = "users", // The name of the index table
// events_tbl = "user_events", // The name of the events table
// tbl_prefix = "", // A table prefix that should be added to the derived table names
// Columns specify a list of attributes that get mapped to the index table:
// columns(
// The id column is always mapped - no need to specify it
// id(ty = "UserId", list_by)
// )
)]
pub struct Users {
pool: sqlx::PgPool
// Marker if you use a name other than `pool`.
// #[es_entity(pool)]
// different_name_for_pool: sqlx::PgPool
}
There are a number of options that can be passed to es_repo to modify the behaviour or type of functions it generates.
The most important of which is the columns option that configures the mapping from entity attributes to index table columns.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
use es_entity::*;
pub struct NewUser { id: UserId, name: String }
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(
// Declares that there is a `name` column on the `index` table.
// The rust type for it is `String`.
// Without further configuration `EsRepo` will assume both the `NewEntity`
// and the `Entity` types have an accessible `.name` attribute
// for populating and updating the index table.
name = "String",
// The above is equivalent to the more explicit notation:
// name(ty = "String")
)
)]
pub struct Users {
pool: sqlx::PgPool
}
Take a look at the next sections to see more information on how the options modify the generated code.
fn create
The create fn takes a NewEntity type and returns an Entity type.
It INSERTs a row into the index table and persists all the events returned from the IntoEvents::into_events fn in the events table.
We must use the columns option to specify which columns need inserting into the index table
In the code below we want to include a name column in the index table that requires mapping.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
Ok(User { id: events.id().clone(), name: "Fred".to_string(), events })
}
}
use es_entity::*;
pub struct NewUser {
id: UserId,
// The `name` attribute on the `NewEntity` must be accessible
// for inserting into the `index` table.
name: String
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
// The name attribute on the `Entity` must be accessible
// for updates of the `index` table.
name: String,
events: EntityEvents<UserEvent>,
}
#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
// The `create` fn takes a `NewEntity` and returns a persisted and hydrated `Entity`
let _user = users.create(new_user).await?;
Ok(())
}
The insert part of the create function looks somewhat equivalent to:
impl Users {
pub async fn create(
&self,
new_entity: NewUser
) -> Result<User, es_entity::EsRepoError> {
let id = &new_entity.id;
// The attribute specified in the `columns` option
let name = &new_entity.name;
sqlx::query!("INSERT INTO users (id, name) VALUES ($1, $2)",
id as &UserId,
name as &String
)
.execute(self.pool())
.await?;
// persist events
// hydrate entity
// execute post_persist_hook
// return entity
}
}
The key thing to configure is how the columns of the index table get populated via the create option.
The create(accessor = "<>") option modifies how the field is accessed on the NewEntity type.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
use es_entity::*;
pub struct NewUser { id: UserId, some_hidden_field: String }
impl NewUser {
fn my_name(&self) -> String {
self.some_hidden_field.clone()
}
}
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(
// Instead of using the `name` field on the `NewEntity` struct
// the generated code will use: `new_entity.my_name()`
// to populate the `name` column.
name(ty = "String", create(accessor = "my_name()")),
)
)]
pub struct Users {
pool: sqlx::PgPool
}
The create(persist = false) option omits inserting the column during creation.
This is useful for dynamic values that don’t become known until later on in the entities lifecycle.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
use es_entity::*;
// There is no `name` attribute because we do not initially insert into this column.
pub struct NewUser { id: UserId }
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(
name(ty = "String", create(persist = false)),
)
)]
pub struct Users {
pool: sqlx::PgPool
}
fn create_all
The create_all function is a batch version of create.
It takes a Vec<NewEntity> and returns Vec<Entity>.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
Ok(User { id: events.id().clone(), name: "Fred".to_string(), events })
}
}
use es_entity::*;
pub struct NewUser {
id: UserId,
name: String
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
let new_users = vec![
NewUser { id: UserId::new(), name: "James".to_string() },
NewUser { id: UserId::new(), name: "Roger".to_string() }
];
let users = users.create_all(new_users).await?;
Ok(())
}
fn update
The update fn takes a mutable reference to an Entity and persists any new events that have been added to it.
It will also UPDATE the row in the index table with the latest values derived from the entities attributes.
It returns the number of events that were persisted.
In the code below we have a name column in the index table that needs to be kept in sync with the entity’s state.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(mut events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
let mut name = String::new();
for event in events.iter_all() {
match event {
UserEvent::Initialized { name: n, .. } => name = n.clone(),
UserEvent::NameUpdated { name: n } => name = n.clone(),
}
}
Ok(User { id: events.id().clone(), name, events })
}
}
pub struct NewUser {
id: UserId,
name: String
}
use es_entity::*;
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
// The name attribute on the `Entity` must be accessible
// for updates of the `index` table.
name: String,
events: EntityEvents<UserEvent>,
}
impl User {
pub fn change_name(&mut self, name: String) {
self.events.push(UserEvent::NameUpdated { name: name.clone() });
self.name = name;
}
}
#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
// First create a user
let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
let mut user = users.create(new_user).await?;
// Now update the user
user.change_name("Frederick".to_string());
// The `update` fn takes a mutable reference to an `Entity` and persists new events
let n_events = users.update(&mut user).await?;
assert_eq!(n_events, 1); // One NameUpdated event was persisted
Ok(())
}
The update part of the update function looks somewhat equivalent to:
impl Users {
pub async fn update(
&self,
entity: &mut User
) -> Result<usize, es_entity::EsRepoError> {
// Check if there are any new events to persist
if !entity.events().any_new() {
return Ok(0);
}
let id = &entity.id;
// The attribute specified in the `columns` option
let name = &entity.name;
sqlx::query!("UPDATE users SET name = $2 WHERE id = $1",
id as &UserId,
name as &String
)
.execute(self.pool())
.await?;
// persist new events
// execute post_persist_hook
// return number of events persisted
}
}
The key thing to configure is how the columns of the index table get updated via the update option.
The update(accessor = "<>") option modifies how the field is accessed on the Entity type.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
pub struct NewUser { id: UserId, name: String }
use es_entity::*;
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
impl User {
pub fn display_name(&self) -> String {
format!("User: {}", self.name)
}
}
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(
// Instead of using the `name` field on the `Entity` struct
// the generated code will use: `entity.display_name()`
// to populate the `name` column during updates.
name(ty = "String", update(accessor = "display_name()")),
)
)]
pub struct Users {
pool: sqlx::PgPool
}
The update(persist = false) option prevents updating the column.
This is useful for columns that should never change after creation.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
use es_entity::*;
// Assume the name of a user is immutable.
pub struct NewUser { id: UserId, name: String }
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
// Exposing the `name` attribute on the `Entity` is optional
// as it does not need to be accessed during update.
// name: String
events: EntityEvents<UserEvent>,
}
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(
name(ty = "String", update(persist = false))
)
)]
pub struct Users {
pool: sqlx::PgPool
}
Note that if no columns need updating (all columns have update(persist = false)), the UPDATE query is skipped entirely for better performance.
fn find_by
Every column that gets configured on the EsRepo will get the following fns:
fn find_by_<column> -> Result<Entity, EntityError>
fn maybe_find_by_<column> -> Result<Option<Entity>, EntityError>
It is assumed that your database schema has a relevant INDEX on <column> to make the lookup efficient.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
Ok(User { id: events.id().clone(), name: "Fred".to_string(), events })
}
}
pub struct NewUser { id: UserId, name: String }
use es_entity::*;
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
users.create(new_user).await?;
let user = users.find_by_name("Fred").await?;
assert_eq!(user.name, "Fred");
let user = users.maybe_find_by_name("No Body").await?;
assert!(user.is_none());
Ok(())
}
fn list_by
To load a whole page of entities at once you can set the list_by option on the column.
This will generate the list_by_<column> fns and appropriate cursor structs.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
extern crate uuid;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
Deleted
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
Ok(User { id: events.id().clone(), name: "Fred".to_string(), events })
}
}
pub struct NewUser { id: UserId, name: String }
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
use es_entity::*;
#[derive(EsRepo)]
#[es_repo(
entity = "User",
// list_by will generate the UsersByNameCursor
columns(name(ty = "String", list_by))
)]
pub struct Users {
pool: sqlx::PgPool
}
// // Generated code:
// pub mod user_cursor {
// pub struct UsersByNameCursor {
// name: String
// // id is always added to disambiguate
// // incase the `name` column is not unique
// id: UserId,
// }
//
// // Cursors that always exist:
// pub struct UsersById {
// id: UserId,
// }
//
// pub struct UsersByCreatedAt {
// created_at: chrono::DateTime<chrono::Utc>
// id: UserId,
// }
// }
//
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
users.create(new_user).await?;
let PaginatedQueryRet {
entities,
has_next_page: _,
end_cursor: _,
} = users
.list_by_id(
PaginatedQueryArgs {
first: 5,
// after: None represents beginning of the list
after: Some(user_cursor::UsersByIdCursor {
id: uuid::Uuid::nil().into(),
}),
},
ListDirection::Ascending,
)
.await?;
assert!(!entities.is_empty());
// To collect all entities in a loop you can use `into_next_query()`.
// This is not recommended - just to highlight the API.
let mut query = Default::default();
let mut all_users = Vec::new();
loop {
let mut res = users.list_by_name(query, Default::default()).await?;
all_users.extend(res.entities.drain(..));
if let Some(next_query) = res.into_next_query() {
query = next_query;
} else {
break;
}
}
assert!(!all_users.is_empty());
Ok(())
}
fn list_for
Similar to list_by the list_for option lets you query pages of entities.
The difference is that list_for accepts an additional filter argument.
This is useful for situations where you have a 1-to-n relationship between 2 entities and you want to find all entities on the n side that share the same foreign key.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
extern crate uuid;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
pub struct NewUser { id: UserId, name: String }
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
use es_entity::*;
es_entity::entity_id! { UserDocumentId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserDocumentId")]
pub enum UserDocumentEvent {
Initialized { id: UserDocumentId, owner_id: UserId },
}
impl IntoEvents<UserDocumentEvent> for NewUserDocument {
fn into_events(self) -> EntityEvents<UserDocumentEvent> {
EntityEvents::init(
self.id,
[UserDocumentEvent::Initialized {
id: self.id,
owner_id: self.owner_id,
}],
)
}
}
impl TryFromEvents<UserDocumentEvent> for UserDocument {
fn try_from_events(events: EntityEvents<UserDocumentEvent>) -> Result<Self, EsEntityError> {
Ok(UserDocument { id: events.id().clone(), owner_id: UserId::new(), events })
}
}
pub struct NewUserDocument { id: UserDocumentId, owner_id: UserId }
#[derive(EsEntity)]
pub struct UserDocument {
pub id: UserDocumentId,
owner_id: UserId,
events: EntityEvents<UserDocumentEvent>,
}
#[derive(EsRepo)]
#[es_repo(
entity = "UserDocument",
columns(
// The column name in the schema
user_id(
ty = "UserId",
// generate the `list_for` fn
list_for,
// The accessor on the `NewUserDocument` type
create(accessor = "owner_id"),
// Its immutable - so no need to ever update it
update(persist = false)
)
)
)]
pub struct UserDocuments {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let docs = UserDocuments { pool: init_pool().await? };
// Assume we have an existing user that we can get the id from
let owner_id = UserId::new();
// Batch creating a few entities for illustration
let new_docs = vec![
NewUserDocument { id: UserDocumentId::new(), owner_id },
NewUserDocument { id: UserDocumentId::new(), owner_id }
];
docs.create_all(new_docs).await?;
// The fns have the form `list_for_<filter>_by_<cursor>`
let docs = docs.list_for_user_id_by_created_at(
owner_id, Default::default(), Default::default()
).await?;
assert_eq!(docs.entities.len(), 2);
Ok(())
}
fn list_for_filter
The list_for_filter function provides a unified interface for querying entities with optional filtering and flexible sorting.
Unlike list_for which generates separate functions for each filter/sort combination, list_for_filter uses a single function that accepts:
- A filter enum (e.g.,
UsersFilter::WithNameorUsersFilter::NoFilter) - A sort specification with direction
- Pagination arguments
This approach is more flexible when you need to dynamically choose filters and sort orders at runtime, such as in GraphQL resolvers or REST API endpoints where users can specify different combinations of filters and sorting.
How It Works Internally
The list_for_filter function uses pattern matching to delegate to the appropriate underlying function based on the filter and sort combination:
pub async fn list_for_filter(
&self,
filter: UserDocumentsFilter,
sort: es_entity::Sort<UserDocumentsSortBy>,
cursor: es_entity::PaginatedQueryArgs<user_document_cursor::UserDocumentsCursor>,
) -> Result<es_entity::PaginatedQueryRet<UserDocument, user_document_cursor::UserDocumentsCursor>, EsRepoError> {
let es_entity::Sort { by, direction } = sort;
let es_entity::PaginatedQueryArgs { first, after } = cursor;
let res = match (filter, by) {
// Filter by user_id, sort by ID
(UserDocumentsFilter::WithUserId(filter_value), UserDocumentsSortBy::Id) => {
let after = after.map(user_document_cursor::UserDocumentsByIdCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_for_user_id_by_id(filter_value, query, direction).await?
}
// Filter by user_id, sort by created_at
(UserDocumentsFilter::WithUserId(filter_value), UserDocumentsSortBy::CreatedAt) => {
let after = after.map(user_document_cursor::UserDocumentsByCreatedAtCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_for_user_id_by_created_at(filter_value, query, direction).await?
}
// No filter, sort by ID
(UserDocumentsFilter::NoFilter, UserDocumentsSortBy::Id) => {
let after = after.map(user_document_cursor::UserDocumentsByIdCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_by_id(query, direction).await?
}
// ... more combinations
};
Ok(res)
}
This pattern matching approach ensures type safety while providing a unified interface for all filter/sort combinations.
Important Notes
Cursor and Sort Alignment: The cursor type in PaginatedQueryArgs must match the sort field specified in the Sort parameter. If they don’t align, you’ll get a CursorDestructureError at runtime. For example, if you’re sorting by CreatedAt but your cursor is of type UsersByIdCursor, the conversion will fail.
Column Options: The available filter and sort combinations are determined by your column configuration:
- Filters: Generated for columns with the
list_foroption enabled - Sort By: Generated for columns with the
list_byoption enabled (ID and created_at are included by default)
Only columns configured with these options will appear in the respective filter and sort enums.
Example
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
extern crate uuid;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
NameUpdated { name: String },
}
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
unimplemented!()
}
}
pub struct NewUser { id: UserId, name: String }
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
use es_entity::*;
es_entity::entity_id! { UserDocumentId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserDocumentId")]
pub enum UserDocumentEvent {
Initialized { id: UserDocumentId, owner_id: UserId },
}
impl IntoEvents<UserDocumentEvent> for NewUserDocument {
fn into_events(self) -> EntityEvents<UserDocumentEvent> {
EntityEvents::init(
self.id,
[UserDocumentEvent::Initialized {
id: self.id,
owner_id: self.owner_id,
}],
)
}
}
impl TryFromEvents<UserDocumentEvent> for UserDocument {
fn try_from_events(events: EntityEvents<UserDocumentEvent>) -> Result<Self, EsEntityError> {
Ok(UserDocument { id: events.id().clone(), owner_id: UserId::new(), events })
}
}
pub struct NewUserDocument { id: UserDocumentId, owner_id: UserId }
#[derive(EsEntity)]
pub struct UserDocument {
pub id: UserDocumentId,
owner_id: UserId,
events: EntityEvents<UserDocumentEvent>,
}
#[derive(EsRepo)]
#[es_repo(
entity = "UserDocument",
columns(
user_id(
ty = "UserId",
list_for,
create(accessor = "owner_id"), update(persist = false)
)
)
)]
pub struct UserDocuments {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let docs = UserDocuments { pool: init_pool().await? };
// Assume we have an existing user that we can get the id from
let owner_id = UserId::new();
// Batch creating a few entities for illustration
let new_docs = vec![
NewUserDocument { id: UserDocumentId::new(), owner_id },
NewUserDocument { id: UserDocumentId::new(), owner_id }
];
docs.create_all(new_docs).await?;
// Filter by user_id, sorted by created_at ascending
let filtered_docs = docs.list_for_filter(
UserDocumentsFilter::WithUserId(owner_id),
Sort {
by: UserDocumentsSortBy::CreatedAt,
direction: ListDirection::Ascending,
},
PaginatedQueryArgs {
first: 10,
after: None,
}
).await?;
assert_eq!(filtered_docs.entities.len(), 2);
// No filter, sorted by ID descending
let all_docs = docs.list_for_filter(
UserDocumentsFilter::NoFilter,
Sort {
by: UserDocumentsSortBy::Id,
direction: ListDirection::Descending,
},
PaginatedQueryArgs {
first: 10,
after: None,
}
).await?;
assert!(all_docs.entities.len() >= 2);
Ok(())
}
fn delete
If you are using Event Sourcing we assume you believe in immutability and keeping a long term audit history.
Deleting data from the Database goes against these principles.
Therefore es-entity does not provide a way to actually delete data.
It is however possible to configure a soft delete option by marking delete = soft on the EsRepo.
This will omit entities that have been flagged as deleted from all queries as well as generate additional queries that can include the deleted entities:
fn find_by_<column>_include_deleted
fn maybe_find_by_<column>_include_deleted
fn list_by_<column>_include_deleted
fn list_for_<column>_by_<cursor>_include_deleted
As a prerequisite the index table must include a deleted column:
CREATE TABLE users (
id UUID PRIMARY KEY,
name VARCHAR NOT NULL,
-- deleted will be set to 'TRUE' when `delete` is called.
deleted BOOL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL
);
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
use serde::{Deserialize, Serialize};
es_entity::entity_id! { UserId }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
EntityEvents::init(
self.id,
[UserEvent::Initialized {
id: self.id,
name: self.name,
}],
)
}
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
Ok(User { id: events.id().clone(), name: "Delyth".to_string(), events })
}
}
pub struct NewUser { id: UserId, name: String }
use es_entity::*;
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
Deleted
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
name: String,
events: EntityEvents<UserEvent>,
}
impl User {
fn delete(&mut self) -> Idempotent<()> {
idempotency_guard!(
self.events.iter_all(),
UserEvent::Deleted
);
self.events.push(UserEvent::Deleted);
Idempotent::Executed(())
}
}
#[derive(EsRepo)]
#[es_repo(
entity = "User",
columns(name = "String"),
delete = "soft"
)]
pub struct Users {
pool: sqlx::PgPool
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let users = Users { pool: init_pool().await? };
let new_user = NewUser { id: UserId::new(), name: "Delyth".to_string() };
let mut user = users.create(new_user).await?;
let found_user = users.maybe_find_by_name("Delyth").await?;
assert!(found_user.is_some());
if user.delete().did_execute() {
users.delete(user).await?;
}
let found_user = users.maybe_find_by_name("Delyth").await?;
assert!(found_user.is_none());
let found_user = users.maybe_find_by_name_include_deleted("Delyth").await?;
assert!(found_user.is_some());
sqlx::query!(r#"
WITH deleted_users AS (
DELETE FROM user_events
WHERE id IN (SELECT id FROM users WHERE deleted = true)
RETURNING id
)
DELETE FROM users WHERE deleted = true"#
).execute(users.pool()).await?;
Ok(())
}
Transactions
One big advantage of using an ACID compliant database (such as Postgres) is transactional guarantees.
That is you can execute multiple writes atomically or multiple successive queries can view a consistent snapshot of your data.
The sqlx struct that manages this is the Transaction that is typically acquired from a pool.
Es-entity supports custom types that can wrap a connection while augmenting it with additional custom functionality.
By default the generated async fn begin_op() -> Result<Op, sqlx::Error> on EsRepo structs returns an es_entity::DbOp transaction wrapper that has support for commit hooks and caching of transaction time.
In order to be interoperable with bare sqlx::Transactions as well as custom transaction wrappers all generated functions accept one of 2 traits:
AtomicOperation- representing a transactional operation that needs to be committed.IntoOneTimeExecutor<'_>- representing a connection that can do 1 DB round trip and has no additional consistency guaranteed.
See Connection Types and Traits for details on these traits and their implementations.
Key Concepts
-
Connection Traits: Learn about
AtomicOperationandIntoOneTimeExecutortraits, method variants (_in_opfunctions), and operation requirements. -
DbOp: The default transaction wrapper with support for time caching, nested transactions, and
DbOpWithTimefor guaranteed timestamps. -
Commit Hooks: Execute custom logic before and after transaction commits, with support for hook merging and database operations during pre-commit.
Basic Example
extern crate anyhow;
extern crate sqlx;
extern crate tokio;
extern crate es_entity;
extern crate uuid;
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
async fn count_users(op: impl es_entity::IntoOneTimeExecutor<'_>) -> anyhow::Result<i64> {
let row = op.into_executor().fetch_optional(sqlx::query!(
"SELECT COUNT(*) FROM users"
)).await?;
Ok(row.and_then(|r| r.count).unwrap_or(0))
}
// Ridiculous example of 2 operations that we want to execute atomically
async fn delete_and_count(op: &mut impl es_entity::AtomicOperation, id: uuid::Uuid) -> anyhow::Result<i64> {
sqlx::query!(
"DELETE FROM users WHERE id = $1",
id
).execute(op.as_executor()).await?;
let row = sqlx::query!(
"SELECT COUNT(*) FROM users"
).fetch_optional(op.as_executor()).await?;
Ok(row.and_then(|r| r.count).unwrap_or(0))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let pool = init_pool().await?;
// &sqlx::PgPool implements IntoOneTimeExecutor
let _ = count_users(&pool).await?;
// It can only execute 1 roundtrip consistently as it will
// check out a new connection from the pool for each operation.
// Hence we cannot pass it to `fn`'s that need AtomicOperation
// as we cannot guarantee that they will be consistent.
// let _ = delete_and_count(&pool).await?; // <- won't compile
// &mut sqlx::PgTransaction implements AtomicOperation
// so we can use it for both `fns`
let mut tx = pool.begin().await?;
let _ = count_users(&mut tx).await?;
let some_existing_user_id = uuid::Uuid::now_v7();
let _ = delete_and_count(&mut tx, some_existing_user_id).await?;
// Don't forget to commit the operation or the change won't become visible
tx.commit().await?;
Ok(())
}
Connection Types and Traits
The type of the <connection> argument for generated functions is generic, requiring either the AtomicOperation or IntoOneTimeExecutor trait to be implemented on the type.
There is a blanket implementation that makes every AtomicOperation implement IntoOneTimeExecutor - but the reverse is not the case.
AtomicOperation
The AtomicOperation trait represents a transactional operation that can execute multiple database operations atomically with consistent snapshots of the data.
pub trait AtomicOperation: Send {
/// Function for querying when the operation is taking place - if it is cached.
fn maybe_now(&self) -> Option<chrono::DateTime<chrono::Utc>> {
None
}
/// Returns the underlying sqlx::Executor implementation.
fn as_executor(&mut self) -> &mut sqlx::PgConnection;
/// Registers a commit hook that will run pre_commit before and post_commit after
/// the transaction commits. Returns Ok(()) if the hook was registered,
/// Err(hook) if hooks are not supported.
fn add_commit_hook<H: CommitHook>(&mut self, hook: H) -> Result<(), H> {
Err(hook)
}
}
Implementations of AtomicOperation:
&mut sqlx::Transaction<'_, Postgres>&mut DbOp<'_>&mut DbOpWithTime<'_>&mut OpWithTime<'_, Op>(whereOp: AtomicOperation)HookOperation<'_>(used internally by hooks)
IntoOneTimeExecutor
The IntoOneTimeExecutor trait ensures in a typesafe way that only 1 database operation can occur by consuming the inner reference.
Implementations of IntoOneTimeExecutor:
&PgPool- checks out a new connection for each operation- Any type implementing
AtomicOperation- guarantees consistency across multiple operations
async fn find_by_id_in_op<'a, OP>(op: OP, id: EntityId)
where
OP: IntoOneTimeExecutor<'a>;
async fn create_in_op<OP>(op: &mut OP, new_entity: NewEntity)
where
OP: AtomicOperation;
Both traits wrap access to an sqlx::Executor implementation that ultimately executes the query.
Method Variants
All CRUD fns that es-entity generates come in 2 variants:
async fn create(new_entity: NewEntity)
async fn create_in_op(<connection>, new_entity: NewEntity)
async fn update(entity: &mut Entity)
async fn update_in_op(<connection>, entity: &mut Entity)
async fn find_by_id(id: EntityId)
async fn find_by_id_in_op(<connection>, id: EntityId)
etc
In all cases the _in_op variant accepts a first argument that represents the connection to the database.
The non-_in_op variant simply wraps the _in_op call by passing an appropriate connection argument internally.
Operation Requirements
In es-entity mutating fns generally require 2 roundtrips to update the index table and append to the events table.
Hence create_in_op, update_in_op and delete_in_op all require &mut impl AtomicOperation first arguments.
Most queries on the other hand are executed with 1 round trip (to fetch the events) and thus accept impl IntoOneTimeExecutor<'_> first arguments.
Exceptions to this are for nested entities which will be explained in a later section.
DbOp - Default Transaction Wrapper
DbOp is the default transaction wrapper returned by the generated begin_op() method on EsRepo structs. It wraps a sqlx::Transaction while providing:
- Support for commit hooks
- Caching of transaction time
- Integration with the clock module for deterministic testing
Creating DbOp Instances
// Initialize from a pool
let mut op = DbOp::init(&pool).await?;
// Convert from a sqlx::Transaction
let tx = pool.begin().await?;
let op: DbOp = tx.into();
// Or use the generated method on your repo
let mut op = MyEntityRepo::begin_op(&pool).await?;
Time Management
DbOp supports caching the transaction timestamp, which is useful for:
- Ensuring consistent timestamps across multiple operations in a transaction
- Deterministic testing with artificial time (see clock module)
- Avoiding multiple
NOW()database queries
When a global artificial clock is installed via Clock::install_artificial(), DbOp::init() automatically caches the artificial time.
// Get cached time if available
let maybe_time: Option<DateTime<Utc>> = op.maybe_now();
// Transition to DbOpWithTime with specific time
let op_with_time = op.with_time(my_timestamp);
// Transition to DbOpWithTime using Clock::now() (artificial or system time)
let op_with_time = op.with_system_time();
// Transition to DbOpWithTime with database time
// Uses artificial time if installed, otherwise executes SELECT NOW()
let op_with_time = op.with_db_time().await?;
DbOpWithTime
DbOpWithTime is equivalent to DbOp but guarantees that a timestamp is cached:
pub struct DbOpWithTime<'c> {
// ...
}
impl<'c> DbOpWithTime<'c> {
/// The cached DateTime
pub fn now(&self) -> chrono::DateTime<chrono::Utc>;
/// Begins a nested transaction
pub async fn begin(&mut self) -> Result<DbOpWithTime<'_>, sqlx::Error>;
/// Commits the inner transaction
pub async fn commit(self) -> Result<(), sqlx::Error>;
}
It implements both AtomicOperation and AtomicOperationWithTime traits.
Commit Hooks
Commit hooks allow you to execute custom logic before and after a transaction commits. This is useful for:
- Publishing events to message queues after successful commits
- Updating caches
- Triggering side effects that should only occur if the transaction succeeds
- Accumulating operations across multiple entity updates in a transaction
CommitHook Trait
The CommitHook trait defines the lifecycle hooks:
pub trait CommitHook: Send + 'static + Sized {
/// Called before the transaction commits. Can perform database operations.
/// Returns Self so it can be used in post_commit.
async fn pre_commit(
self,
op: HookOperation<'_>,
) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
PreCommitRet::ok(self, op)
}
/// Called after the transaction has successfully committed.
/// Cannot fail, not async.
fn post_commit(self) {
// Default: do nothing
}
/// Try to merge `other` into `self`.
/// Returns true if merged (other will be dropped).
/// Returns false if not merged (both will execute separately).
fn merge(&mut self, _other: &mut Self) -> bool {
false
}
}
Hook Execution Lifecycle
- Registration: Hooks are registered using
add_commit_hook()on anyAtomicOperation - Merging: If multiple hooks of the same type are registered and
merge()returnstrue, they are merged into a single hook - Pre-commit: All
pre_commit()methods are called sequentially before the transaction commits - Commit: The underlying transaction is committed
- Post-commit: All
post_commit()methods are called sequentially after successful commit
let mut op = DbOp::init(&pool).await?;
// Register a hook
op.add_commit_hook(MyHook { data: "example".to_string() })?;
// Hooks execute when commit is called
op.commit().await?; // pre_commit runs, then tx.commit(), then post_commit
HookOperation
HookOperation<'_> is a wrapper passed to pre_commit() that allows hooks to execute database operations:
impl CommitHook for MyHook {
async fn pre_commit(
self,
mut op: HookOperation<'_>,
) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
// Can execute queries
let result = sqlx::query!("SELECT COUNT(*) FROM events")
.fetch_one(op.as_executor())
.await?;
PreCommitRet::ok(self, op)
}
}
HookOperation implements AtomicOperation so it can be passed to any function expecting that trait.
Hook Merging
Hooks of the same type can be merged by implementing the merge() method. This is useful for aggregating operations:
struct EventPublisher {
events: Vec<DomainEvent>,
}
impl CommitHook for EventPublisher {
async fn pre_commit(
self,
op: HookOperation<'_>,
) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
// Prepare events for publishing
PreCommitRet::ok(self, op)
}
fn post_commit(self) {
// Publish all events to message queue
publish_events(self.events);
}
fn merge(&mut self, other: &mut Self) -> bool {
// Combine events from multiple entity updates
self.events.append(&mut other.events);
true // Successfully merged
}
}
// Usage:
let mut op = DbOp::init(&pool).await?;
op.add_commit_hook(EventPublisher { events: vec![event1] })?;
op.add_commit_hook(EventPublisher { events: vec![event2, event3] })?;
// When commit() is called, hooks merge and publish all 3 events together
op.commit().await?;
Fallback for Non-Supporting Operations
Not all AtomicOperation implementations support hooks. If add_commit_hook() returns Err(hook), you can force immediate execution:
let mut tx = pool.begin().await?; // Plain sqlx transaction doesn't support hooks
match tx.add_commit_hook(my_hook) {
Ok(()) => {
// Hook registered, will run on commit
}
Err(hook) => {
// Hooks not supported, execute immediately
let hook = hook.force_execute_pre_commit(&mut tx).await?;
tx.commit().await?;
hook.post_commit();
}
}
Complete Example
use es_entity::operation::{DbOp, hooks::{CommitHook, HookOperation, PreCommitRet}};
#[derive(Debug)]
struct EventPublisher {
events: Vec<String>,
}
impl CommitHook for EventPublisher {
async fn pre_commit(
self,
op: HookOperation<'_>,
) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
// Could validate events or store them in a staging table
PreCommitRet::ok(self, op)
}
fn post_commit(self) {
// Publish events only after successful commit
for event in self.events {
println!("Publishing event: {}", event);
// actual_publish_to_queue(event);
}
}
fn merge(&mut self, other: &mut Self) -> bool {
// Combine events from multiple operations
self.events.append(&mut other.events);
true
}
}
async fn example_with_hooks(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut op = DbOp::init(pool).await?;
// Multiple updates might each register hooks
op.add_commit_hook(EventPublisher {
events: vec!["user.created".to_string()]
})?;
op.add_commit_hook(EventPublisher {
events: vec!["notification.sent".to_string()]
})?;
// When we commit, hooks merge and execute together
op.commit().await?;
// Output: Publishing event: user.created
// Publishing event: notification.sent
Ok(())
}
Aggregates
In Domain Driven Design an aggregate represents a collection of entities that must be atomically updated in order to keep the business rules intanct.
One entity is designated the aggregate root which is responsible for enforcing that the relationships between the component entities hold.
Keeping Aggregates Small
In practice it is preferable to keep your aggregates as small as possible.
In most cases the aggregate should only contain a single entity - in which case the aggregate is indistinguishable from the single entity it contains.
Keeping aggregates small by applying careful design and consideration of the domain invariants and their boundaries promotes decoupling and reduces over all complexity.
Its easier to reason about and test the behaviour of “a single thing” vs “a bunch of things” in aggregate.
When to Use Aggregates
A common misconception is that once you have identified a parent-child relationship you should naturally represent them as an aggregate. This is however not the case - not every entity relationship that intuitively presents as a parent-child hierarchy needs to be modelled as an aggregate. Only when there is a business rule that inherently spans the relationship does it become mandatory.
Example: Subscription and Billing Periods
An example of this could be if you have a Subscription that has successive BillingPeriods.
Say a use case specifies that the system should be able to add a line item to the current BillingPeriod.
The emphesis here is on the word current - the domain invariant is that there may only be a single current BillingPeriod per Subscription.
But how do we enforce that?
To keep the system consistent we need a “thing” that tracks all BillingPeriods and enforces the uniqueness of the current state across them.
A BillingPeriod entity is not aware of the other ones and can therefore not enforce whether or not current status is indeed unique or not.
Implementation Approaches
1. Simple Foreign Key Relationship
Lets first consider an approach that treats the entities as separate with a simple foreign key relationship:
let subscription = subs.find_by_id(id).await?;
let billing_period_id = subscription.current_billing_period_id();
let mut billing_period = periods.find_by_id(billing_period_id).await?;
billing_period.add_line_item(amount);
periods.update(&mut billing_period).await?;
The risk here is that it is possible that the period which is the current one changed between the line that queries the subscription and the line that updates the period.
It is of course possible to prohibit this given the correct implementation - but that puts the burdon on the developer and may be brittle.
In general the foreign key approach may lead to inconsistent states in edge cases.
Depending on the specifics of the domain and the processes that would need to be invoked if this edge case is hit that may or may not be acceptable.
2. Transactional Approach
One way of removing this edge case is using the transactional guarantees of the database to enforce consistency across the 2 entities.
To achieve this you would probably have to use SERIALIZABLE isolation level - which adds a lot of overhead to the database.
let mut tx = pool.begin().await?;
sqlx::query!("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(&mut *tx).await?;
let subscription = subs.find_by_id_in_op(&mut tx, id).await?;
let billing_period_id = subscription.current_billing_period_id();
let mut billing_period = billing_periods.find_by_id_in_op(&mut tx, billing_period_id).await?;
billing_period.add_line_item(amount);
billing_periods.update_in_op(&mut tx, &mut billing_period).await?;
tx.commit().await?;
By using a database transaction we have essentially created a logical-aggregate where we are using the features of the database to enforce consistency.
Thus eleviating some of the cognitive overhead and risk.
We still have to remember however that this logical-aggregate exists (even if not visible in the code) and ensure that we always set the correct transaction boundaries and isolation level when we are accessing the BillingPeriods.
3. Nested Aggregate Approach
To make the relationship more obvious and make it impossible to introduce edge cases we could also nest the BillingPeriod inside the Subscription.
This way we use the modeling and relationships of our structs to reflect the aggregate relationship more clearly.
All access to the BillingPeriod would be moderated by the Subscription root.
And updates would be proxied via the root entity to guarantee we are updating the correct one.
let mut subscription = subs.find_by_id(id).await?;
subscription.add_line_item_to_current_billing_period(amount);
subs.update(&mut subscription).await?;
This essentially removes all edge cases and guarantees atomicity on the type level - which is good. But it introduces some complexity on handling the nesting itself.
4. Domain Restructuring
Finally we could simply re-structure the entities so that no ‘higher-level’ enforcement is necesairy.
An example might be to represent CurrentBillingPeriod and ClosedBillingPeriod as separate entities entirely.
In the real world this approach would probably be better than any of the examples above.
After all, if a “thing” has fundamentally different domain rules when its in one state vs another state - why not simply represent the two states as two separate entities? Especially if the restructuring allows you to reduce the size of your aggregates.
That would make the implementation look something like:
let mut billing_period = current_billing_periods.find_by_subscription_id(subscription_id).await?;
billing_period.add_line_item(amount);
current_billing_periods.update(&mut billing_period).await?;
The current_billing_periods repository could not return a non-current one thereby sidestepping the coordination issue entirely.
Summary
Of the discussed options 3 of the 4 approaches (simple foreign key, transactional, restructuring) can be handled with the features of es-entity that were previously discussed.
The nested approach requires special support from your repository to correctly persist all the nested entities and hydrate them when loading the root.
If taking all options into consideration you decide the correct approach to solving your domain constraint is via nesting the next section will describe how to represent that using es-entity.
Nesting
Building on the aggregate example from the previous chapter, let’s implement the nested approach for our Subscription and BillingPeriod entities.
As discussed, this approach makes the aggregate relationship explicit in the type system and ensures all access to nested entities is moderated through the aggregate root.
Setting up the Database Tables
First, we need to create the tables for both the parent (Subscription) and nested (BillingPeriod) entities:
-- The parent entity table
CREATE TABLE subscriptions (
id UUID PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE subscription_events (
id UUID NOT NULL REFERENCES subscriptions(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
-- The nested entity table
CREATE TABLE billing_periods (
id UUID PRIMARY KEY,
subscription_id UUID NOT NULL REFERENCES subscriptions(id),
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE billing_period_events (
id UUID NOT NULL REFERENCES billing_periods(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
Note how the nested index table (billing_periods) includes a foreign key to the parent.
Defining the Nested Entity
Let’s start by implementing the BillingPeriod entity that will be nested inside Subscription.
There are no special requirements on the child entity and it can be setup just like always:
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
extern crate tokio;
extern crate anyhow;
use derive_builder::Builder;
use es_entity::*;
use serde::{Deserialize, Serialize};
es_entity::entity_id! {
SubscriptionId,
BillingPeriodId
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "BillingPeriodId")]
pub enum BillingPeriodEvent {
Initialized {
id: BillingPeriodId,
subscription_id: SubscriptionId,
},
LineItemAdded {
amount: f64,
description: String,
},
Closed,
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct BillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
pub is_current: bool,
pub line_items: Vec<LineItem>,
events: EntityEvents<BillingPeriodEvent>,
}
#[derive(Debug, Clone)]
pub struct LineItem {
pub amount: f64,
pub description: String,
}
impl BillingPeriod {
pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent<usize> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. }
if a == &amount && d == &description
);
self.line_items.push(LineItem {
amount,
description: description.clone(),
});
self.events.push(BillingPeriodEvent::LineItemAdded {
amount,
description,
});
Idempotent::Executed(self.line_items.len())
}
pub fn close(&mut self) -> Idempotent<()> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::Closed
);
self.is_current = false;
self.events.push(BillingPeriodEvent::Closed);
Idempotent::Executed(())
}
}
impl TryFromEvents<BillingPeriodEvent> for BillingPeriod {
fn try_from_events(events: EntityEvents<BillingPeriodEvent>) -> Result<Self, EsEntityError> {
let mut builder = BillingPeriodBuilder::default().is_current(true);
let mut line_items = Vec::new();
for event in events.iter_all() {
match event {
BillingPeriodEvent::Initialized { id, subscription_id } => {
builder = builder.id(*id).subscription_id(*subscription_id);
}
BillingPeriodEvent::LineItemAdded { amount, description } => {
line_items.push(LineItem {
amount: *amount,
description: description.clone(),
});
}
BillingPeriodEvent::Closed => {
builder = builder.is_current(false)
}
}
}
builder
.line_items(line_items)
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewBillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
}
impl IntoEvents<BillingPeriodEvent> for NewBillingPeriod {
fn into_events(self) -> EntityEvents<BillingPeriodEvent> {
EntityEvents::init(
self.id,
vec![BillingPeriodEvent::Initialized {
id: self.id,
subscription_id: self.subscription_id,
}],
)
}
}
}
Defining the Parent Entity with Nested Children
Now let’s implement the Subscription entity that will contain the nested BillingPeriod entities:
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
extern crate tokio;
extern crate anyhow;
use derive_builder::Builder;
use es_entity::*;
use serde::{Deserialize, Serialize};
es_entity::entity_id! {
SubscriptionId,
BillingPeriodId
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "BillingPeriodId")]
pub enum BillingPeriodEvent {
Initialized {
id: BillingPeriodId,
subscription_id: SubscriptionId,
},
LineItemAdded {
amount: f64,
description: String,
},
Closed,
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct BillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
pub is_current: bool,
pub line_items: Vec<LineItem>,
events: EntityEvents<BillingPeriodEvent>,
}
#[derive(Debug, Clone)]
pub struct LineItem {
pub amount: f64,
pub description: String,
}
impl BillingPeriod {
pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent<usize> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. }
if a == &amount && d == &description
);
self.line_items.push(LineItem {
amount,
description: description.clone(),
});
self.events.push(BillingPeriodEvent::LineItemAdded {
amount,
description,
});
Idempotent::Executed(self.line_items.len())
}
pub fn close(&mut self) -> Idempotent<()> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::Closed
);
self.is_current = false;
self.events.push(BillingPeriodEvent::Closed);
Idempotent::Executed(())
}
}
impl TryFromEvents<BillingPeriodEvent> for BillingPeriod {
fn try_from_events(events: EntityEvents<BillingPeriodEvent>) -> Result<Self, EsEntityError> {
let mut builder = BillingPeriodBuilder::default().is_current(true);
let mut line_items = Vec::new();
for event in events.iter_all() {
match event {
BillingPeriodEvent::Initialized { id, subscription_id } => {
builder = builder.id(*id).subscription_id(*subscription_id);
}
BillingPeriodEvent::LineItemAdded { amount, description } => {
line_items.push(LineItem {
amount: *amount,
description: description.clone(),
});
}
BillingPeriodEvent::Closed => {
builder = builder.is_current(false)
}
}
}
builder
.line_items(line_items)
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewBillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
}
impl IntoEvents<BillingPeriodEvent> for NewBillingPeriod {
fn into_events(self) -> EntityEvents<BillingPeriodEvent> {
EntityEvents::init(
self.id,
vec![BillingPeriodEvent::Initialized {
id: self.id,
subscription_id: self.subscription_id,
}],
)
}
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "SubscriptionId")]
pub enum SubscriptionEvent {
Initialized { id: SubscriptionId },
BillingPeriodStarted { period_id: BillingPeriodId },
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct Subscription {
pub id: SubscriptionId,
current_period_id: Option<BillingPeriodId>,
events: EntityEvents<SubscriptionEvent>,
// The `#[es_entity(nested)]` attribute marks this field as containing nested entities.
// It must be of type `Nested<T>`.
// The #[builder(default)] will initialize it as empty.
// The Repository will load the children after the parent as been hydrated.
#[es_entity(nested)]
#[builder(default)]
billing_periods: Nested<BillingPeriod>,
}
impl Subscription {
pub fn start_new_billing_period(&mut self) -> Idempotent<BillingPeriodId> {
// Close the current billing period if there is one
if let Some(current_id) = self.current_period_id {
if let Some(current_period) = self.billing_periods.get_persisted_mut(¤t_id) {
current_period.close();
}
}
// Create the new billing period
let new_period = NewBillingPeriod {
id: BillingPeriodId::new(),
subscription_id: self.id,
};
let id = new_period.id;
self.billing_periods.add_new(new_period);
// Update the current period tracking
self.current_period_id = Some(id);
self.events.push(SubscriptionEvent::BillingPeriodStarted { period_id: id });
Idempotent::Executed(id)
}
pub fn add_line_item_to_current_billing_period(&mut self, amount: f64, description: String) -> Idempotent<usize> {
// Use the tracked current period ID to access the billing period directly
if let Some(current_id) = self.current_period_id {
if let Some(current_period) = self.billing_periods.get_persisted_mut(¤t_id) {
return current_period.add_line_item(amount, description);
}
}
Idempotent::AlreadyApplied
}
}
impl TryFromEvents<SubscriptionEvent> for Subscription {
fn try_from_events(events: EntityEvents<SubscriptionEvent>) -> Result<Self, EsEntityError> {
let mut builder = SubscriptionBuilder::default();
for event in events.iter_all() {
match event {
SubscriptionEvent::Initialized { id } => {
builder = builder.id(*id);
}
SubscriptionEvent::BillingPeriodStarted { period_id } => {
builder = builder.current_period_id(Some(*period_id));
}
}
}
builder
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewSubscription {
pub id: SubscriptionId,
}
impl IntoEvents<SubscriptionEvent> for NewSubscription {
fn into_events(self) -> EntityEvents<SubscriptionEvent> {
EntityEvents::init(
self.id,
vec![SubscriptionEvent::Initialized { id: self.id }],
)
}
}
}
The key points to note:
- The
billing_periodsfield is marked with#[es_entity(nested)] - The field type is
Nested<BillingPeriod>which is a special container for nested entities - We use
add_new()to add new nested entities - We mutate the children via
get_persisted_mut().
Under the hood the EsEntity macro will create an implementation of the Parent trait:
pub trait Parent<T: EsEntity> {
fn new_children_mut(&mut self) -> &mut Vec<<T as EsEntity>::New>;
fn iter_persisted_children_mut<'a>(&'a mut self) -> impl Iterator<Item = &'a mut T>
where
T: 'a;
fn inject_children(&mut self, entities: impl IntoIterator<Item = T>);
}
for every field marked #[es_entity(nested)].
Setting up the Repositories
The repository setup is where the magic happens for nested entities.
We need to configure both the parent and child repositories with special attributes.
It is recommended to put both Repositories in the same file but only mark the parent one as pub.
This leverages the rust module system to enforce that the children cannot be accessed directly.
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
extern crate tokio;
extern crate anyhow;
use derive_builder::Builder;
use es_entity::*;
use serde::{Deserialize, Serialize};
es_entity::entity_id! {
SubscriptionId,
BillingPeriodId
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "BillingPeriodId")]
pub enum BillingPeriodEvent {
Initialized {
id: BillingPeriodId,
subscription_id: SubscriptionId,
},
LineItemAdded {
amount: f64,
description: String,
},
Closed,
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct BillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
pub is_current: bool,
pub line_items: Vec<LineItem>,
events: EntityEvents<BillingPeriodEvent>,
}
#[derive(Debug, Clone)]
pub struct LineItem {
pub amount: f64,
pub description: String,
}
impl BillingPeriod {
pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent<usize> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. }
if a == &amount && d == &description
);
self.line_items.push(LineItem {
amount,
description: description.clone(),
});
self.events.push(BillingPeriodEvent::LineItemAdded {
amount,
description,
});
Idempotent::Executed(self.line_items.len())
}
pub fn close(&mut self) -> Idempotent<()> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::Closed
);
self.is_current = false;
self.events.push(BillingPeriodEvent::Closed);
Idempotent::Executed(())
}
}
impl TryFromEvents<BillingPeriodEvent> for BillingPeriod {
fn try_from_events(events: EntityEvents<BillingPeriodEvent>) -> Result<Self, EsEntityError> {
let mut builder = BillingPeriodBuilder::default().is_current(true);
let mut line_items = Vec::new();
for event in events.iter_all() {
match event {
BillingPeriodEvent::Initialized { id, subscription_id } => {
builder = builder.id(*id).subscription_id(*subscription_id);
}
BillingPeriodEvent::LineItemAdded { amount, description } => {
line_items.push(LineItem {
amount: *amount,
description: description.clone(),
});
}
BillingPeriodEvent::Closed => {
builder = builder.is_current(false)
}
}
}
builder
.line_items(line_items)
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewBillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
}
impl IntoEvents<BillingPeriodEvent> for NewBillingPeriod {
fn into_events(self) -> EntityEvents<BillingPeriodEvent> {
EntityEvents::init(
self.id,
vec![BillingPeriodEvent::Initialized {
id: self.id,
subscription_id: self.subscription_id,
}],
)
}
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "SubscriptionId")]
pub enum SubscriptionEvent {
Initialized { id: SubscriptionId },
BillingPeriodStarted { period_id: BillingPeriodId },
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct Subscription {
pub id: SubscriptionId,
current_period_id: Option<BillingPeriodId>,
events: EntityEvents<SubscriptionEvent>,
#[es_entity(nested)]
#[builder(default)]
billing_periods: Nested<BillingPeriod>,
}
impl TryFromEvents<SubscriptionEvent> for Subscription {
fn try_from_events(events: EntityEvents<SubscriptionEvent>) -> Result<Self, EsEntityError> {
let mut builder = SubscriptionBuilder::default();
for event in events.iter_all() {
match event {
SubscriptionEvent::Initialized { id } => {
builder = builder.id(*id);
}
SubscriptionEvent::BillingPeriodStarted { period_id } => {
builder = builder.current_period_id(Some(*period_id));
}
}
}
builder
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewSubscription {
pub id: SubscriptionId,
}
impl IntoEvents<SubscriptionEvent> for NewSubscription {
fn into_events(self) -> EntityEvents<SubscriptionEvent> {
EntityEvents::init(
self.id,
vec![SubscriptionEvent::Initialized { id: self.id }],
)
}
}
fn main() {}
#[derive(EsRepo)]
#[es_repo(
entity = "BillingPeriod",
columns(
// The foreign key of the parent marked by `parent`.
subscription_id(ty = "SubscriptionId", update(persist = false), parent)
)
)]
// private struct
struct BillingPeriods {
pool: sqlx::PgPool,
}
#[derive(EsRepo)]
#[es_repo(entity = "Subscription")]
pub struct Subscriptions {
pool: sqlx::PgPool,
// Mark this field as containing the nested repository
#[es_repo(nested)]
billing_periods: BillingPeriods,
}
impl Subscriptions {
pub fn new(pool: sqlx::PgPool) -> Self {
Self {
pool: pool.clone(),
billing_periods: BillingPeriods { pool },
}
}
}
The important configuration here:
- The child repository (
BillingPeriods) marks the foreign key column withparent. - The parent repository (
Subscriptions) includes the child repository as a field marked with#[es_repo(nested)]
Using Nested Entities
Now we can use our aggregate with full type safety and automatic loading of nested entities:
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
extern crate tokio;
extern crate anyhow;
use derive_builder::Builder;
use es_entity::*;
use serde::{Deserialize, Serialize};
es_entity::entity_id! {
SubscriptionId,
BillingPeriodId
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "BillingPeriodId")]
pub enum BillingPeriodEvent {
Initialized {
id: BillingPeriodId,
subscription_id: SubscriptionId,
},
LineItemAdded {
amount: f64,
description: String,
},
Closed,
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct BillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
pub is_current: bool,
pub line_items: Vec<LineItem>,
events: EntityEvents<BillingPeriodEvent>,
}
#[derive(Debug, Clone)]
pub struct LineItem {
pub amount: f64,
pub description: String,
}
impl BillingPeriod {
pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent<usize> {
if !self.is_current {
unreachable!()
}
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. }
if a == &amount && d == &description
);
self.line_items.push(LineItem {
amount,
description: description.clone(),
});
self.events.push(BillingPeriodEvent::LineItemAdded {
amount,
description,
});
Idempotent::Executed(self.line_items.len())
}
pub fn close(&mut self) -> Idempotent<()> {
idempotency_guard!(
self.events.iter_all().rev(),
BillingPeriodEvent::Closed
);
self.is_current = false;
self.events.push(BillingPeriodEvent::Closed);
Idempotent::Executed(())
}
}
impl TryFromEvents<BillingPeriodEvent> for BillingPeriod {
fn try_from_events(events: EntityEvents<BillingPeriodEvent>) -> Result<Self, EsEntityError> {
let mut builder = BillingPeriodBuilder::default();
let mut line_items = Vec::new();
let mut is_current = true;
for event in events.iter_all() {
match event {
BillingPeriodEvent::Initialized { id, subscription_id } => {
builder = builder.id(*id).subscription_id(*subscription_id);
}
BillingPeriodEvent::LineItemAdded { amount, description } => {
line_items.push(LineItem {
amount: *amount,
description: description.clone(),
});
}
BillingPeriodEvent::Closed => {
is_current = false;
}
}
}
builder
.is_current(is_current)
.line_items(line_items)
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewBillingPeriod {
pub id: BillingPeriodId,
pub subscription_id: SubscriptionId,
}
impl IntoEvents<BillingPeriodEvent> for NewBillingPeriod {
fn into_events(self) -> EntityEvents<BillingPeriodEvent> {
EntityEvents::init(
self.id,
vec![BillingPeriodEvent::Initialized {
id: self.id,
subscription_id: self.subscription_id,
}],
)
}
}
#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "SubscriptionId")]
pub enum SubscriptionEvent {
Initialized { id: SubscriptionId },
BillingPeriodStarted { period_id: BillingPeriodId },
}
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct Subscription {
pub id: SubscriptionId,
current_period_id: Option<BillingPeriodId>,
events: EntityEvents<SubscriptionEvent>,
#[es_entity(nested)]
#[builder(default)]
billing_periods: Nested<BillingPeriod>,
}
impl Subscription {
pub fn start_new_billing_period(&mut self) -> Idempotent<BillingPeriodId> {
if let Some(current_id) = self.current_period_id {
if let Some(current_period) = self.billing_periods.get_persisted_mut(¤t_id) {
current_period.close();
}
}
let new_period = NewBillingPeriod {
id: BillingPeriodId::new(),
subscription_id: self.id,
};
let id = new_period.id;
self.billing_periods.add_new(new_period);
self.current_period_id = Some(id);
self.events.push(SubscriptionEvent::BillingPeriodStarted { period_id: id });
Idempotent::Executed(id)
}
pub fn add_line_item_to_current_billing_period(&mut self, amount: f64, description: String) -> Idempotent<usize> {
if let Some(current_id) = self.current_period_id {
if let Some(current_period) = self.billing_periods.get_persisted_mut(¤t_id) {
return current_period.add_line_item(amount, description);
}
}
Idempotent::AlreadyApplied
}
pub fn current_billing_period(&self) -> Option<&BillingPeriod> {
self.current_period_id
.and_then(|id| self.billing_periods.get_persisted(&id))
}
}
impl TryFromEvents<SubscriptionEvent> for Subscription {
fn try_from_events(events: EntityEvents<SubscriptionEvent>) -> Result<Self, EsEntityError> {
let mut builder = SubscriptionBuilder::default();
let mut current_period_id = None;
for event in events.iter_all() {
match event {
SubscriptionEvent::Initialized { id } => {
builder = builder.id(*id);
}
SubscriptionEvent::BillingPeriodStarted { period_id } => {
current_period_id = Some(*period_id);
}
}
}
builder
.current_period_id(current_period_id)
.events(events)
.build()
}
}
#[derive(Debug, Clone, Builder)]
pub struct NewSubscription {
pub id: SubscriptionId,
}
impl IntoEvents<SubscriptionEvent> for NewSubscription {
fn into_events(self) -> EntityEvents<SubscriptionEvent> {
EntityEvents::init(
self.id,
vec![SubscriptionEvent::Initialized { id: self.id }],
)
}
}
#[derive(EsRepo)]
#[es_repo(
entity = "BillingPeriod",
columns(
subscription_id(ty = "SubscriptionId", update(persist = false), parent)
)
)]
pub struct BillingPeriods {
pool: sqlx::PgPool,
}
#[derive(EsRepo)]
#[es_repo(entity = "Subscription")]
pub struct Subscriptions {
pool: sqlx::PgPool,
#[es_repo(nested)]
billing_periods: BillingPeriods,
}
impl Subscriptions {
pub fn new(pool: sqlx::PgPool) -> Self {
Self {
pool: pool.clone(),
billing_periods: BillingPeriods { pool },
}
}
}
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
let pg_con = format!("postgres://user:password@localhost:5432/pg");
Ok(sqlx::PgPool::connect(&pg_con).await?)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriptions = Subscriptions::new(init_pool().await?);
// Create a new subscription
let subscription_id = SubscriptionId::new();
let new_subscription = NewSubscription { id: subscription_id };
let mut subscription = subscriptions.create(new_subscription).await?;
// Start a billing period
subscription.start_new_billing_period();
// Add some line items to the current period
subscription.add_line_item_to_current_billing_period(
100.0,
"Monthly subscription fee".to_string()
);
subscription.add_line_item_to_current_billing_period(
25.0,
"Additional service charge".to_string()
);
// Persist all changes (both parent and nested entities)
subscriptions.update(&mut subscription).await?;
// Load the subscription - nested entities are automatically loaded
let loaded = subscriptions.find_by_id(subscription_id).await?;
// Access the current billing period
if let Some(current_period) = loaded.current_billing_period() {
println!("Current period has {} line items", current_period.line_items.len());
for item in ¤t_period.line_items {
println!(" - {}: ${}", item.description, item.amount);
}
}
Ok(())
}
One thing to note is that the _in_op functions of the parent repository now require an AtomicOperation argument since we must load all the entities in a consistent snapshot:
async fn find_by_id_in_op<OP>(op: OP, id: EntityId)
where
OP: AtomicOperation;
// The version of the queries in Repositories without nested children
// cannot be used here as it would not load parent + children from a consistent snapshot.
// async fn find_by_id_in_op<'a, OP>(op: OP, id: EntityId)
// where
// OP: IntoOneTimeExecutor<'a>;
Benefits of the Nested Approach
This approach provides several key benefits:
- Type Safety: The aggregate boundary is enforced at compile time
- Atomic Updates: All changes to the aggregate are persisted together
- Automatic Loading: When you load the parent, all nested entities are loaded automatically
- Encapsulation: All access to nested entities goes through the aggregate root
- Consistency: The parent entity can enforce invariants across all its children
Performance Considerations
While nesting provides strong consistency guarantees, there are some performance implications to consider:
- Loading: All nested entities are loaded when the parent is loaded. For aggregates with many children, this could impact performance.
- Updates: All nested entities are checked for changes during updates, even if only one was modified.
- Memory: The entire aggregate is held in memory, which could be significant for large aggregates.
For these reasons, it’s important to keep aggregates small and focused on a specific consistency boundary.
When to Use Nesting
Use the nested approach when:
- You have a true invariant that spans multiple entities
- The child entities have no meaning without the parent
- You need to enforce consistency rules across the relationship
- The number of child entities is reasonably bounded
Avoid nesting when:
- The relationship is merely associative
- Child entities can exist independently
- You expect unbounded growth in the number of children
- Performance requirements dictate more granular loading/updating
Remember, as discussed in the aggregates chapter, there are often alternative designs that can avoid the need for nesting while still maintaining consistency.
Clock Module
The es_entity::clock module provides a time abstraction that works identically whether using real time or artificial time for testing. This enables deterministic testing of time-dependent logic without waiting for real time to pass.
Overview
The clock module provides:
ClockHandle- A cheap-to-clone handle for time operationsClock- Global clock access (likeUtc::now()but testable)ClockController- Control artificial time advancementArtificialClockConfig- Configure artificial clock behavior
Clock Types
Realtime Clock
Uses the system clock and tokio timers. This is the default behavior.
use es_entity::clock::ClockHandle;
let clock = ClockHandle::realtime();
let now = clock.now(); // Returns Utc::now()
Artificial Clock
Time only advances when explicitly controlled. Perfect for deterministic testing.
use es_entity::clock::{ClockHandle, ArtificialClockConfig};
// Create artificial clock with manual advancement
let (clock, ctrl) = ClockHandle::artificial(ArtificialClockConfig::manual());
let t0 = clock.now();
// Time doesn't advance on its own
assert_eq!(clock.now(), t0);
// Advance time by 1 hour
ctrl.advance(Duration::from_secs(3600)).await;
assert_eq!(clock.now(), t0 + chrono::Duration::hours(1));
Global Clock API
The Clock struct provides static methods for global clock access, similar to Utc::now():
use es_entity::clock::{Clock, ArtificialClockConfig};
// For testing: install artificial clock (returns controller)
let ctrl = Clock::install_artificial(ArtificialClockConfig::manual());
// Get current time (works with both artificial and real time)
let now = Clock::now();
// Check if artificial clock is installed
if Clock::is_artificial() {
// We're in test mode with controlled time
}
// Sleep and timeout also use the global clock
Clock::sleep(Duration::from_secs(60)).await;
Clock::timeout(Duration::from_secs(5), some_future).await;
Lazy Initialization
If you call Clock::now() without installing an artificial clock, it lazily initializes to realtime mode. This means production code can use Clock::now() without any setup.
ArtificialClockConfig
Configure how the artificial clock behaves:
use es_entity::clock::{ArtificialClockConfig, ArtificialMode};
use chrono::Utc;
// Manual mode - time only advances via controller.advance()
let config = ArtificialClockConfig::manual();
// Auto mode - time advances automatically at 1000x speed
let config = ArtificialClockConfig::auto(1000.0);
// Start at a specific time
let config = ArtificialClockConfig {
start_at: Utc::now() - chrono::Duration::days(30),
mode: ArtificialMode::Manual,
};
ClockController
The controller is returned when creating an artificial clock and provides:
// Advance time by duration (wakes sleeping tasks in order)
ctrl.advance(Duration::from_secs(3600)).await;
// Advance to next pending wake event
let wake_time = ctrl.advance_to_next_wake().await;
// Set time directly (doesn't process intermediate wakes)
ctrl.set_time(some_datetime);
// Get current time
let now = ctrl.now();
// Check pending sleep count
let count = ctrl.pending_wake_count();
// Transition to realtime mode
ctrl.transition_to_realtime();
Integration with DbOp
When a global artificial clock is installed, database operations automatically use it:
use es_entity::clock::{Clock, ArtificialClockConfig};
// Install artificial clock for testing
let ctrl = Clock::install_artificial(ArtificialClockConfig::manual());
// DbOp::init() now caches the artificial time
let op = DbOp::init(&pool).await?;
// with_clock_time() uses the operation's clock
let op_with_time = op.with_clock_time();
// with_db_time() uses artificial time instead of SELECT NOW()
let op_with_time = op.with_db_time().await?;
This ensures all operations within a transaction use consistent, controlled time.
Explicit Clock Injection
For more control, you can inject a specific clock into database operations without modifying global state. This is useful when you want isolated clocks per test or need different clocks for different operations:
use es_entity::clock::{ClockHandle, ArtificialClockConfig};
// Create an artificial clock (not installed globally)
let (clock, ctrl) = ClockHandle::artificial(ArtificialClockConfig::manual());
// Pass the clock explicitly to DbOp
let op = DbOp::init_with_clock(&pool, &clock).await?;
// The operation uses this clock for time operations
let op_with_time = op.with_clock_time();
Repositories generated with #[derive(EsRepo)] also support this pattern:
// Using the repo's begin_op_with_clock method
let mut op = users.begin_op_with_clock(&clock).await?;
// Create entity - recorded_at will use the artificial clock's time
let user = users.create_in_op(&mut op, new_user).await?;
op.commit().await?;
This approach avoids global state and allows each test to have its own independent clock, preventing test interference.
Clock Field in Repository
For an even cleaner API, you can add a clock field to your repository struct. The macro supports two patterns:
Optional Clock Field
Use Option<ClockHandle> when you want the same repo type to work both with and without a custom clock:
use es_entity::{clock::ClockHandle, EsRepo};
use sqlx::PgPool;
#[derive(EsRepo)]
#[es_repo(entity = "User")]
pub struct Users {
pool: PgPool,
clock: Option<ClockHandle>, // Optional: use if Some, fallback to global
}
impl Users {
// Production: no clock, uses global
pub fn new(pool: PgPool) -> Self {
Self { pool, clock: None }
}
// Testing: with artificial clock
pub fn with_clock(pool: PgPool, clock: ClockHandle) -> Self {
Self { pool, clock: Some(clock) }
}
}
Usage:
// Production code - uses global clock
let users = Users::new(pool);
let user = users.create(new_user).await?;
// Test code - uses artificial clock
let (clock, ctrl) = ClockHandle::artificial(ArtificialClockConfig::manual());
let users = Users::with_clock(pool, clock);
let user = users.create(new_user).await?; // Uses artificial clock!
Required Clock Field
Use ClockHandle (non-optional) when you always want to inject a clock:
#[derive(EsRepo)]
#[es_repo(entity = "User")]
pub struct Users {
pool: PgPool,
clock: ClockHandle, // Required: always use this clock
}
impl Users {
pub fn new(pool: PgPool, clock: ClockHandle) -> Self {
Self { pool, clock }
}
}
This is useful when you want to enforce clock injection at construction time, making the dependency explicit.
Field Detection
The macro detects a field named clock (or marked with #[es_repo(clock)]) and generates the appropriate begin_op() implementation:
Option<ClockHandle>: Uses the clock ifSome, falls back to global clock ifNoneClockHandle: Always uses the injected clock- No clock field: Always uses the global clock
Example: Testing Time-Dependent Logic
use es_entity::clock::{Clock, ArtificialClockConfig};
use std::time::Duration;
#[tokio::test]
async fn test_subscription_expiry() {
// Install artificial clock starting 30 days ago
let start = Utc::now() - chrono::Duration::days(30);
let ctrl = Clock::install_artificial(ArtificialClockConfig {
start_at: start,
mode: ArtificialMode::Manual,
});
// Create subscription that expires in 7 days
let subscription = create_subscription_expiring_in(7).await;
// Not expired yet
assert!(!subscription.is_expired(Clock::now()));
// Advance 8 days
ctrl.advance(Duration::from_secs(8 * 86400)).await;
// Now expired
assert!(subscription.is_expired(Clock::now()));
}
Best Practices
-
Use
Clock::now()instead ofUtc::now()- This makes your code testable with artificial time. -
Install artificial clock early in tests - Call
Clock::install_artificial()before any code that uses time. -
Use manual mode for deterministic tests - Auto mode is useful for simulations but manual mode gives you full control.
-
Advance time explicitly - In tests, use
ctrl.advance()to move time forward in a controlled way. -
Check
is_artificial()sparingly - Most code shouldn’t need to know if time is artificial; it should just useClock::now().