Skip to content

Commit

Permalink
Optimising codegen using an asynchronous client and pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
Virgiel committed May 24, 2023
1 parent 5098729 commit be888e4
Show file tree
Hide file tree
Showing 19 changed files with 612 additions and 448 deletions.
903 changes: 517 additions & 386 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ cornucopia_sync = { path = "../crates/client_sync" }
cornucopia_async = { path = "../crates/client_async" }

# benchmarking
criterion = { version = "0.4.0", features = ["html_reports"] }
criterion = { version = "0.5.0", features = ["html_reports"] }

# async
tokio = { version = "1.24.2", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion benches/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn bench(c: &mut Criterion) {
cornucopia::container::setup(false).unwrap();
let client = &mut cornucopia_conn().unwrap();

cornucopia::load_schema(client, &["../codegen_test/schema.sql"]).unwrap();
cornucopia::load_schema(client, &["../test_codegen/schema.sql"]).unwrap();
c.bench_function("codegen_sync", |b| {
b.iter(|| {
cornucopia::generate_live(
Expand Down
22 changes: 7 additions & 15 deletions benches/execution/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use cornucopia::conn::cornucopia_conn;
use criterion::{BenchmarkId, Criterion};
use diesel::{Connection, PgConnection};
use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
use tokio::runtime::Runtime;

const QUERY_SIZE: &[usize] = &[1, 100, 10_000];
const INSERT_SIZE: &[usize] = &[1, 100, 1000];
Expand Down Expand Up @@ -128,23 +127,16 @@ fn prepare_full(client: &mut Client) {
fn bench(c: &mut Criterion) {
cornucopia::container::cleanup(false).ok();
cornucopia::container::setup(false).unwrap();
let client = &mut cornucopia_conn().unwrap();
let rt: &'static Runtime = Box::leak(Box::new(Runtime::new().unwrap()));
let async_client = &mut rt.block_on(async {
let (client, conn) = tokio_postgres::connect(
"postgresql://postgres:postgres@127.0.0.1:5435/postgres",
NoTls,
)
.await
.unwrap();
rt.spawn(conn);
client
});
let client = &mut postgres::Client::connect(
"postgresql://postgres:postgres@127.0.0.1:5435/postgres",
NoTls,
)
.unwrap();
let async_client = &mut cornucopia_conn().unwrap();
let conn =
&mut PgConnection::establish("postgresql://postgres:postgres@127.0.0.1:5435/postgres")
.unwrap();

cornucopia::load_schema(client, &["usage/cornucopia_benches/schema.sql"]).unwrap();
cornucopia::load_schema(async_client, &["execution/cornucopia_benches/schema.sql"]).unwrap();
{
let mut group = c.benchmark_group("bench_trivial_query");
for size in QUERY_SIZE {
Expand Down
6 changes: 5 additions & 1 deletion crates/cornucopia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ keywords = ["postgresql", "query", "generator", "sql", "tokio-postgres"]
codegen_template = { path = "../codegen_template", version = "0.1.0" }

# Postgres interaction
postgres = "0.19.4"
tokio-postgres = "0.7.8"
postgres-types = "0.2.4"

# Async
tokio = { version = "1.28.1", features = ["rt-multi-thread"] }
futures = "0.3.28"

# Error handling and reporting
thiserror = "1.0.38"
miette = { version = "5.5.0", features = ["fancy"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/cornucopia/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub fn run() -> Result<(), Error> {

match action {
Action::Live { url } => {
let mut client = conn::from_url(&url)?;
generate_live(&mut client, &queries_path, Some(&destination), settings)?;
let client = conn::from_url(&url)?;
generate_live(&client, &queries_path, Some(&destination), settings)?;
}
Action::Schema { schema_files } => {
// Run the generate command. If the command is unsuccessful, cleanup Cornucopia's container
Expand Down
4 changes: 2 additions & 2 deletions crates/cornucopia/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ fn gen_params_struct(w: &mut impl Write, params: &PreparedItem, ctx: &GenCtx) {
.map(|p| p.param_ergo_ty(traits, ctx))
.collect::<Vec<_>>();
let fields_name = fields.iter().map(|p| &p.ident.rs);
let traits_idx = (1..=traits.len()).into_iter().map(idx_char);
let traits_idx = (1..=traits.len()).map(idx_char);
code!(w =>
#[derive($copy Debug)]
pub struct $name<$lifetime $($traits_idx: $traits,)> {
Expand Down Expand Up @@ -507,7 +507,7 @@ fn gen_query_fn<W: Write>(w: &mut W, module: &PreparedModule, query: &PreparedQu
.map(|idx| param_field[*idx].param_ergo_ty(traits, ctx))
.collect();
let params_name = order.iter().map(|idx| &param_field[*idx].ident.rs);
let traits_idx = (1..=traits.len()).into_iter().map(idx_char);
let traits_idx = (1..=traits.len()).map(idx_char);
let lazy_impl = |w: &mut W| {
if let Some((idx, index)) = row {
let item = module.rows.get_index(*idx).unwrap().1;
Expand Down
35 changes: 25 additions & 10 deletions crates/cornucopia/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
use postgres::{Client, Config, NoTls};
use tokio::runtime::Runtime;
use tokio_postgres::{Client, Config, NoTls};

use self::error::Error;

/// Creates a non-TLS connection from a URL.
pub(crate) fn from_url(url: &str) -> Result<Client, Error> {
Ok(Client::connect(url, NoTls)?)
connect(url.parse()?)
}

/// Create a non-TLS connection to the container managed by Cornucopia.
pub fn cornucopia_conn() -> Result<Client, Error> {
Ok(Config::new()
.user("postgres")
.password("postgres")
.host("127.0.0.1")
.port(5435)
.dbname("postgres")
.connect(NoTls)?)
connect(
Config::new()
.user("postgres")
.password("postgres")
.host("127.0.0.1")
.port(5435)
.dbname("postgres")
.clone(),
)
}

fn connect(config: Config) -> Result<Client, Error> {
let rt: &'static Runtime = Box::leak(Box::new(
Runtime::new().expect("Failed to start async Runtime"),
));
let client = rt.block_on(async {
let (client, conn) = config.connect(NoTls).await.unwrap();
rt.spawn(conn);
client
});
Ok(client)
}

pub(crate) mod error {
use miette::Diagnostic;

#[derive(Debug, thiserror::Error, Diagnostic)]
#[error("Couldn't establish a connection with the database.")]
pub struct Error(#[from] pub postgres::Error);
pub struct Error(#[from] pub tokio_postgres::Error);
}
10 changes: 5 additions & 5 deletions crates/cornucopia/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod container;

use std::path::Path;

use postgres::Client;
use tokio_postgres::Client;

use codegen::generate as generate_internal;
use error::WriteOutputError;
Expand All @@ -43,7 +43,7 @@ pub struct CodegenSettings {
/// the generated code will be written at that path. Code generation settings are
/// set using the `settings` parameter.
pub fn generate_live<P: AsRef<Path>>(
client: &mut Client,
client: &Client,
queries_path: P,
destination: Option<P>,
settings: CodegenSettings,
Expand Down Expand Up @@ -84,9 +84,9 @@ pub fn generate_managed<P: AsRef<Path>>(
.map(parse_query_module)
.collect::<Result<_, parser::error::Error>>()?;
container::setup(podman)?;
let mut client = conn::cornucopia_conn()?;
load_schema(&mut client, schema_files)?;
let prepared_modules = prepare(&mut client, modules)?;
let client = conn::cornucopia_conn()?;
load_schema(&client, schema_files)?;
let prepared_modules = prepare(&client, modules)?;
let generated_code = generate_internal(prepared_modules, settings);
container::cleanup(podman)?;

Expand Down
6 changes: 3 additions & 3 deletions crates/cornucopia/src/load_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::Path;

use miette::NamedSource;
use postgres::Client;
use tokio_postgres::Client;

use crate::utils::db_err;

Expand All @@ -10,14 +10,14 @@ use self::error::Error;
/// Loads PostgreSQL schemas into a database.
///
/// Takes a list of file paths as parameter and loads them in their given order.
pub fn load_schema<P: AsRef<Path>>(client: &mut Client, paths: &[P]) -> Result<(), Error> {
pub fn load_schema<P: AsRef<Path>>(client: &Client, paths: &[P]) -> Result<(), Error> {
for path in paths {
let path = path.as_ref();
let sql = std::fs::read_to_string(path).map_err(|err| Error::Io {
path: path.to_string_lossy().to_string(),
err,
})?;
client.batch_execute(&sql).map_err(|err| {
futures::executor::block_on(client.batch_execute(&sql)).map_err(|err| {
let msg = format!("{err:#}");
let src = NamedSource::new(path.to_string_lossy(), sql);
if let Some((position, msg, help)) = db_err(&err) {
Expand Down
40 changes: 29 additions & 11 deletions crates/cornucopia/src/prepare_queries.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::rc::Rc;
use std::{collections::HashMap, rc::Rc};

use futures::{stream::FuturesUnordered, StreamExt};
use heck::ToUpperCamelCase;
use indexmap::{map::Entry, IndexMap};
use postgres::Client;
use postgres_types::{Kind, Type};
use tokio_postgres::{Client, Statement};

use crate::{
codegen::GenCtx,
Expand Down Expand Up @@ -226,7 +227,8 @@ impl PreparedModule {
}

/// Prepares all modules
pub(crate) fn prepare(client: &mut Client, modules: Vec<Module>) -> Result<Preparation, Error> {
pub(crate) fn prepare(client: &Client, modules: Vec<Module>) -> Result<Preparation, Error> {
let stmts = prepare_sql(client, &modules);
let mut registrar = TypeRegistrar::default();
let mut tmp = Preparation {
modules: Vec::new(),
Expand All @@ -240,7 +242,7 @@ pub(crate) fn prepare(client: &mut Client, modules: Vec<Module>) -> Result<Prepa

for module in modules {
tmp.modules
.push(prepare_module(client, module, &mut registrar)?);
.push(prepare_module(&stmts, module, &mut registrar)?);
}

// Prepare types grouped by schema
Expand Down Expand Up @@ -315,9 +317,25 @@ fn prepare_type(
}
}

fn prepare_sql(
client: &Client,
modules: &[Module],
) -> HashMap<String, Result<Statement, tokio_postgres::Error>> {
let queries: FuturesUnordered<_> = modules
.iter()
.flat_map(|m| m.queries.iter().map(|q| q.sql_str.clone()))
.map(|query| async move {
let stmt = client.prepare(&query).await;
(query, stmt)
})
.collect();
let results: HashMap<_, _> = futures::executor::block_on(queries.collect());
results
}

/// Prepares all queries in this module
fn prepare_module(
client: &mut Client,
stmts: &HashMap<String, Result<Statement, tokio_postgres::Error>>,
module: Module,
registrar: &mut TypeRegistrar,
) -> Result<PreparedModule, Error> {
Expand All @@ -332,7 +350,7 @@ fn prepare_module(

for query in module.queries {
prepare_query(
client,
stmts,
&mut tmp_prepared_module,
registrar,
&module.types,
Expand All @@ -348,7 +366,7 @@ fn prepare_module(

/// Prepares a query
fn prepare_query(
client: &mut Client,
stmts: &HashMap<String, Result<Statement, tokio_postgres::Error>>,
module: &mut PreparedModule,
registrar: &mut TypeRegistrar,
types: &[TypeAnnotation],
Expand All @@ -363,9 +381,9 @@ fn prepare_query(
module_info: &ModuleInfo,
) -> Result<(), Error> {
// Prepare the statement
let stmt = client
.prepare(&sql_str)
.map_err(|e| Error::new_db_err(&e, module_info, &sql_span, &name))?;
let stmt = stmts[&sql_str]
.as_ref()
.map_err(|e| Error::new_db_err(e, module_info, &sql_span, &name))?;

let (nullable_params_fields, params_name) = param.name_and_fields(types, &name, Some("Params"));
let (nullable_row_fields, row_name) = row.name_and_fields(types, &name, None);
Expand Down Expand Up @@ -477,7 +495,7 @@ pub(crate) mod error {

impl Error {
pub(crate) fn new_db_err(
err: &postgres::Error,
err: &tokio_postgres::Error,
module_info: &ModuleInfo,
query_span: &SourceSpan,
query_name: &Span<String>,
Expand Down
2 changes: 1 addition & 1 deletion crates/cornucopia/src/type_registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ pub(crate) mod error {
#[derive(Debug, ThisError, Diagnostic)]
#[error("Couldn't register SQL type.")]
pub enum Error {
Db(#[from] postgres::Error),
Db(#[from] tokio_postgres::Error),
UnsupportedPostgresType {
#[source_code]
src: NamedSource,
Expand Down
4 changes: 2 additions & 2 deletions crates/cornucopia/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use indexmap::Equivalent;
use postgres::error::ErrorPosition;
use postgres_types::Type;
use tokio_postgres::error::ErrorPosition;

/// Allows us to query a map using type schema as key without having to own the key strings
#[derive(PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -34,7 +34,7 @@ pub fn find_duplicate<T>(slice: &[T], eq: fn(&T, &T) -> bool) -> Option<(&T, &T)
}

/// Extracts useful info from a `postgres`-generated error.
pub(crate) fn db_err(err: &postgres::Error) -> Option<(u32, String, Option<String>)> {
pub(crate) fn db_err(err: &tokio_postgres::Error) -> Option<(u32, String, Option<String>)> {
if let Some(db_err) = err.as_db_error() {
if let Some(ErrorPosition::Original(position)) = db_err.position() {
Some((
Expand Down
2 changes: 1 addition & 1 deletion crates/cornucopia/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{

use error::Error;
use miette::SourceSpan;
use postgres::Column;
use postgres_types::Type;
use tokio_postgres::Column;

pub(crate) fn duplicate_nullable_ident(
info: &ModuleInfo,
Expand Down
2 changes: 2 additions & 0 deletions test_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ clap = { version = "4.0.29", features = ["derive"] }

# Postgres interaction
postgres = { version = "0.19.4" }
tokio-postgres = "0.7.8"
futures = "0.3.28"

# serde
## Test fixtures ser/de
Expand Down
2 changes: 1 addition & 1 deletion test_integration/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{env::set_current_dir, process::Command};

// Run codegen test, return true if all test are successful
pub(crate) fn run_codegen_test(
client: &mut postgres::Client,
client: &tokio_postgres::Client,
apply: bool,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut successful = true;
Expand Down
2 changes: 1 addition & 1 deletion test_integration/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{

/// Run errors test, return true if all test are successful
pub(crate) fn run_errors_test(
client: &mut postgres::Client,
client: &tokio_postgres::Client,
apply: bool,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut successful = true;
Expand Down
6 changes: 3 additions & 3 deletions test_integration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ fn test(
container::cleanup(podman).ok();
container::setup(podman).unwrap();
let successful = std::panic::catch_unwind(|| {
let mut client = cornucopia::conn::cornucopia_conn().unwrap();
display(run_errors_test(&mut client, apply_errors)).unwrap()
&& display(run_codegen_test(&mut client, apply_codegen)).unwrap()
let client = cornucopia::conn::cornucopia_conn().unwrap();
display(run_errors_test(&client, apply_errors)).unwrap()
&& display(run_codegen_test(&client, apply_codegen)).unwrap()
});
container::cleanup(podman).unwrap();
successful.unwrap()
Expand Down
6 changes: 4 additions & 2 deletions test_integration/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use std::{
};

/// Reset the current database
pub(crate) fn reset_db(client: &mut postgres::Client) -> Result<(), postgres::Error> {
client.batch_execute("DROP SCHEMA public CASCADE;CREATE SCHEMA public;")
pub(crate) fn reset_db(client: &tokio_postgres::Client) -> Result<(), postgres::Error> {
futures::executor::block_on(
client.batch_execute("DROP SCHEMA public CASCADE;CREATE SCHEMA public;"),
)
}

pub(crate) fn rustfmt_file(path: &Path) {
Expand Down

0 comments on commit be888e4

Please sign in to comment.