Introduction
Welcome to the ES Entity documentation!
ES Entity is an opinionated Rust library for persisting Event Sourced entities to PostgreSQL.
It promotes decoupling your domain code from persistence details by putting all the mapping logic onto `Repository` structs.
Almost all the generated queries are verified at compile time by sqlx under the hood to give strong type-safe guarantees.
The main traits that must be derived are `EsEvent` and `EsEntity` so that they can be used by the `EsRepo` macro that generates all the persistence and query fns.
This book will explain how to use this library effectively as well as provide a general introduction to using Event Sourcing to persist the state of your domain entities.
Quickstart
This is the 'just show me the damn code' section. It's an end-to-end working example without any background. On first reading it's probably best to skip this and dive into the concepts first. When you are ready to start trying things out in your own implementation you can use this as a template with all the pieces in one place.
Example
Let's assume there is a `User` entity in your domain that you wish to persist using `EsEntity`.
The first thing you will need is two tables in Postgres. These are referred to as the 'index table' and the 'events table'.
By convention they look like this:
```shell
$ cargo sqlx migrate add users
$ cat migrations/*_users.sql
```

```sql
-- The 'index' table that holds the latest values of some selected attributes.
CREATE TABLE users (
  -- Mandatory id column
  id UUID PRIMARY KEY,
  -- Mandatory created_at column
  created_at TIMESTAMPTZ NOT NULL,
  -- Any other columns you want a quick 'index-based' lookup on
  name VARCHAR UNIQUE
);

-- The table that actually stores the events sequenced per entity.
-- This table has the same columns for every entity you create,
-- by convention named `<entity>_events`.
CREATE TABLE user_events (
  id UUID NOT NULL REFERENCES users(id),
  sequence INT NOT NULL,
  event_type VARCHAR NOT NULL,
  event JSONB NOT NULL,
  context JSONB DEFAULT NULL,
  recorded_at TIMESTAMPTZ NOT NULL,
  UNIQUE(id, sequence)
);
```
To persist the entity we need to set up a pattern with five components:

- The `EntityId`
- The `EntityEvent`
- The `NewEntity`
- The `Entity` itself
- And finally the `Repository` that encodes the mapping.
Here's a complete working example:
```toml
[dependencies]
es-entity = "0.6.10"
sqlx = "0.8.3" # Needs to be in scope for entity_id! macro
serde = { version = "1.0.219", features = ["derive"] } # To serialize the `EntityEvent`
derive_builder = "0.20.1" # For hydrating and building the entity state (optional)
```
```rust
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate tokio;
extern crate anyhow;
extern crate derive_builder;

use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use es_entity::*;

// Will create a uuid::Uuid wrapper type.
// But any type can act as the ID that fulfills:
// Clone + PartialEq + Eq + std::hash::Hash + Send + Sync
// + sqlx::Type<sqlx::Postgres>
es_entity::entity_id! { UserId }

// The `EsEvent` must have the `serde(tag = "type")` annotation.
#[derive(EsEvent, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// Tell the macro what the `id` type is
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
}

// The `EsEntity` - using derive_builder is optional
// but useful for hydrating in the `TryFromEvents` trait.
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct User {
    pub id: UserId,
    pub name: String,
    // The `events` container - mandatory field.
    // Basically it's a `Vec` wrapper with some ES specific augmentation.
    events: EntityEvents<UserEvent>,
}

impl User {
    // Mutation to update the name of a user.
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let new_name = new_name.into();
        // The idempotency_guard macro is a helper to return quickly
        // if a mutation has already been applied.
        // It is not mandatory but very useful in the context of distributed / multi-threaded
        // systems to protect against replays.
        idempotency_guard!(
            self.events.iter_all().rev(),
            // If this pattern matches return Idempotent::Ignored
            UserEvent::NameUpdated { name } if name == &new_name,
            // Stop searching here
            => UserEvent::NameUpdated { .. }
        );
        self.name = new_name.clone();
        self.events.push(UserEvent::NameUpdated { name: new_name });
        Idempotent::Executed(())
    }
}

// Any EsEntity must implement `TryFromEvents`.
// This trait is what hydrates entities after loading the events from the database.
impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        let mut builder = UserBuilder::default();
        for event in events.iter_all() {
            match event {
                UserEvent::Initialized { id, name } => {
                    builder = builder.id(*id).name(name.clone());
                }
                UserEvent::NameUpdated { name } => {
                    builder = builder.name(name.clone());
                }
            }
        }
        builder.events(events).build()
    }
}

// The `NewEntity` - this represents the data of an entity in a pre-persisted state.
// Using derive_builder is not mandatory - any type can be used for the `NewEntity` state.
#[derive(Debug, Builder)]
pub struct NewUser {
    #[builder(setter(into))]
    pub id: UserId,
    #[builder(setter(into))]
    pub name: String,
}

impl NewUser {
    pub fn builder() -> NewUserBuilder {
        NewUserBuilder::default()
    }
}

// The `NewEntity` type must implement `IntoEvents` to emit the initial events that require persisting.
impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

// The `EsRepo` that will host all the persistence operations.
#[derive(EsRepo, Debug)]
#[es_repo(
    entity = "User",
    // Configure the columns that need populating in the index table
    columns(
        // The 'name' column
        name(
            // The rust type of the name attribute
            ty = "String"
        )
    )
)]
pub struct Users {
    // Mandatory field so that the Repository can begin transactions
    pool: sqlx::PgPool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect to postgres
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    let pool = sqlx::PgPool::connect(&pg_con).await?;
    let users = Users { pool };

    let new_user = NewUser::builder()
        .id(UserId::new())
        .name("Frank")
        .build()
        .unwrap();

    // The returned type is the hydrated Entity
    let mut user = users.create(new_user).await?;

    // Using Idempotent::did_execute() to check if we need a DB roundtrip
    if user.update_name("Dweezil").did_execute() {
        users.update(&mut user).await?;
    }

    let loaded_user = users.find_by_id(user.id).await?;
    assert_eq!(user.name, loaded_user.name);

    Ok(())
}
```
Event Sourcing
Event sourcing is a software design pattern where state changes are stored as a sequence of events rather than as snapshots that get updated in place.
Instead of updating a database record directly, every change is recorded as an immutable event (e.g., UserCreated, EmailChanged, AccountDeactivated) which gets inserted as a row in a table. The current state is rebuilt by replaying these events in order.
One thing to note is that in `es-entity` the events are scoped to a specific type of `Entity`.
Thus they are (by convention) not all written to the same global events table like in some Event Sourcing approaches.
Rather each `Entity`-type gets its own `events` table - though it is possible to use a global table if desired.
Further the events are strictly ordered on a per-entity basis - there are no ordering guarantees across entities.
This can be interpreted as the `Entity`-type representing a topic and the `EntityId` playing the role of the PartitionKey that exists in some pub-sub / event-store systems.
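The core mechanic - deriving the current state by replaying an ordered sequence of events - can be sketched in plain Rust. Note this is an illustrative sketch only; the `UserEvent` enum and `replay_name` fn here are made up for the example and are not part of the `es-entity` API:

```rust
// Illustrative sketch only - not es-entity API.
// Current state is a left-fold over the ordered event history.
#[derive(Debug, Clone)]
enum UserEvent {
    Initialized { name: String },
    NameUpdated { name: String },
}

// Replay events in sequence order to derive the latest name.
fn replay_name(events: &[UserEvent]) -> Option<String> {
    let mut name = None;
    for event in events {
        match event {
            UserEvent::Initialized { name: n } | UserEvent::NameUpdated { name: n } => {
                name = Some(n.clone());
            }
        }
    }
    name
}

fn main() {
    let history = vec![
        UserEvent::Initialized { name: "Ada".to_string() },
        UserEvent::NameUpdated { name: "Grace".to_string() },
    ];
    // The latest event wins - state is derived, never updated in place.
    assert_eq!(replay_name(&history), Some("Grace".to_string()));
}
```

Because no event is ever mutated or deleted, the full history remains available - replaying a prefix of the events reconstructs any earlier state.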
Entity Pattern
In the Software Engineering community the term `Entity` can refer to many different things.
In the context of `es-entity` it is generally meant in the sense put forward by Domain Driven Design.
Strict adherence to DDD is not mandatory to use `es-entity` but there are a lot of benefits to be had by following these principles.
In DDD entities serve the following purpose:

- execute commands that
  - execute business logic
  - enforce domain invariants
  - mutate state
  - record events (in the context of Event Sourcing)
- supply queries that expose some of the entity's state
They often host the most critical code in your application where correctness is of utmost importance.
Ideally they are unit-testable and thus should not be overly coupled to the persistence layer (as they generally are when using just about any ORM library / framework).
The design of `es-entity` is very deliberate in not getting in the way of testability of your `Entities`.

Each `Entity` type used in `es-entity` must have:

- an `EntityId` type
- an `EntityEvent` type
- a `NewEntity` type
- the `Entity` type itself
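The DDD responsibilities listed above - enforcing invariants, mutating state, recording events, and staying unit-testable without a database - can be sketched in plain Rust. This is an illustrative sketch only; the `Account` type and its method names are made up for the example and are not part of `es-entity`:

```rust
// Illustrative sketch only - plain Rust, no es-entity types.
// An entity method executes a command: it enforces an invariant,
// mutates state and records an event - all testable without a database.
#[derive(Debug, PartialEq)]
enum AccountEvent {
    Withdrawn { amount: u64 },
}

struct Account {
    balance: u64,
    events: Vec<AccountEvent>,
}

impl Account {
    // Command: rejects amounts that would violate the
    // "balance never goes negative" invariant.
    fn withdraw(&mut self, amount: u64) -> Result<(), String> {
        if amount > self.balance {
            return Err("insufficient balance".to_string());
        }
        self.balance -= amount;
        self.events.push(AccountEvent::Withdrawn { amount });
        Ok(())
    }

    // Query: expose some of the entity's state without allowing mutation.
    fn balance(&self) -> u64 {
        self.balance
    }
}

fn main() {
    let mut account = Account { balance: 100, events: vec![] };
    assert!(account.withdraw(60).is_ok());
    assert_eq!(account.balance(), 40);
    // The invariant is enforced and no event is recorded on failure.
    assert!(account.withdraw(50).is_err());
    assert_eq!(account.events.len(), 1);
}
```

Because the entity is a plain struct with no persistence dependencies, invariants like this can be covered by ordinary unit tests.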
Entity Event
In `es-entity` it is assumed that an `Entity` has an associated `EntityEvent` enum that represents all of the state changes (ie. types of events) that can originate from mutations of said `Entity`.
This enum must be serializable and is stored as a JSON blob in the associated `events` table.
```rust
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
use serde::{Deserialize, Serialize};
use es_entity::*;

// Entities must always have an associated id type
type UserId = String;
// es_entity::entity_id! { UserId }; Can be used to create a Uuid wrapper struct.

#[derive(EsEvent, Debug, Serialize, Deserialize)]
// The `EsEvent` must have the `serde(tag = "type")` annotation.
#[serde(tag = "type", rename_all = "snake_case")]
// Tell the macro what the `id` type is
#[es_event(id = "UserId")]
pub enum UserEvent {
    // Typically there is a 'first' event that records the initial state of an `Entity`.
    Initialized { id: UserId, name: String },
    // Every mutation should result in an `Event` that represents the
    // change that happened.
    // This event represents that the `name` attribute of a user was updated.
    NameUpdated { name: String },
}
}
```
New Entity
The `NewEntity` type represents the data of the `Entity` in a pre-persisted state.
It gets passed to the `Repository::create` function where the `IntoEvents` trait emits the initial `EntityEvent`s which are then persisted and used to hydrate the actual `Entity`.
It is also recommended that any validation of the initial attributes is performed on this type to ensure that the `Entity` will be in a legal state once it gets hydrated for the first time.
Using the derive_builder crate is recommended for this purpose.
```rust
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
use serde::{Deserialize, Serialize};
use es_entity::*;
use derive_builder::Builder;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

const MAX_NAME_LENGTH: usize = 100;

#[derive(Debug, Builder)]
// Specify the `validation` fn
#[builder(build_fn(validate = "Self::validate"))]
pub struct NewUser {
    #[builder(setter(into))]
    pub id: UserId,
    #[builder(setter(into))]
    pub name: String,
}

impl NewUser {
    pub fn builder() -> NewUserBuilder {
        NewUserBuilder::default()
    }
}

impl NewUserBuilder {
    // Execute some validation that ensures the initial `Entity` is legal
    fn validate(&self) -> Result<(), String> {
        if self.name.as_ref().expect("name wasn't set").len() > MAX_NAME_LENGTH {
            return Err("Name length exceeded".to_string());
        }
        Ok(())
    }
}

impl IntoEvents<UserEvent> for NewUser {
    // This `fn` returns the first `Events` of the `Entity`
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn user_creation() {
        let new_user = NewUser::builder().id("user-id").name("Steven").build();
        assert!(new_user.is_ok());
    }
}
}
```
Entity Type
The `Entity` type is a struct that holds the `events: EntityEvents<EntityEvent>` field.
Mutations of the entity append events to this collection.
The `Entity` is (re-)constructed from events via the `TryFromEvents` trait.
Other than the `events` field additional fields can be exposed and populated during hydration.
```rust
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

// Using derive_builder is optional but useful for hydrating
// in the `TryFromEvents` trait.
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct User {
    pub id: UserId,
    pub name: String,
    // The `events` container - mandatory field.
    // Basically it's a `Vec` wrapper with some ES specific augmentation.
    events: EntityEvents<UserEvent>,
    // Marker if you use a name other than `events`.
    // #[es_entity(events)]
    // different_name_for_events_field: EntityEvents<UserEvent>
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let new_name = new_name.into();
        // The idempotency_guard macro is a helper to return quickly
        // if a mutation has already been applied.
        // It is not mandatory but very useful in the context of distributed / multi-threaded
        // systems to protect against replays.
        idempotency_guard!(
            self.events.iter_all().rev(),
            // If this pattern matches return Idempotent::Ignored
            UserEvent::NameUpdated { name } if name == &new_name,
            // Stop searching here
            => UserEvent::NameUpdated { .. }
        );
        self.name = new_name.clone();
        self.events.push(UserEvent::NameUpdated { name: new_name });
        Idempotent::Executed(())
    }
}

// Any EsEntity must implement `TryFromEvents`.
// This trait is what hydrates entities after loading the events from the database.
impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        let mut builder = UserBuilder::default();
        for event in events.iter_all() {
            match event {
                UserEvent::Initialized { id, name } => {
                    builder = builder.id(*id).name(name.clone());
                }
                UserEvent::NameUpdated { name } => {
                    builder = builder.name(name.clone());
                }
            }
        }
        builder.events(events).build()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn fresh_user() -> User {
        let id = UserId::new();
        let initial_events = EntityEvents::init(
            id,
            [UserEvent::Initialized {
                id,
                name: "Willson".to_string(),
            }],
        );
        User::try_from_events(initial_events).expect("Could not create user")
    }

    #[test]
    fn update_name() {
        let mut user = fresh_user();
        // There are no new events to persist after hydrating
        assert!(!user.events.any_new());
        let new_name = "Gavin".to_string();
        user.update_name(new_name.clone()).unwrap();
        assert_eq!(user.name, new_name);
        // Mutations are expected to append events
        assert!(user.events.any_new());
    }
}
}
```
Idempotency
Idempotency means that performing the same operation multiple times has the same effect as doing it once.
It's used to ensure that retrying a request doesn't cause unintended side effects, such as duplicated `Event`s being persisted.
It is particularly useful in the context of a distributed system where operations could be triggered from an asynchronous event queue (ie. pub-sub).
Whenever you would like to have an exactly-once processing guarantee - you can achieve effectively-once processing by ensuring your mutations are all idempotent.
Making your `Entity` mutations idempotent is very simple when doing Event Sourcing as you can easily check if the event you are about to append already exists in the history.
Example
To see the issue in action - let's consider the `update_name` mutation without an idempotency check.
```rust
#![allow(unused)]
fn main() {
pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) {
        let name = new_name.into();
        self.events.push(UserEvent::NameUpdated { name });
    }
}
}
```
In the above code we could easily record redundant events by calling the `update_name` mutation multiple times with the same input.
```rust
pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) {
        let name = new_name.into();
        self.events.push(UserEvent::NameUpdated { name });
    }
}

fn main() {
    let mut user = User { events: vec![] };
    user.update_name("Harrison");
    // Causes a redundant event to be appended
    user.update_name("Harrison");
    assert_eq!(user.events.len(), 2);
}
```
To prevent this we can iterate through the events to check whether the update has already been applied:
```rust
pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) {
        let name = new_name.into();
        for event in self.events.iter().rev() {
            match event {
                UserEvent::NameUpdated { name: existing_name } if existing_name == &name => {
                    return;
                }
                _ => (),
            }
        }
        self.events.push(UserEvent::NameUpdated { name });
    }
}

fn main() {
    let mut user = User { events: vec![] };
    user.update_name("Harrison");
    // This update will be ignored
    user.update_name("Harrison");
    assert_eq!(user.events.len(), 1);
}
```
But now we just silently ignore the operation.
Better would be to signal back to the caller whether or not the operation was applied.
For that we use the `Idempotent` type:
```rust
extern crate es_entity;
use es_entity::Idempotent;

pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

// #[must_use]
// pub enum Idempotent<T> {
//     Executed(T),
//     Ignored,
// }

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let name = new_name.into();
        for event in self.events.iter().rev() {
            match event {
                UserEvent::NameUpdated { name: existing_name } if existing_name == &name => {
                    return Idempotent::Ignored;
                }
                _ => (),
            }
        }
        self.events.push(UserEvent::NameUpdated { name });
        Idempotent::Executed(())
    }
}

fn main() {
    let mut user = User { events: vec![] };
    assert!(user.update_name("Harrison").did_execute());
    assert!(user.update_name("Harrison").was_ignored());
}
```
To cut down on boilerplate this pattern of iterating the events to check if an event was already applied has been encoded into the `idempotency_guard!` macro:
```rust
extern crate es_entity;
use es_entity::{idempotency_guard, Idempotent};

pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let name = new_name.into();
        idempotency_guard!(
            // The iterator of events
            self.events.iter().rev(),
            // The pattern match to check whether an operation was already applied
            UserEvent::NameUpdated { name: existing_name } if existing_name == &name
        );
        self.events.push(UserEvent::NameUpdated { name });
        Idempotent::Executed(())
    }
}

fn main() {
    let mut user = User { events: vec![] };
    assert!(user.update_name("Harrison").did_execute());
    assert!(user.update_name("Harrison").was_ignored());
}
```
Finally there is the case where an operation was applied in the past - but it is still legal to re-apply it - like changing a name back to what it originally was:
```rust
extern crate es_entity;
use es_entity::{idempotency_guard, Idempotent};

pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
}

pub struct User {
    events: Vec<UserEvent>,
}

impl User {
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let name = new_name.into();
        idempotency_guard!(
            self.events.iter().rev(),
            UserEvent::NameUpdated { name: existing_name } if existing_name == &name,
            // The `=>` signifies the pattern where to stop the iteration.
            => UserEvent::NameUpdated { .. }
        );
        self.events.push(UserEvent::NameUpdated { name });
        Idempotent::Executed(())
    }
}

fn main() {
    let mut user = User { events: vec![] };
    assert!(user.update_name("Harrison").did_execute());
    assert!(user.update_name("Colin").did_execute());
    assert!(user.update_name("Harrison").did_execute());
}
```
Without the `=>` argument the second call of `assert!(user.update_name("Harrison").did_execute());` would fail.
Repository
In the context of `es-entity` a `Repository` is a `struct` that hosts all operations performed with the database.
Thus it is responsible for all CRUD style interactions with the persistence layer.
The `EsRepo` macro generates functions such as:

- `create`
- `update`
- `find_by_id`
- `list_by_id`
- etc.

that hide away the complexity of querying and hydrating entities whose state is represented in an Event Sourced way.
Under the hood the `es_query!` helper macro (which only works within `fn`s inside `EsRepo` structs) handles loading the events while enabling you to write a 'normal' looking SQL query against the `index` table.
The following sections will take a deep dive into how this design came to be.
Database Tables
In `es-entity` every `Entity` gets 2 tables - the `index` and the `events` table.
This section will explain the rationale behind that.
The Events Table
The events table always has the same fields:
```sql
CREATE TABLE user_events (
  -- The entity id
  id UUID NOT NULL REFERENCES users(id),
  -- The sequence number of the event
  -- Starting at 1 and incrementing by 1 each event
  sequence INT NOT NULL,
  -- The 'type' of the event (corresponding to the enum variant)
  event_type VARCHAR NOT NULL,
  -- The event data serialized as a JSON blob
  event JSONB NOT NULL,
  -- The 'event context'
  -- additional metadata that can be collected out of band
  -- only populated if 'event_context' attribute is set on the EsEvent
  context JSONB DEFAULT NULL,
  -- The time the event was recorded
  recorded_at TIMESTAMPTZ NOT NULL,
  -- Unique constraint to ensure there are no duplicate sequence numbers
  UNIQUE(id, sequence)
);
```
In fact we could persist all events to a global table with that schema but partitioning the events per `Entity` gives us some benefits when querying (like read performance and referential integrity).
Intuitively you might think this is all we need as we can very easily query all the events for a specific `Entity`:
```sql
SELECT * FROM user_events WHERE id = $1 ORDER BY sequence
```
This is correct if we know the `id` of the `Entity` we are looking up.
But it becomes a lot more tricky when we want to do a lookup on a non-id field.
Assuming the `Event` enum looks like this:
```rust
#![allow(unused)]
fn main() {
pub enum UserEvent {
    Initialized { id: u64, name: String },
    NameUpdated { name: String },
    EmailUpdated { email: String },
}
}
```
and we want to look up a user by email, the query quickly becomes a lot more complicated. Let's consider the naive query:
```sql
SELECT * FROM user_events WHERE event->>'email' = $1;
```
This doesn't work as it only gets a single event - but we want all events for that `Entity`.
```sql
SELECT *
FROM user_events
WHERE id = (
    SELECT id
    FROM user_events
    WHERE event->>'email' = $1
    LIMIT 1
)
ORDER BY sequence;
```
This also doesn't work because perhaps the event that was found wasn't the latest `EmailUpdated` event in the `User`'s history.
But we want to get the user whose email is currently `$1`.
So it could find some false positives.
When iterating with ChatGPT the next suggestion is:
```sql
WITH latest_email_updates AS (
    SELECT id, MAX(sequence) AS max_sequence
    FROM user_events
    WHERE event_type = 'email_updated'
    GROUP BY id
),
latest_emails AS (
    SELECT e.id, e.event->>'email' AS email
    FROM user_events e
    JOIN latest_email_updates leu
      ON e.id = leu.id AND e.sequence = leu.max_sequence
    WHERE e.event_type = 'email_updated'
),
target_user AS (
    SELECT id
    FROM latest_emails
    WHERE email = $1
)
SELECT *
FROM user_events
WHERE id = (SELECT id FROM target_user)
ORDER BY sequence;
```
This query might return what we want but it still has issues.
The worst one is that we are leaking a lot of domain knowledge into the query.
Specifically the presence and shape of the `EmailUpdated` event is encoded into the query.
Preferably the specifics of the `Event` schemas would only need to be known on the domain side, encoded in the `EntityEvent` enum.
Also the whole query is quite inefficient.
Sure we could add an index on the `event->>'email'` field but that would introduce more implicit coupling.
Also what if we wanted something like a `UNIQUE` constraint on the email - but still allow emails to be swapped multiple times.
The Index Table
Enter the `index` table.
The `index` table is a table that hosts 1 row per `Entity` with the columns populated by the latest values.
In that sense it looks very similar to a table that might hold the entire state of the `Entity` in a typical update-in-place persistence strategy.
The difference is that we only include columns that we want to index for fast lookup or some kind of constraint like `UNIQUE` or `REFERENCES`.
In that sense it is purely an optimization and does not represent the entire state of the `Entity` - for that you must load all the events.
```sql
CREATE TABLE users (
  id UUID PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL,
  email VARCHAR UNIQUE
);
```
Now the query simplifies to:
```sql
WITH target_entity AS (
    SELECT id
    FROM users
    WHERE email = $1
)
SELECT e.*
FROM user_events e
JOIN target_entity te ON e.id = te.id
ORDER BY e.sequence;
```
As a result the query is much simpler and we are no longer leaking any domain information. We just have to ensure the index table gets updated atomically as we append the events to the events table.
es_query
The `es_query!` macro is a helper that allows you to query only the `index` table without needing to join with the `events` table.
The expansion of `es_query!` results in a call to the `sqlx::query_as!` macro - which means that you still get type safety and compile time column validation.
Given the query we arrived at in the previous section - this is what a `find_by_name` `fn` could look like:
```rust
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main() {}
use serde::{Deserialize, Serialize};
use es_entity::*;
use sqlx::PgPool;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

pub struct Users {
    pool: PgPool,
}

impl Users {
    pub async fn find_by_name(&self, name: String) -> Result<User, EsRepoError> {
        let rows = sqlx::query_as!(
            GenericEvent::<UserId>,
            r#"
            WITH target_entity AS (
                SELECT id FROM users WHERE name = $1
            )
            SELECT
                e.id as entity_id,
                e.sequence,
                e.event,
                e.context as "context: ContextData",
                e.recorded_at
            FROM user_events e
            JOIN target_entity te ON e.id = te.id
            ORDER BY e.sequence;
            "#,
            name,
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(EntityEvents::load_first(rows)?)
    }
}
```
The `es_query!` macro removes the boilerplate of fetching the events and lets you just write the part that queries the `index` table:
```sql
SELECT id FROM users WHERE name = $1
```
On expansion it constructs the complete query (adding the `JOIN` with the `events` table) and hydrates the entities from the events.
This simplifies the above implementation into:
```rust
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main() {}
use serde::{Deserialize, Serialize};
use es_entity::*;
use sqlx::PgPool;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(entity = "User")]
pub struct Users {
    pool: PgPool,
}

impl Users {
    pub async fn find_by_name(&self, name: String) -> Result<User, EsRepoError> {
        es_query!(
            "SELECT id FROM users WHERE name = $1",
            name
        )
        .fetch_one(&self.pool)
        .await
    }
}
```
The `es_query!` macro only works within `fn`s defined on structs with `EsRepo` derived.
The functions are intended to mimic the `sqlx` interface but instead of returning rows they return fully hydrated entities:
```rust
async fn fetch_one(<executor>) -> Result<Entity, Repo::Err>
async fn fetch_optional(<executor>) -> Result<Option<Entity>, Repo::Err>
// The `(_, bool)` signifies whether or not the query could have fetched more or the list is exhausted:
async fn fetch_n(<executor>, n) -> Result<(Vec<Entity>, bool), Repo::Err>
```
EsRepo
Deriving the `EsRepo` macro on a struct will generate a bunch of CRUD `fn`s (and some additional supporting structs) to interact with the persistence layer.
For this to work the `Entity` you intend to persist / load must be set up as described in the Entity section (with an `Event`, `Id`, `NewEntity` and `Entity` type).
As a minimum you must specify the `entity` attribute and have a field that holds a `PgPool` type.
```rust
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main() {}
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    // Defaults that get derived if not explicitly configured:
    // id = "UserId",              // The type of the `id`
    // new = "NewUser",            // The type of the `NewEntity`
    // event = "UserEvent",        // The type of the `Event` enum
    // err = "EsRepoError",        // The Error type that should be returned from all fns.
    // tbl = "users",              // The name of the index table
    // events_tbl = "user_events", // The name of the events table
    // tbl_prefix = "",            // A table prefix that should be added to the derived table names
    // Columns specify a list of attributes that get mapped to the index table:
    // columns(
    //     // The id column is always mapped - no need to specify it
    //     id(ty = "UserId", list_by)
    // )
)]
pub struct Users {
    pool: sqlx::PgPool,
    // Marker if you use a name other than `pool`.
    // #[es_entity(pool)]
    // different_name_for_pool: sqlx::PgPool
}
```
There are a number of options that can be passed to `es_repo` to modify the behaviour or type of functions it generates.
The most important of which is the `columns` option that configures the mapping from entity attributes to index table columns.
```rust
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main() {}
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(
        // Declares that there is a `name` column on the `index` table.
        // The rust type for it is `String`.
        // Without further configuration `EsRepo` will assume both the `NewEntity`
        // and the `Entity` types have an accessible `.name` attribute
        // for populating and updating the index table.
        name = "String",
        // The above is equivalent to the more explicit notation:
        // name(ty = "String")
    )
)]
pub struct Users {
    pool: sqlx::PgPool,
}
```
Take a look at the next sections to see more information on how the options modify the generated code.
fn create
The create
fn takes a NewEntity
type and returns an Entity
type.
It INSERT
s a row into the index
table and persists all the events returned from the IntoEvents::into_events
fn
in the events
table.
We must use the columns option to specify which columns need inserting into the index table.
In the code below we want to include a name
column in the index
table that requires mapping.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    // The `name` attribute on the `NewEntity` must be accessible
    // for inserting into the `index` table.
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    // The name attribute on the `Entity` must be accessible
    // for updates of the `index` table.
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        Ok(User {
            id: events.id().clone(),
            name: "Fred".to_string(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
    // The `create` fn takes a `NewEntity` and returns a persisted and hydrated `Entity`
    let _user = users.create(new_user).await?;
    Ok(())
}
The insert part of the generated create function is roughly equivalent to:
impl Users {
pub async fn create(
&self,
new_entity: NewUser
) -> Result<User, es_entity::EsRepoError> {
let id = &new_entity.id;
// The attribute specified in the `columns` option
let name = &new_entity.name;
sqlx::query!("INSERT INTO users (id, name) VALUES ($1, $2)",
id as &UserId,
name as &String
)
.execute(self.pool())
.await?;
// persist events
// hydrate entity
// execute post_persist_hook
// return entity
}
}
The key thing to configure is how the columns of the index table get populated via the create
option.
The create(accessor = "<>")
option modifies how the field is accessed on the NewEntity
type.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    some_hidden_field: String,
}

impl NewUser {
    fn my_name(&self) -> String {
        self.some_hidden_field.clone()
    }
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(
        // Instead of using the `name` field on the `NewEntity` struct
        // the generated code will use: `new_entity.my_name()`
        // to populate the `name` column.
        name(ty = "String", create(accessor = "my_name()")),
    )
)]
pub struct Users {
    pool: sqlx::PgPool,
}
The create(persist = false)
option omits inserting the column during creation.
This is useful for dynamic values that don't become known until later in the entity's lifecycle.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

// There is no `name` attribute because we do not initially insert into this column.
pub struct NewUser {
    id: UserId,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(
        name(ty = "String", create(persist = false)),
    )
)]
pub struct Users {
    pool: sqlx::PgPool,
}
fn create_all
The create_all
function is a batch version of create
.
It takes a Vec<NewEntity>
and returns Vec<Entity>
.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        Ok(User {
            id: events.id().clone(),
            name: "Fred".to_string(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    let new_users = vec![
        NewUser { id: UserId::new(), name: "James".to_string() },
        NewUser { id: UserId::new(), name: "Roger".to_string() },
    ];
    let users = users.create_all(new_users).await?;
    Ok(())
}
fn update
The update
fn takes a mutable reference to an Entity
and persists any new events that have been added to it.
It will also UPDATE
the row in the index
table with the latest values derived from the entity's attributes.
It returns the number of events that were persisted.
In the code below we have a name
column in the index
table that needs to be kept in sync with the entity's state.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    // The name attribute on the `Entity` must be accessible
    // for updates of the `index` table.
    name: String,
    events: EntityEvents<UserEvent>,
}

impl User {
    pub fn change_name(&mut self, name: String) {
        self.events.push(UserEvent::NameUpdated { name: name.clone() });
        self.name = name;
    }
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        let mut name = String::new();
        for event in events.iter_all() {
            match event {
                UserEvent::Initialized { name: n, .. } => name = n.clone(),
                UserEvent::NameUpdated { name: n } => name = n.clone(),
            }
        }
        Ok(User {
            id: events.id().clone(),
            name,
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    // First create a user
    let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
    let mut user = users.create(new_user).await?;

    // Now update the user
    user.change_name("Frederick".to_string());

    // The `update` fn takes a mutable reference to an `Entity` and persists new events
    let n_events = users.update(&mut user).await?;
    assert_eq!(n_events, 1); // One NameUpdated event was persisted
    Ok(())
}
The update part of the generated update function is roughly equivalent to:
impl Users {
pub async fn update(
&self,
entity: &mut User
) -> Result<usize, es_entity::EsRepoError> {
// Check if there are any new events to persist
if !entity.events().any_new() {
return Ok(0);
}
let id = &entity.id;
// The attribute specified in the `columns` option
let name = &entity.name;
sqlx::query!("UPDATE users SET name = $2 WHERE id = $1",
id as &UserId,
name as &String
)
.execute(self.pool())
.await?;
// persist new events
// execute post_persist_hook
// return number of events persisted
}
}
The key thing to configure is how the columns of the index table get updated via the update
option.
The update(accessor = "<>")
option modifies how the field is accessed on the Entity
type.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl User {
    pub fn display_name(&self) -> String {
        format!("User: {}", self.name)
    }
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(
        // Instead of using the `name` field on the `Entity` struct
        // the generated code will use: `entity.display_name()`
        // to populate the `name` column during updates.
        name(ty = "String", update(accessor = "display_name()")),
    )
)]
pub struct Users {
    pool: sqlx::PgPool,
}
The update(persist = false)
option prevents updating the column.
This is useful for columns that should never change after creation.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

// Assume the name of a user is immutable.
pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    // Exposing the `name` attribute on the `Entity` is optional
    // as it does not need to be accessed during update.
    // name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(
        name(ty = "String", update(persist = false))
    )
)]
pub struct Users {
    pool: sqlx::PgPool,
}
Note that if no columns need updating (all columns have update(persist = false)
), the UPDATE
query is skipped entirely for better performance.
fn find_by
Every column
that gets configured on the EsRepo
will get the following fn
s:
fn find_by_<column> -> Result<Entity, EntityError>
fn maybe_find_by_<column> -> Result<Option<Entity>, EntityError>
It is assumed that your database schema has a relevant INDEX
on <column>
to make the lookup efficient.
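For the name column used throughout these examples, such a migration could look like the following sketch (the index name is illustrative; note that a UNIQUE constraint, as in the quickstart schema, already creates an index):

```sql
-- Hypothetical index to back the generated find_by_name / maybe_find_by_name fns.
-- Only needed when the column does not already have a UNIQUE constraint.
CREATE INDEX users_name_idx ON users (name);
```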
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        Ok(User {
            id: events.id().clone(),
            name: "Fred".to_string(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name = "String"))]
pub struct Users {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
    users.create(new_user).await?;

    let user = users.find_by_name("Fred").await?;
    assert_eq!(user.name, "Fred");

    let user = users.maybe_find_by_name("No Body").await?;
    assert!(user.is_none());
    Ok(())
}
fn list_by
To load a whole page of entities at once you can set the list_by
option on the column.
This will generate the list_by_<column>
fn
s and appropriate cursor
structs.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
    Deleted,
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        Ok(User {
            id: events.id().clone(),
            name: "Fred".to_string(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    // list_by will generate the UsersByNameCursor
    columns(name(ty = "String", list_by))
)]
pub struct Users {
    pool: sqlx::PgPool,
}

// Generated code:
// pub mod user_cursor {
//     pub struct UsersByNameCursor {
//         name: String,
//         // id is always added to disambiguate
//         // in case the `name` column is not unique
//         id: UserId,
//     }
//
//     // Cursors that always exist:
//     pub struct UsersByIdCursor {
//         id: UserId,
//     }
//
//     pub struct UsersByCreatedAtCursor {
//         created_at: chrono::DateTime<chrono::Utc>,
//         id: UserId,
//     }
// }

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    let new_user = NewUser { id: UserId::new(), name: "Fred".to_string() };
    users.create(new_user).await?;

    let PaginatedQueryRet {
        entities,
        has_next_page: _,
        end_cursor: _,
    } = users
        .list_by_id(
            PaginatedQueryArgs {
                first: 5,
                // after: None represents the beginning of the list
                after: Some(user_cursor::UsersByIdCursor {
                    id: uuid::Uuid::nil().into(),
                }),
            },
            ListDirection::Ascending,
        )
        .await?;
    assert!(!entities.is_empty());

    // To collect all entities in a loop you can use `into_next_query()`.
    // This is not recommended - just to highlight the API.
    let mut query = Default::default();
    let mut all_users = Vec::new();
    loop {
        let mut res = users.list_by_name(query, Default::default()).await?;
        all_users.extend(res.entities.drain(..));
        if let Some(next_query) = res.into_next_query() {
            query = next_query;
        } else {
            break;
        }
    }
    assert!(!all_users.is_empty());
    Ok(())
}
fn list_for
Similar to list_by
the list_for
option lets you query pages of entities.
The difference is that list_for
accepts an additional filter argument.
This is useful for situations where you have a 1-to-n
relationship between 2 entities and you want to find all entities on the n
side that share the same foreign key.
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

es_entity::entity_id! { UserDocumentId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserDocumentId")]
pub enum UserDocumentEvent {
    Initialized { id: UserDocumentId, owner_id: UserId },
}

pub struct NewUserDocument {
    id: UserDocumentId,
    owner_id: UserId,
}

impl IntoEvents<UserDocumentEvent> for NewUserDocument {
    fn into_events(self) -> EntityEvents<UserDocumentEvent> {
        EntityEvents::init(
            self.id,
            [UserDocumentEvent::Initialized {
                id: self.id,
                owner_id: self.owner_id,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct UserDocument {
    pub id: UserDocumentId,
    owner_id: UserId,
    events: EntityEvents<UserDocumentEvent>,
}

impl TryFromEvents<UserDocumentEvent> for UserDocument {
    fn try_from_events(events: EntityEvents<UserDocumentEvent>) -> Result<Self, EsEntityError> {
        Ok(UserDocument {
            id: events.id().clone(),
            owner_id: UserId::new(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "UserDocument",
    columns(
        // The column name in the schema
        user_id(
            ty = "UserId",
            // generate the `list_for` fn
            list_for,
            // The accessor on the `NewUserDocument` type
            create(accessor = "owner_id"),
            // It's immutable - so no need to ever update it
            update(persist = false)
        )
    )
)]
pub struct UserDocuments {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let docs = UserDocuments { pool: init_pool().await? };

    // Assume we have an existing user that we can get the id from
    let owner_id = UserId::new();

    // Batch creating a few entities for illustration
    let new_docs = vec![
        NewUserDocument { id: UserDocumentId::new(), owner_id },
        NewUserDocument { id: UserDocumentId::new(), owner_id },
    ];
    docs.create_all(new_docs).await?;

    // The fns have the form `list_for_<filter>_by_<cursor>`
    let docs = docs
        .list_for_user_id_by_created_at(owner_id, Default::default(), Default::default())
        .await?;
    assert_eq!(docs.entities.len(), 2);
    Ok(())
}
fn list_for_filter
The list_for_filter
function provides a unified interface for querying entities with optional filtering and flexible sorting.
Unlike list_for
which generates separate functions for each filter/sort combination, list_for_filter
uses a single function that accepts:
- A filter enum (e.g., UsersFilter::WithName or UsersFilter::NoFilter)
- A sort specification with direction
- Pagination arguments
This approach is more flexible when you need to dynamically choose filters and sort orders at runtime, such as in GraphQL resolvers or REST API endpoints where users can specify different combinations of filters and sorting.
How It Works Internally
The list_for_filter
function uses pattern matching to delegate to the appropriate underlying function based on the filter and sort combination:
pub async fn list_for_filter(
&self,
filter: UserDocumentsFilter,
sort: es_entity::Sort<UserDocumentsSortBy>,
cursor: es_entity::PaginatedQueryArgs<user_document_cursor::UserDocumentsCursor>,
) -> Result<es_entity::PaginatedQueryRet<UserDocument, user_document_cursor::UserDocumentsCursor>, EsRepoError> {
let es_entity::Sort { by, direction } = sort;
let es_entity::PaginatedQueryArgs { first, after } = cursor;
let res = match (filter, by) {
// Filter by user_id, sort by ID
(UserDocumentsFilter::WithUserId(filter_value), UserDocumentsSortBy::Id) => {
let after = after.map(user_document_cursor::UserDocumentsByIdCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_for_user_id_by_id(filter_value, query, direction).await?
}
// Filter by user_id, sort by created_at
(UserDocumentsFilter::WithUserId(filter_value), UserDocumentsSortBy::CreatedAt) => {
let after = after.map(user_document_cursor::UserDocumentsByCreatedAtCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_for_user_id_by_created_at(filter_value, query, direction).await?
}
// No filter, sort by ID
(UserDocumentsFilter::NoFilter, UserDocumentsSortBy::Id) => {
let after = after.map(user_document_cursor::UserDocumentsByIdCursor::try_from).transpose()?;
let query = es_entity::PaginatedQueryArgs { first, after };
self.list_by_id(query, direction).await?
}
// ... more combinations
};
Ok(res)
}
This pattern matching approach ensures type safety while providing a unified interface for all filter/sort combinations.
Important Notes
Cursor and Sort Alignment: The cursor type in PaginatedQueryArgs
must match the sort field specified in the Sort
parameter. If they don't align, you'll get a CursorDestructureError
at runtime. For example, if you're sorting by CreatedAt
but your cursor is of type UsersByIdCursor
, the conversion will fail.
Column Options: The available filter and sort combinations are determined by your column configuration:
- Filters: generated for columns with the list_for option enabled
- Sort By: generated for columns with the list_by option enabled (ID and created_at are included by default)
Only columns configured with these options will appear in the respective filter and sort enums.
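As an illustrative sketch of how these pieces fit together (the real enums are generated by EsRepo and use your actual id types; the stand-in UserId struct and the describe helper below are hypothetical, added only to make the dispatch pattern concrete):

```rust
// Illustrative sketch only - not es-entity's actual generated code.
// `UserId` is a stand-in for the real entity id type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct UserId(u32);

#[derive(Debug)]
pub enum UserDocumentsFilter {
    // One variant per column configured with `list_for`
    WithUserId(UserId),
    // Always available: no filtering at all
    NoFilter,
}

#[derive(Debug)]
pub enum UserDocumentsSortBy {
    // `Id` and `CreatedAt` exist by default
    Id,
    CreatedAt,
}

// `list_for_filter` pattern-matches on (filter, sort_by) pairs and
// delegates to the corresponding generated fn; here we just name it.
pub fn describe(filter: &UserDocumentsFilter, by: &UserDocumentsSortBy) -> String {
    match (filter, by) {
        (UserDocumentsFilter::WithUserId(_), UserDocumentsSortBy::Id) => {
            "list_for_user_id_by_id".to_string()
        }
        (UserDocumentsFilter::WithUserId(_), UserDocumentsSortBy::CreatedAt) => {
            "list_for_user_id_by_created_at".to_string()
        }
        (UserDocumentsFilter::NoFilter, UserDocumentsSortBy::Id) => {
            "list_by_id".to_string()
        }
        (UserDocumentsFilter::NoFilter, UserDocumentsSortBy::CreatedAt) => {
            "list_by_created_at".to_string()
        }
    }
}

fn main() {
    // Every (filter, sort) combination maps to exactly one underlying fn.
    assert_eq!(
        describe(&UserDocumentsFilter::NoFilter, &UserDocumentsSortBy::Id),
        "list_by_id"
    );
}
```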
Example
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    NameUpdated { name: String },
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        unimplemented!()
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    events: EntityEvents<UserEvent>,
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        unimplemented!()
    }
}

es_entity::entity_id! { UserDocumentId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserDocumentId")]
pub enum UserDocumentEvent {
    Initialized { id: UserDocumentId, owner_id: UserId },
}

pub struct NewUserDocument {
    id: UserDocumentId,
    owner_id: UserId,
}

impl IntoEvents<UserDocumentEvent> for NewUserDocument {
    fn into_events(self) -> EntityEvents<UserDocumentEvent> {
        EntityEvents::init(
            self.id,
            [UserDocumentEvent::Initialized {
                id: self.id,
                owner_id: self.owner_id,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct UserDocument {
    pub id: UserDocumentId,
    owner_id: UserId,
    events: EntityEvents<UserDocumentEvent>,
}

impl TryFromEvents<UserDocumentEvent> for UserDocument {
    fn try_from_events(events: EntityEvents<UserDocumentEvent>) -> Result<Self, EsEntityError> {
        Ok(UserDocument {
            id: events.id().clone(),
            owner_id: UserId::new(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "UserDocument",
    columns(
        user_id(
            ty = "UserId",
            list_for,
            create(accessor = "owner_id"),
            update(persist = false)
        )
    )
)]
pub struct UserDocuments {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let docs = UserDocuments { pool: init_pool().await? };

    // Assume we have an existing user that we can get the id from
    let owner_id = UserId::new();

    // Batch creating a few entities for illustration
    let new_docs = vec![
        NewUserDocument { id: UserDocumentId::new(), owner_id },
        NewUserDocument { id: UserDocumentId::new(), owner_id },
    ];
    docs.create_all(new_docs).await?;

    // Filter by user_id, sorted by created_at ascending
    let filtered_docs = docs
        .list_for_filter(
            UserDocumentsFilter::WithUserId(owner_id),
            Sort {
                by: UserDocumentsSortBy::CreatedAt,
                direction: ListDirection::Ascending,
            },
            PaginatedQueryArgs { first: 10, after: None },
        )
        .await?;
    assert_eq!(filtered_docs.entities.len(), 2);

    // No filter, sorted by ID descending
    let all_docs = docs
        .list_for_filter(
            UserDocumentsFilter::NoFilter,
            Sort {
                by: UserDocumentsSortBy::Id,
                direction: ListDirection::Descending,
            },
            PaginatedQueryArgs { first: 10, after: None },
        )
        .await?;
    assert!(all_docs.entities.len() >= 2);
    Ok(())
}
fn delete
If you are using Event Sourcing we assume you believe in immutability and keeping a long term audit history.
Deleting data from the Database goes against these principles.
Therefore es-entity
does not provide a way to actually delete data.
It is however possible to configure a soft delete option by marking delete = soft
on the EsRepo
.
This will omit entities that have been flagged as deleted from all queries as well as generate additional queries that can include the deleted entities:
fn find_by_<column>_include_deleted
fn maybe_find_by_<column>_include_deleted
fn list_by_<column>_include_deleted
fn list_for_<column>_by_<cursor>_include_deleted
As a prerequisite the index
table must include a deleted
column:
CREATE TABLE users (
id UUID PRIMARY KEY,
name VARCHAR NOT NULL,
-- deleted will be set to 'TRUE' when `delete` is called.
deleted BOOL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL
);
use serde::{Deserialize, Serialize};
use es_entity::*;

es_entity::entity_id! { UserId }

#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
    Initialized { id: UserId, name: String },
    Deleted,
}

pub struct NewUser {
    id: UserId,
    name: String,
}

impl IntoEvents<UserEvent> for NewUser {
    fn into_events(self) -> EntityEvents<UserEvent> {
        EntityEvents::init(
            self.id,
            [UserEvent::Initialized {
                id: self.id,
                name: self.name,
            }],
        )
    }
}

#[derive(EsEntity)]
pub struct User {
    pub id: UserId,
    name: String,
    events: EntityEvents<UserEvent>,
}

impl User {
    fn delete(&mut self) -> Idempotent<()> {
        idempotency_guard!(
            self.events.iter_all(),
            UserEvent::Deleted
        );
        self.events.push(UserEvent::Deleted);
        Idempotent::Executed(())
    }
}

impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        Ok(User {
            id: events.id().clone(),
            name: "Delyth".to_string(),
            events,
        })
    }
}

#[derive(EsRepo)]
#[es_repo(
    entity = "User",
    columns(name = "String"),
    delete = "soft"
)]
pub struct Users {
    pool: sqlx::PgPool,
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let users = Users { pool: init_pool().await? };

    let new_user = NewUser { id: UserId::new(), name: "Delyth".to_string() };
    let mut user = users.create(new_user).await?;

    let found_user = users.maybe_find_by_name("Delyth").await?;
    assert!(found_user.is_some());

    if user.delete().did_execute() {
        users.delete(user).await?;
    }

    let found_user = users.maybe_find_by_name("Delyth").await?;
    assert!(found_user.is_none());

    let found_user = users.maybe_find_by_name_include_deleted("Delyth").await?;
    assert!(found_user.is_some());

    // Actually removing the soft-deleted rows requires dropping down to SQL
    sqlx::query!(r#"
        WITH deleted_users AS (
            DELETE FROM user_events
            WHERE id IN (SELECT id FROM users WHERE deleted = true)
            RETURNING id
        )
        DELETE FROM users WHERE deleted = true"#
    ).execute(users.pool()).await?;
    Ok(())
}
Transactions
One big advantage of using an ACID compliant database (such as Postgres) is transactional guarantees.
That is, you can execute multiple writes atomically, or multiple successive queries can view a consistent snapshot of your data.
The sqlx
struct that manages this is the Transaction
that is typically acquired from a pool.
Method Variants
All CRUD fns that es-entity generates come in 2 variants:
async fn create(new_entity: NewEntity)
async fn create_in_op(<connection>, new_entity: NewEntity)
async fn update(entity: &mut Entity)
async fn update_in_op(<connection>, entity: &mut Entity)
async fn find_by_id(id: EntityId)
async fn find_by_id_in_op(<connection>, id: EntityId)
etc
In all cases the _in_op
variant accepts a first argument that represents the connection to the database.
The non-_in_op
variant simply wraps the _in_op
call by passing an appropriate connection argument internally.
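The delegation can be pictured with a toy sketch (this is not es-entity's actual code; the Pool, User and Users types below are simplified stand-ins, and the real fns are async and fallible):

```rust
// Toy sketch of the wrapper pattern: the plain variant supplies the
// repo's own pool and delegates to the `_in_op` variant.
struct Pool;

struct User {
    id: u32,
}

struct Users {
    pool: Pool,
}

impl Users {
    // The `_in_op` variant takes the connection-like argument explicitly...
    fn find_by_id_in_op(&self, _op: &Pool, id: u32) -> User {
        User { id }
    }

    // ...while the plain variant wraps it, passing the internal pool.
    fn find_by_id(&self, id: u32) -> User {
        self.find_by_id_in_op(&self.pool, id)
    }
}

fn main() {
    let users = Users { pool: Pool };
    // Both variants behave identically; only the connection source differs.
    assert_eq!(users.find_by_id(7).id, 7);
}
```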
Connection Types and Traits
The type of the <connection> argument is generic, requiring either the AtomicOperation or IntoOneTimeExecutor trait to be implemented on the type.
There is a blanket implementation that makes every AtomicOperation
implement IntoOneTimeExecutor
- but the reverse is not the case.
async fn find_by_id_in_op<'a, OP>(op: OP, id: EntityId)
where
OP: IntoOneTimeExecutor<'a>;
async fn create_in_op<OP>(op: &mut OP, new_entity: NewEntity)
where
OP: AtomicOperation;
Both traits wrap access to an sqlx::Executor
implementation that ultimately executes the query.
The difference is that the IntoOneTimeExecutor trait ensures, in a type-safe way, that only 1 database operation can occur by consuming the inner reference.
Example Usage
async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

async fn count_users(op: impl es_entity::IntoOneTimeExecutor<'_>) -> anyhow::Result<i64> {
    let row = op
        .into_executor()
        .fetch_optional(sqlx::query!("SELECT COUNT(*) FROM users"))
        .await?;
    Ok(row.and_then(|r| r.count).unwrap_or(0))
}

// Ridiculous example of 2 operations that we want to execute atomically
async fn delete_and_count(
    op: &mut impl es_entity::AtomicOperation,
    id: uuid::Uuid,
) -> anyhow::Result<i64> {
    sqlx::query!("DELETE FROM users WHERE id = $1", id)
        .execute(op.as_executor())
        .await?;
    let row = sqlx::query!("SELECT COUNT(*) FROM users")
        .fetch_optional(op.as_executor())
        .await?;
    Ok(row.and_then(|r| r.count).unwrap_or(0))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = init_pool().await?;

    // &sqlx::PgPool implements IntoOneTimeExecutor
    let _ = count_users(&pool).await?;

    // It can only execute 1 roundtrip consistently as it will
    // check out a new connection from the pool for each operation.
    // Hence we cannot pass it to `fn`'s that need AtomicOperation
    // as we cannot guarantee that they will be consistent.
    // let _ = delete_and_count(&pool).await?; // <- won't compile

    // &mut sqlx::PgTransaction implements AtomicOperation
    // so we can use it for both `fns`
    let mut tx = pool.begin().await?;
    let _ = count_users(&mut tx).await?;
    let some_existing_user_id = uuid::Uuid::now_v7();
    let _ = delete_and_count(&mut tx, some_existing_user_id).await?;

    // Don't forget to commit the operation or the change won't become visible
    tx.commit().await?;
    Ok(())
}
Operation Requirements
In es-entity, mutating fns generally require 2 roundtrips: one to update the index table and one to append to the events table.
Hence create_in_op, update_in_op and delete_in_op all take &mut impl AtomicOperation as their first argument.
Most queries, on the other hand, are executed with 1 roundtrip (to fetch the events) and thus accept impl IntoOneTimeExecutor<'_> as their first argument.
Exceptions to this are nested entities, which will be explained in a later section.
Aggregates
In Domain Driven Design an aggregate represents a collection of entities that must be updated atomically in order to keep the business rules intact.
One entity is designated the aggregate root, which is responsible for enforcing that the relationships between the component entities hold.
Keeping Aggregates Small
In practice it is preferable to keep your aggregates as small as possible.
In most cases the aggregate should only contain a single entity - in which case the aggregate is indistinguishable from the single entity it contains.
Keeping aggregates small - by applying careful design and consideration of the domain invariants and their boundaries - promotes decoupling and reduces overall complexity.
It's easier to reason about and test the behaviour of "a single thing" vs "a bunch of things" in aggregate.
When to Use Aggregates
A common misconception is that once you have identified a parent-child relationship you should naturally represent them as an aggregate. This is however not the case - not every entity relationship that intuitively presents as a parent-child hierarchy needs to be modelled as an aggregate. Only when there is a business rule that inherently spans the relationship does it become mandatory.
Example: Subscription and Billing Periods
An example of this could be a Subscription that has successive BillingPeriods.
Say a use case specifies that the system should be able to add a line item to the current BillingPeriod.
The emphasis here is on the word current - the domain invariant is that there may only be a single current BillingPeriod per Subscription.
But how do we enforce that?
To keep the system consistent we need a "thing" that tracks all BillingPeriods and enforces the uniqueness of the current state across them.
A BillingPeriod entity is not aware of the other ones and can therefore not enforce that the current status is indeed unique.
Implementation Approaches
1. Simple Foreign Key Relationship
Let's first consider an approach that treats the entities as separate, with a simple foreign key relationship:
let subscription = subs.find_by_id(id).await?;
let billing_period_id = subscription.current_billing_period_id();
let mut billing_period = periods.find_by_id(billing_period_id).await?;
billing_period.add_line_item(amount);
periods.update(&mut billing_period).await?;
The risk here is that the period which is the current one may change between the line that queries the subscription and the line that updates the period.
It is of course possible to prevent this with a careful implementation - but that puts the burden on the developer and may be brittle.
In general the foreign key approach may lead to inconsistent states in edge cases.
Depending on the specifics of the domain, and the processes that would need to be invoked if this edge case is hit, that may or may not be acceptable.
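To make the failure mode concrete, here is a schematic in-memory simulation of the race (the types here are hypothetical stand-ins for illustration, not part of es-entity): the current period rolls over between the read and the write, so the line item lands on a period that is no longer current.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the two tables.
struct Subscription { current_billing_period_id: u32 }
struct BillingPeriod { line_items: Vec<f64> }

fn simulate_race() -> (u32, usize, usize) {
    let mut periods: HashMap<u32, BillingPeriod> = HashMap::new();
    periods.insert(1, BillingPeriod { line_items: vec![] });
    periods.insert(2, BillingPeriod { line_items: vec![] });
    let mut subscription = Subscription { current_billing_period_id: 1 };

    // Request A reads which period is current...
    let stale_id = subscription.current_billing_period_id;

    // ...but before it writes, a concurrent request rolls the period over.
    subscription.current_billing_period_id = 2;

    // Request A now appends its line item to a period that is no longer current.
    periods.get_mut(&stale_id).unwrap().line_items.push(100.0);

    (
        subscription.current_billing_period_id,
        periods[&1].line_items.len(),
        periods[&2].line_items.len(),
    )
}

fn main() {
    let (current, old_items, new_items) = simulate_race();
    assert_eq!(current, 2);   // period 2 is now current...
    assert_eq!(old_items, 1); // ...but the line item landed on period 1
    assert_eq!(new_items, 0);
}
```

Nothing in the foreign-key version prevents this interleaving; only the developer's discipline does.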
2. Transactional Approach
One way of removing this edge case is to use the transactional guarantees of the database to enforce consistency across the 2 entities.
To achieve this you would probably have to use the SERIALIZABLE isolation level - which adds significant overhead to the database.
let mut tx = pool.begin().await?;
sqlx::query!("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(&mut *tx).await?;
let subscription = subs.find_by_id_in_op(&mut tx, id).await?;
let billing_period_id = subscription.current_billing_period_id();
let mut billing_period = billing_periods.find_by_id_in_op(&mut tx, billing_period_id).await?;
billing_period.add_line_item(amount);
billing_periods.update_in_op(&mut tx, &mut billing_period).await?;
tx.commit().await?;
By using a database transaction we have essentially created a logical aggregate, where we are using the features of the database to enforce consistency - thus alleviating some of the cognitive overhead and risk.
We still have to remember, however, that this logical aggregate exists (even if it is not visible in the code) and ensure that we always set the correct transaction boundaries and isolation level when accessing BillingPeriods.
3. Nested Aggregate Approach
To make the relationship more obvious, and to make it impossible to introduce edge cases, we could also nest the BillingPeriod inside the Subscription.
This way we use the modeling and relationships of our structs to reflect the aggregate relationship more clearly.
All access to the BillingPeriod would be moderated by the Subscription root, and updates would be proxied via the root entity to guarantee we are updating the correct one.
let mut subscription = subs.find_by_id(id).await?;
subscription.add_line_item_to_current_billing_period(amount);
subs.update(&mut subscription).await?;
This essentially removes all edge cases and guarantees atomicity on the type level - which is good. But it introduces some complexity on handling the nesting itself.
4. Domain Restructuring
Finally we could simply restructure the entities so that no 'higher-level' enforcement is necessary.
An example might be to represent CurrentBillingPeriod and ClosedBillingPeriod as separate entities entirely.
In the real world this approach would probably be better than any of the examples above.
After all, if a "thing" has fundamentally different domain rules when it's in one state vs another state - why not simply represent the two states as two separate entities? Especially if the restructuring allows you to reduce the size of your aggregates.
That would make the implementation look something like:
let mut billing_period = current_billing_periods.find_by_subscription_id(subscription_id).await?;
billing_period.add_line_item(amount);
current_billing_periods.update(&mut billing_period).await?;
The current_billing_periods repository can never return a non-current period, thereby sidestepping the coordination issue entirely.
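The same idea can also be expressed purely in the type system. The sketch below is our own illustration (these types are not part of es-entity): close() consumes the current period and returns a closed one, so adding a line item to a closed period simply does not compile.

```rust
// Hypothetical split of BillingPeriod into two state-specific types.
pub struct CurrentBillingPeriod { pub line_items: Vec<f64> }
pub struct ClosedBillingPeriod { pub line_items: Vec<f64> }

impl CurrentBillingPeriod {
    pub fn add_line_item(&mut self, amount: f64) {
        self.line_items.push(amount);
    }

    // Consumes the current period; afterwards no `add_line_item` exists.
    pub fn close(self) -> ClosedBillingPeriod {
        ClosedBillingPeriod { line_items: self.line_items }
    }
}

fn main() {
    let mut period = CurrentBillingPeriod { line_items: vec![] };
    period.add_line_item(100.0);
    let closed = period.close();
    // closed.add_line_item(25.0); // <- won't compile: no such method
    assert_eq!(closed.line_items, vec![100.0]);
}
```

Each state carries only the operations that are legal in that state, so the "only the current period accepts line items" rule needs no runtime coordination at all.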
Summary
Of the 4 discussed approaches, 3 (simple foreign key, transactional, restructuring) can be handled with the features of es-entity that were previously discussed.
The nested approach requires special support from your repository to correctly persist all the nested entities and hydrate them when loading the root.
If, taking all options into consideration, you decide that the correct approach to solving your domain constraint is nesting, the next section describes how to represent that using es-entity.
Nesting
Building on the aggregate example from the previous chapter, let's implement the nested approach for our Subscription and BillingPeriod entities.
As discussed, this approach makes the aggregate relationship explicit in the type system and ensures all access to nested entities is moderated through the aggregate root.
Setting up the Database Tables
First, we need to create the tables for both the parent (Subscription) and nested (BillingPeriod) entities:
-- The parent entity table
CREATE TABLE subscriptions (
id UUID PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE subscription_events (
id UUID NOT NULL REFERENCES subscriptions(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
-- The nested entity table
CREATE TABLE billing_periods (
id UUID PRIMARY KEY,
subscription_id UUID NOT NULL REFERENCES subscriptions(id),
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE billing_period_events (
id UUID NOT NULL REFERENCES billing_periods(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
Note how the nested index table (billing_periods) includes a foreign key to the parent.
Defining the Nested Entity
Let's start by implementing the BillingPeriod entity that will be nested inside Subscription.
There are no special requirements on the child entity and it can be set up just like always:
#![allow(unused)]
fn main() {
extern crate es_entity;
extern crate sqlx;
extern crate serde;
extern crate derive_builder;
extern crate tokio;
extern crate anyhow;

use derive_builder::Builder;
use es_entity::*;
use serde::{Deserialize, Serialize};

es_entity::entity_id! { SubscriptionId, BillingPeriodId }

#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "BillingPeriodId")]
pub enum BillingPeriodEvent {
    Initialized {
        id: BillingPeriodId,
        subscription_id: SubscriptionId,
    },
    LineItemAdded {
        amount: f64,
        description: String,
    },
    Closed,
}

#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct BillingPeriod {
    pub id: BillingPeriodId,
    pub subscription_id: SubscriptionId,
    pub is_current: bool,
    pub line_items: Vec<LineItem>,
    events: EntityEvents<BillingPeriodEvent>,
}

#[derive(Debug, Clone)]
pub struct LineItem {
    pub amount: f64,
    pub description: String,
}

impl BillingPeriod {
    pub fn add_line_item(&mut self, amount: f64, description: String) -> Idempotent<usize> {
        idempotency_guard!(
            self.events.iter_all().rev(),
            BillingPeriodEvent::LineItemAdded { amount: a, description: d, .. }
                if a == &amount && d == &description
        );
        self.line_items.push(LineItem {
            amount,
            description: description.clone(),
        });
        self.events.push(BillingPeriodEvent::LineItemAdded {
            amount,
            description,
        });
        Idempotent::Executed(self.line_items.len())
    }

    pub fn close(&mut self) -> Idempotent<()> {
        idempotency_guard!(
            self.events.iter_all().rev(),
            BillingPeriodEvent::Closed
        );
        self.is_current = false;
        self.events.push(BillingPeriodEvent::Closed);
        Idempotent::Executed(())
    }
}

impl TryFromEvents<BillingPeriodEvent> for BillingPeriod {
    fn try_from_events(events: EntityEvents<BillingPeriodEvent>) -> Result<Self, EsEntityError> {
        let mut builder = BillingPeriodBuilder::default().is_current(true);
        let mut line_items = Vec::new();
        for event in events.iter_all() {
            match event {
                BillingPeriodEvent::Initialized { id, subscription_id } => {
                    builder = builder.id(*id).subscription_id(*subscription_id);
                }
                BillingPeriodEvent::LineItemAdded { amount, description } => {
                    line_items.push(LineItem {
                        amount: *amount,
                        description: description.clone(),
                    });
                }
                BillingPeriodEvent::Closed => {
                    builder = builder.is_current(false);
                }
            }
        }
        builder
            .line_items(line_items)
            .events(events)
            .build()
    }
}

#[derive(Debug, Clone, Builder)]
pub struct NewBillingPeriod {
    pub id: BillingPeriodId,
    pub subscription_id: SubscriptionId,
}

impl IntoEvents<BillingPeriodEvent> for NewBillingPeriod {
    fn into_events(self) -> EntityEvents<BillingPeriodEvent> {
        EntityEvents::init(
            self.id,
            vec![BillingPeriodEvent::Initialized {
                id: self.id,
                subscription_id: self.subscription_id,
            }],
        )
    }
}
}
Defining the Parent Entity with Nested Children
Now let's implement the Subscription entity that will contain the nested BillingPeriod entities:
#![allow(unused)]
fn main() {
// (imports, entity ids and the BillingPeriodEvent / BillingPeriod / LineItem /
// NewBillingPeriod definitions are identical to the previous listing and
// omitted here)

#[derive(EsEvent, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "SubscriptionId")]
pub enum SubscriptionEvent {
    Initialized { id: SubscriptionId },
    BillingPeriodStarted { period_id: BillingPeriodId },
}

#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct Subscription {
    pub id: SubscriptionId,
    current_period_id: Option<BillingPeriodId>,
    events: EntityEvents<SubscriptionEvent>,
    // The `#[es_entity(nested)]` attribute marks this field as containing nested entities.
    // It must be of type `Nested<T>`.
    // The #[builder(default)] will initialize it as empty.
    // The Repository will load the children after the parent has been hydrated.
    #[es_entity(nested)]
    #[builder(default)]
    billing_periods: Nested<BillingPeriod>,
}

impl Subscription {
    pub fn start_new_billing_period(&mut self) -> Idempotent<BillingPeriodId> {
        // Close the current billing period if there is one
        if let Some(current_id) = self.current_period_id {
            if let Some(current_period) = self.billing_periods.get_persisted_mut(&current_id) {
                current_period.close();
            }
        }
        // Create the new billing period
        let new_period = NewBillingPeriod {
            id: BillingPeriodId::new(),
            subscription_id: self.id,
        };
        let id = new_period.id;
        self.billing_periods.add_new(new_period);
        // Update the current period tracking
        self.current_period_id = Some(id);
        self.events.push(SubscriptionEvent::BillingPeriodStarted { period_id: id });
        Idempotent::Executed(id)
    }

    pub fn add_line_item_to_current_billing_period(
        &mut self,
        amount: f64,
        description: String,
    ) -> Idempotent<usize> {
        // Use the tracked current period ID to access the billing period directly
        if let Some(current_id) = self.current_period_id {
            if let Some(current_period) = self.billing_periods.get_persisted_mut(&current_id) {
                return current_period.add_line_item(amount, description);
            }
        }
        Idempotent::Ignored
    }
}

impl TryFromEvents<SubscriptionEvent> for Subscription {
    fn try_from_events(events: EntityEvents<SubscriptionEvent>) -> Result<Self, EsEntityError> {
        let mut builder = SubscriptionBuilder::default();
        for event in events.iter_all() {
            match event {
                SubscriptionEvent::Initialized { id } => {
                    builder = builder.id(*id);
                }
                SubscriptionEvent::BillingPeriodStarted { period_id } => {
                    builder = builder.current_period_id(Some(*period_id));
                }
            }
        }
        builder.events(events).build()
    }
}

#[derive(Debug, Clone, Builder)]
pub struct NewSubscription {
    pub id: SubscriptionId,
}

impl IntoEvents<SubscriptionEvent> for NewSubscription {
    fn into_events(self) -> EntityEvents<SubscriptionEvent> {
        EntityEvents::init(
            self.id,
            vec![SubscriptionEvent::Initialized { id: self.id }],
        )
    }
}
}
The key points to note:
- The billing_periods field is marked with #[es_entity(nested)]
- The field type is Nested<BillingPeriod>, which is a special container for nested entities
- We use add_new() to add new nested entities
- We mutate the children via get_persisted_mut()
Under the hood the EsEntity macro will create an implementation of the Parent trait:
pub trait Parent<T: EsEntity> {
fn new_children_mut(&mut self) -> &mut Vec<<T as EsEntity>::New>;
fn iter_persisted_children_mut<'a>(&'a mut self) -> impl Iterator<Item = &'a mut T>
where
T: 'a;
fn inject_children(&mut self, entities: impl IntoIterator<Item = T>);
}
for every field marked #[es_entity(nested)].
Setting up the Repositories
The repository setup is where the magic happens for nested entities.
We need to configure both the parent and child repositories with special attributes.
It is recommended to put both repositories in the same file but only mark the parent one as pub.
This leverages the Rust module system to enforce that the children cannot be accessed directly.
// (imports plus the BillingPeriod and Subscription definitions are identical
// to the previous listings and omitted here)

#[derive(EsRepo)]
#[es_repo(
    entity = "BillingPeriod",
    columns(
        // The foreign key of the parent marked by `parent`.
        subscription_id(ty = "SubscriptionId", update(persist = false), parent)
    )
)]
// private struct
struct BillingPeriods {
    pool: sqlx::PgPool,
}

#[derive(EsRepo)]
#[es_repo(entity = "Subscription")]
pub struct Subscriptions {
    pool: sqlx::PgPool,
    // Mark this field as containing the nested repository
    #[es_repo(nested)]
    billing_periods: BillingPeriods,
}

impl Subscriptions {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool: pool.clone(),
            billing_periods: BillingPeriods { pool },
        }
    }
}
The important configuration here:
- The child repository (BillingPeriods) marks the foreign key column with parent.
- The parent repository (Subscriptions) includes the child repository as a field marked with #[es_repo(nested)]
Using Nested Entities
Now we can use our aggregate with full type safety and automatic loading of nested entities:
// (imports, entity and repository definitions are the same as in the previous
// listings and omitted here; Subscription additionally exposes this accessor:)

impl Subscription {
    pub fn current_billing_period(&self) -> Option<&BillingPeriod> {
        self.current_period_id
            .and_then(|id| self.billing_periods.get_persisted(&id))
    }
}

async fn init_pool() -> anyhow::Result<sqlx::PgPool> {
    let pg_con = format!("postgres://user:password@localhost:5432/pg");
    Ok(sqlx::PgPool::connect(&pg_con).await?)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let subscriptions = Subscriptions::new(init_pool().await?);

    // Create a new subscription
    let subscription_id = SubscriptionId::new();
    let new_subscription = NewSubscription { id: subscription_id };
    let mut subscription = subscriptions.create(new_subscription).await?;

    // Start a billing period
    subscription.start_new_billing_period();

    // Add some line items to the current period
    subscription.add_line_item_to_current_billing_period(
        100.0,
        "Monthly subscription fee".to_string(),
    );
    subscription.add_line_item_to_current_billing_period(
        25.0,
        "Additional service charge".to_string(),
    );

    // Persist all changes (both parent and nested entities)
    subscriptions.update(&mut subscription).await?;

    // Load the subscription - nested entities are automatically loaded
    let loaded = subscriptions.find_by_id(subscription_id).await?;

    // Access the current billing period
    if let Some(current_period) = loaded.current_billing_period() {
        println!("Current period has {} line items", current_period.line_items.len());
        for item in &current_period.line_items {
            println!("  - {}: ${}", item.description, item.amount);
        }
    }
    Ok(())
}
One thing to note is that the _in_op functions of the parent repository now require an AtomicOperation argument, since we must load all the entities in a consistent snapshot:
async fn find_by_id_in_op<OP>(op: OP, id: EntityId)
where
OP: AtomicOperation;
// The version of the queries in Repositories without nested children
// cannot be used here as it would not load parent + children from a consistent snapshot.
// async fn find_by_id_in_op<'a, OP>(op: OP, id: EntityId)
// where
// OP: IntoOneTimeExecutor<'a>;
Benefits of the Nested Approach
This approach provides several key benefits:
- Type Safety: The aggregate boundary is enforced at compile time
- Atomic Updates: All changes to the aggregate are persisted together
- Automatic Loading: When you load the parent, all nested entities are loaded automatically
- Encapsulation: All access to nested entities goes through the aggregate root
- Consistency: The parent entity can enforce invariants across all its children
Performance Considerations
While nesting provides strong consistency guarantees, there are some performance implications to consider:
- Loading: All nested entities are loaded when the parent is loaded. For aggregates with many children, this could impact performance.
- Updates: All nested entities are checked for changes during updates, even if only one was modified.
- Memory: The entire aggregate is held in memory, which could be significant for large aggregates.
For these reasons, it's important to keep aggregates small and focused on a specific consistency boundary.
When to Use Nesting
Use the nested approach when:
- You have a true invariant that spans multiple entities
- The child entities have no meaning without the parent
- You need to enforce consistency rules across the relationship
- The number of child entities is reasonably bounded
Avoid nesting when:
- The relationship is merely associative
- Child entities can exist independently
- You expect unbounded growth in the number of children
- Performance requirements dictate more granular loading/updating
Remember, as discussed in the aggregates chapter, there are often alternative designs that can avoid the need for nesting while still maintaining consistency.
Features
Es Entity provides optional features that can be enabled via Cargo features. These features extend the core functionality of the framework to support specific use cases.
Available Features
- sim-time - Time simulation for testing and development
sim-time
The `sim-time` feature enables time simulation in es-entity, allowing you to accelerate time for testing and development. This is particularly useful for testing time-dependent logic without waiting for real time to pass.
Enabling sim-time
Add the `sim-time` feature to your es-entity dependency:
```toml
[dependencies]
es-entity = { version = "0.7", features = ["sim-time"] }
```
Configuration
sim-time is configured through the `TimeConfig` struct. By default it operates in real-time mode; to enable simulation, initialize it with a configuration:
```rust
use es_entity::prelude::{sim_time, chrono};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let config = sim_time::TimeConfig {
        realtime: false,
        simulation: Some(sim_time::SimulationConfig {
            // Start the simulation at a specific time
            start_at: chrono::Utc::now(),
            // Real milliseconds between simulation ticks
            tick_interval_ms: 10,
            // Simulated duration per tick
            tick_duration_secs: Duration::from_secs(86400), // 1 day per 10ms
            // Whether to switch to real-time when catching up to present
            transform_to_realtime: false,
        }),
    };

    // Initialize sim-time with the configuration
    sim_time::init(config);
}
```
Configuration Parameters
TimeConfig
- `realtime: bool` - When `true`, sim-time is deactivated and all time operations use real time. When `false`, simulation is enabled.
- `simulation: Option<SimulationConfig>` - The simulation configuration. Required when `realtime` is `false`.
SimulationConfig
- `start_at: DateTime<Utc>` - The starting time for the simulation. Defaults to the current time.
- `tick_interval_ms: u64` - The real-world milliseconds between simulation ticks.
- `tick_duration_secs: Duration` - How much simulated time passes per tick.
- `transform_to_realtime: bool` - If `true`, the simulation automatically switches to real-time mode once it catches up to the current time.
Usage
Once configured, sim-time provides several functions that work with simulated time:
```rust
use es_entity::prelude::{sim_time, chrono};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Initialize sim-time with the example configuration
    let config = sim_time::TimeConfig {
        realtime: false,
        simulation: Some(sim_time::SimulationConfig {
            start_at: chrono::Utc::now(),
            tick_interval_ms: 10,
            tick_duration_secs: Duration::from_secs(86400), // 1 day per 10ms
            transform_to_realtime: false,
        }),
    };
    sim_time::init(config);

    // Get the current simulated time
    let current_time = sim_time::now();

    // Sleep for a simulated duration
    // With the example config (1 day = 10ms), this sleeps for ~0.04 real seconds
    sim_time::sleep(Duration::from_secs(3600)).await; // Sleep for 1 simulated hour

    // Set a timeout on an operation
    async fn async_operation() -> Result<(), std::io::Error> {
        Ok(())
    }
    let _ = sim_time::timeout(Duration::from_secs(60), async_operation()).await;

    // Wait until simulation catches up to real time
    // (only relevant if transform_to_realtime is true)
    sim_time::wait_until_realtime().await;
}
```
Effect on es-entity
When sim-time is enabled, it affects how es-entity handles timestamps:
- Database Operations: The `DbOp` struct automatically caches the simulated time when the feature is enabled. This cached time is used instead of the database's `NOW()` for all write operations.
- Event Timestamps: All events created during a transaction use the same simulated timestamp, ensuring consistency.
- Time-based Queries: Operations that depend on the current time use the simulated time instead of real time.
Example: Testing Time-Dependent Logic
```rust
use es_entity::prelude::*;
use es_entity::{EntityEvents, EsEntity, EsEntityError, EsEvent, EsRepo, IntoEvents, TryFromEvents};
use chrono::Datelike;
use std::time::Duration;

es_entity::entity_id! { SubscriptionId }

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, EsEvent)]
#[serde(tag = "type")]
#[es_event(id = "SubscriptionId")]
enum SubscriptionEvent {
    Initialized {
        id: SubscriptionId,
        expires_at: chrono::DateTime<chrono::Utc>,
    },
}

#[derive(Clone, EsEntity)]
struct Subscription {
    pub id: SubscriptionId,
    pub expires_at: chrono::DateTime<chrono::Utc>,
    pub events: EntityEvents<SubscriptionEvent>,
}

impl Subscription {
    pub fn is_expired(&self, now: chrono::DateTime<chrono::Utc>) -> bool {
        self.expires_at <= now
    }

    pub fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
        // Get the timestamp from when this entity was first persisted
        self.events
            .entity_first_persisted_at()
            .unwrap_or_else(|| sim_time::now())
    }
}

impl TryFromEvents<SubscriptionEvent> for Subscription {
    fn try_from_events(events: EntityEvents<SubscriptionEvent>) -> Result<Self, EsEntityError> {
        let mut expires_at = chrono::Utc::now();
        for event in events.iter_all() {
            match event {
                SubscriptionEvent::Initialized { expires_at: exp, .. } => {
                    expires_at = *exp;
                }
            }
        }
        Ok(Self {
            id: events.id().clone(),
            expires_at,
            events,
        })
    }
}

#[derive(Debug)]
struct NewSubscription {
    id: SubscriptionId,
    duration_days: i64,
}

impl IntoEvents<SubscriptionEvent> for NewSubscription {
    fn into_events(self) -> EntityEvents<SubscriptionEvent> {
        EntityEvents::init(
            self.id,
            [SubscriptionEvent::Initialized {
                id: self.id,
                expires_at: sim_time::now() + chrono::Duration::days(self.duration_days),
            }],
        )
    }
}

#[derive(Clone, EsRepo)]
#[es_repo(entity = "Subscription", event = "SubscriptionEvent")]
struct SubscriptionRepo {
    pool: sqlx::PgPool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Setup database connection
    let db_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://localhost/test".to_string());
    let pool = sqlx::PgPool::connect(&db_url).await?;
    let repo = SubscriptionRepo { pool };

    // Configure time to run 30 days per second
    let config = sim_time::TimeConfig {
        realtime: false,
        simulation: Some(sim_time::SimulationConfig {
            // Start simulation one year in the past
            start_at: chrono::Utc::now() - chrono::Duration::days(365),
            tick_interval_ms: 33,                            // ~30 ticks per second
            tick_duration_secs: Duration::from_secs(86400),  // 1 day per tick
            transform_to_realtime: false,
        }),
    };
    let start_time = chrono::Utc::now() - chrono::Duration::days(365);
    sim_time::init(config);

    // Create a subscription that expires in 30 days
    let subscription = repo
        .create(NewSubscription {
            id: SubscriptionId::new(),
            duration_days: 30,
        })
        .await?;

    // Verify sim-time is working by checking that the entity was created
    // in the simulated year/month
    let created_at = subscription.created_at();
    assert_eq!(created_at.year(), start_time.year(), "Entity should be created in the simulated year");
    assert_eq!(created_at.month(), start_time.month(), "Entity should be created in the simulated month");

    // Verify that we're actually in the past (compared to real time)
    let real_now = chrono::Utc::now();
    assert!(
        created_at < real_now - chrono::Duration::days(300),
        "Entity creation time should be in the past"
    );

    // The subscription should NOT be expired yet (30 days haven't passed in sim time)
    assert!(!subscription.is_expired(sim_time::now()));

    // Sleep for 30 simulated days (which takes ~1 real second with this config)
    sim_time::sleep(Duration::from_secs(30 * 86400)).await;

    // Check that the subscription is now expired
    let subscription = repo.find_by_id(subscription.id).await?;
    assert!(subscription.is_expired(sim_time::now()));

    Ok(())
}
```
Best Practices
- Initialize Early: Call `sim_time::init()` before any other es-entity operations to ensure consistent time handling.
- Use in Tests: The sim-time feature is primarily designed for testing. Consider using conditional compilation to only enable it in test builds:

  ```toml
  [dev-dependencies]
  es-entity = { version = "0.7", features = ["sim-time"] }
  ```

- Consistent Time: All operations within a single database transaction use the same timestamp, ensuring consistency in your event store.
- Real-time Transformation: Use `transform_to_realtime: true` when you want to start a simulation in the past and have it automatically switch to real-time once it catches up. This is useful for replaying historical scenarios.