es_query
The es_query! macro is a helper that allows you only to query the index table without needing to join with the events table.
The expansion of es_query! results in a call to the sqlx::query_as! macro - which means that you still get typesafety and compile time column validation.
Given the query we arrived at in the previous section - this is what a find_by_name fn could look like:
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
use es_entity::*;
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EntityHydrationError> {
unimplemented!()
}
}
use sqlx::PgPool;
use es_entity::*;
#[derive(Debug)]
pub enum UserError {
Sqlx(sqlx::Error),
HydrationError(EntityHydrationError),
NotFound,
}
impl From<sqlx::Error> for UserError {
fn from(e: sqlx::Error) -> Self { Self::Sqlx(e) }
}
impl From<EntityHydrationError> for UserError {
fn from(e: EntityHydrationError) -> Self { Self::HydrationError(e) }
}
pub struct Users {
pool: PgPool
}
impl Users {
pub async fn find_by_name(&self, name: String) -> Result<User, UserError> {
let rows = sqlx::query_as!(
GenericEvent::<UserId>,
r#"
WITH target_entity AS (
SELECT id
FROM users
WHERE name = $1
)
SELECT e.id as entity_id, e.sequence, e.event, e.context as "context: ContextData", e.recorded_at
FROM user_events e
JOIN target_entity te ON e.id = te.id
ORDER BY e.sequence;
"#,
name,
)
.fetch_all(&self.pool)
.await?;
EntityEvents::load_first(rows.into_iter())?
.ok_or(UserError::NotFound)
}
}
The es_query! macro removes the boilerplate of fetching the events and lets you just write the part that queries the index table:
SELECT id FROM users WHERE name = $1
On expansion it constructs the complete query (adding the JOIN with the events table) and hydrates the entities from the events.
This simplifies the above implementation into:
extern crate es_entity;
extern crate sqlx;
extern crate serde;
fn main () {}
use serde::{Deserialize, Serialize};
use es_entity::*;
es_entity::entity_id! { UserId }
#[derive(EsEvent, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
Initialized { id: UserId, name: String },
}
pub struct NewUser { id: UserId, name: String }
impl IntoEvents<UserEvent> for NewUser {
fn into_events(self) -> EntityEvents<UserEvent> {
unimplemented!()
}
}
#[derive(EsEntity)]
pub struct User {
pub id: UserId,
events: EntityEvents<UserEvent>,
}
impl TryFromEvents<UserEvent> for User {
fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EntityHydrationError> {
unimplemented!()
}
}
use sqlx::PgPool;
use es_entity::*;
#[derive(EsRepo)]
#[es_repo(entity = "User")]
pub struct Users {
pool: PgPool
}
impl Users {
pub async fn find_by_name(&self, name: String) -> Result<User, UserFindError> {
es_query!(
"SELECT id FROM users WHERE name = $1",
name
)
.fetch_optional(&self.pool)
.await?
.ok_or_else(|| UserFindError::NotFound {
entity: "User",
column: None,
value: name,
})
}
}
The es_query! macro only works within fns defined on structs with EsRepo derived.
es_query! provides fetch_optional which returns Result<Option<Entity>, QueryError>.
To return a concrete entity (not Option), use ok_or_else to construct a NotFound error with context about what was searched for — the generated FindError type carries the entity name, column, and value:
async fn fetch_optional(<executor>) -> Result<Option<Entity>, Repo::QueryError>
// The `(_, bool)` signifies whether or not the query could have fetched more or the list is exhausted:
async fn fetch_n(<executor>, n) -> Result<(Vec<Entity>, bool), Repo::QueryError>