From 40b6c96cde9ea524b531a85a2f5caa351bbb6502 Mon Sep 17 00:00:00 2001 From: Piet Geursen Date: Fri, 15 Feb 2019 17:05:04 +1300 Subject: [PATCH] Integrate latest flume api, fix bug with contacts update. --- native/Cargo.lock | 16 +++++++++---- native/Cargo.toml | 2 +- native/benches/bench.rs | 17 +++++++------- native/src/flume_view_sql/contacts.rs | 33 +++++++++++++++++++-------- native/src/flume_view_sql/mod.rs | 1 + native/src/flume_view_sql/queries.rs | 4 ++-- native/src/flume_view_sql/votes.rs | 2 +- native/src/lib.rs | 4 ++-- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 3e101f7..cdeb495 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -84,6 +84,11 @@ name = "bitflags" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "buffered_offset_reader" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "byteorder" version = "1.2.7" @@ -314,9 +319,10 @@ dependencies = [ [[package]] name = "flumedb" -version = "0.1.0" -source = "git+https://github.com/sunrise-choir/flumedb-rs#d2a13524a6d7b08746f50f08ecff177acec542ba" +version = "0.1.1" +source = "git+https://github.com/sunrise-choir/flumedb-rs#73a443ae2de99235d8f970667577ae5a17c758f4" dependencies = [ + "buffered_offset_reader 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -325,7 +331,6 @@ dependencies = [ "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.34 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1031,7 +1036,7 @@ dependencies = [ "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "flumedb 0.1.0 (git+https://github.com/sunrise-choir/flumedb-rs)", + "flumedb 0.1.1 (git+https://github.com/sunrise-choir/flumedb-rs)", "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-tcp-server 10.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1457,6 +1462,7 @@ dependencies = [ "checksum backtrace-sys 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "3fcce89e5ad5c8949caa9434501f7b55415b3e7ad5270cb88c75a8d35e8f1279" "checksum base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "621fc7ecb8008f86d7fb9b95356cd692ce9514b80a86d85b397f32a22da7b9e2" "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" +"checksum buffered_offset_reader 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ab6593d7a0e248ac2692fcc2f98e231548a33635da5682a078a7f1d0cd64c0b1" "checksum byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "94f88df23a25417badc922ab0f5716cc1330e87f71ddd9203b3a3ccd9cedf75d" "checksum bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "40ade3d27603c2cb345eb0912aec461a6dec7e06a4ae48589904e808335c7afa" "checksum cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "926013f2860c46252efceabb19f4a6b308197505082c609025aa6706c011d427" @@ -1480,7 +1486,7 @@ dependencies = [ "checksum env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "afb070faf94c85d17d50ca44f6ad076bce18ae92f0037d350947240a36e9d42e" "checksum failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6dd377bcc1b1b7ce911967e3ec24fa19c3224394ec05b54aa7b083d498341ac7" "checksum failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "64c2d913fe8ed3b6c6518eedf4538255b989945c14c2a7d5cbff62a5e2120596" -"checksum flumedb 0.1.0 (git+https://github.com/sunrise-choir/flumedb-rs)" = "" +"checksum flumedb 0.1.1 (git+https://github.com/sunrise-choir/flumedb-rs)" = "" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-cprng 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "81f7f8eb465745ea9b02e2704612a9946a59fa40572086c6fd49d6ddcf30bf31" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" diff --git a/native/Cargo.toml b/native/Cargo.toml index 6d086ae..8b717a8 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -22,7 +22,7 @@ private-box = "0.4.5" base64 = "0.10.0" itertools = "0.8.0" node_napi = { git = "https://github.com/sunrise-choir/node-napi" } -flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs" } +flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs", version = "0.1.1" } [dependencies.rusqlite] version = "0.16.0" diff --git a/native/benches/bench.rs b/native/benches/bench.rs index 27e058b..0a20150 100644 --- a/native/benches/bench.rs +++ b/native/benches/bench.rs @@ -14,7 +14,6 @@ extern crate ssb_sql_napi; use base64::{decode, encode}; use flumedb::flume_log::FlumeLog; -use flumedb::offset_log::OffsetCodec; use flumedb::offset_log::OffsetLogIter; use itertools::Itertools; use private_box::SecretKey; @@ -30,9 +29,9 @@ fn create_test_db(num_entries: usize, offset_filename: &str, db_filename: &str) let file = std::fs::File::open(offset_filename.to_string()).unwrap(); - OffsetLogIter::::new(file) + OffsetLogIter::::new(file) .take(num_entries) - .map(|data| (data.id, data.data_buffer)) + .map(|data| (data.offset, data.data)) .chunks(NUM_ENTRIES as usize) .into_iter() .for_each(|chunk| { @@ -53,8 +52,8 @@ fn flume_view_sql_insert_piets_entire_log(c: &mut Criterion) { let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap(); let file = std::fs::File::open(offset_filename.to_string()).unwrap(); - OffsetLogIter::::new(file) - .map(|data| (data.id, data.data_buffer)) + OffsetLogIter::::new(file) + .map(|data| (data.offset, data.data)) .chunks(NUM_ENTRIES as usize) .into_iter() .for_each(|chunk| { @@ -85,8 +84,8 @@ fn flume_view_sql_insert_piets_entire_log_with_decryption(c: &mut Criterion) { let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap(); let file = std::fs::File::open(offset_filename.to_string()).unwrap(); - OffsetLogIter::::new(file) - .map(|data| (data.id, data.data_buffer)) + OffsetLogIter::::new(file) + .map(|data| (data.offset, data.data)) .chunks(NUM_ENTRIES as usize) .into_iter() .for_each(|chunk| { @@ -111,9 +110,9 @@ fn flume_view_sql_insert(c: &mut Criterion) { //TODO: this is ok for a benchmark but uses lots of memory. //Better would be to create a transaction and then append in a for_each loop. - OffsetLogIter::::new(file) + OffsetLogIter::::new(file) .take(NUM_ENTRIES as usize) - .map(|data| (data.id, data.data_buffer)) + .map(|data| (data.offset, data.data)) .chunks(NUM_ENTRIES as usize) .into_iter() .for_each(|chunk| { diff --git a/native/src/flume_view_sql/contacts.rs b/native/src/flume_view_sql/contacts.rs index 1d324b1..409015a 100644 --- a/native/src/flume_view_sql/contacts.rs +++ b/native/src/flume_view_sql/contacts.rs @@ -25,6 +25,12 @@ pub fn insert_or_update_contacts( is_decrypted: bool, ) { if let Value::String(contact) = &message.value.content["contact"] { + //Ok what should this do: + // - if the record already exists + // - delete it if the new state is zero (this should only happen when record already + // exists because you can't unfollow someone you already don't follow. + // - update it if the new state is 1 or -1 + // - else create the new record. State should be a 1 or a -1 let is_blocking = message.value.content["blocking"].as_bool().unwrap_or(false); let is_following = message.value.content["following"] .as_bool() @@ -38,20 +44,27 @@ pub fn insert_or_update_contacts( }; let author_id = find_or_create_author(&connection, &message.value.author).unwrap(); - let mut insert_contacts_stmt = connection - .prepare_cached("REPLACE INTO contacts_raw (author_id, contact_author_id, state, is_decrypted) VALUES (?, ?, ?, ?)") - .unwrap(); let contact_author_id = find_or_create_author(&connection, contact).unwrap(); - insert_contacts_stmt - .execute(&[ - &author_id, - &contact_author_id, - &state, - &is_decrypted as &ToSql, - ]) + let mut stmt = connection.prepare_cached("SELECT id FROM contacts_raw WHERE author_id = ? AND contact_author_id = ? AND is_decrypted = ?").unwrap(); + + stmt.query_row(&[&author_id, &contact_author_id, &is_decrypted as &ToSql], |row| row.get(0)) + .and_then(|id: i64|{ + //Row exists so update + connection + .prepare_cached("UPDATE contacts_raw SET state = ? WHERE id = ?") + .map(|mut stmt| stmt.execute(&[&state, &id])) + }) + .or_else(|_| { + //Row didn't exist so insert + connection + .prepare_cached("INSERT INTO contacts_raw (author_id, contact_author_id, is_decrypted, state) VALUES (?, ?, ?, ?)") + .map(|mut stmt| stmt.execute(&[&author_id, &contact_author_id, &is_decrypted as &ToSql, &state])) + }) + .unwrap() .unwrap(); } + } pub fn create_contacts_indices(connection: &Connection) -> Result { diff --git a/native/src/flume_view_sql/mod.rs b/native/src/flume_view_sql/mod.rs index aaa6890..329e351 100644 --- a/native/src/flume_view_sql/mod.rs +++ b/native/src/flume_view_sql/mod.rs @@ -258,6 +258,7 @@ fn append_item( let message_key_id = find_or_create_key(&connection, &message.key).unwrap(); + // votes are a kind of backlink, but we want to put them in their own table. match &message.value.content["type"] { Value::String(type_string) if type_string == "vote" => { insert_or_update_votes(connection, &message); diff --git a/native/src/flume_view_sql/queries.rs b/native/src/flume_view_sql/queries.rs index cdcca58..1cd40a6 100644 --- a/native/src/flume_view_sql/queries.rs +++ b/native/src/flume_view_sql/queries.rs @@ -103,9 +103,9 @@ mod test { let file = std::fs::File::open(offset_filename.to_string()).unwrap(); - OffsetLogIter::::new(file) + OffsetLogIter::::new(file) .take(num_entries) - .map(|data| (data.id, data.data_buffer)) + .map(|data| (data.offset, data.data)) .chunks(1000 as usize) .into_iter() .for_each(|chunk| { diff --git a/native/src/flume_view_sql/votes.rs b/native/src/flume_view_sql/votes.rs index cd8cf67..11134cc 100644 --- a/native/src/flume_view_sql/votes.rs +++ b/native/src/flume_view_sql/votes.rs @@ -23,7 +23,7 @@ pub fn insert_or_update_votes(connection: &Connection, message: &SsbMessage) { if value.as_i64().unwrap() == 1 { connection - .prepare_cached("REPLACE INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)") + .prepare_cached("INSERT INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)") .unwrap() .execute(&[&author_id, &link_to_key_id]) .unwrap(); diff --git a/native/src/lib.rs b/native/src/lib.rs index c6e6647..8e36705 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -74,10 +74,10 @@ impl SsbQuery { n => n as usize, }; - OffsetLogIter::::with_starting_offset(file, latest) + OffsetLogIter::::with_starting_offset(file, latest) .skip(num_to_skip) .take(items_to_take) - .map(|data| (data.id + latest, data.data_buffer)) //TODO log_latest might not be the right thing + .map(|data| (data.offset + latest, data.data)) //TODO log_latest might not be the right thing .chunks(1000) .into_iter() .for_each(|chunk| {