Integrate latest flume api, fix bug with contacts update.
pietgeursen committed Feb 15, 2019
1 parent 1666fb1 commit 40b6c96
Showing 8 changed files with 49 additions and 30 deletions.
16 changes: 11 additions & 5 deletions native/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion native/Cargo.toml
@@ -22,7 +22,7 @@ private-box = "0.4.5"
base64 = "0.10.0"
itertools = "0.8.0"
node_napi = { git = "https://github.com/sunrise-choir/node-napi" }
-flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs" }
+flumedb = { git = "https://github.com/sunrise-choir/flumedb-rs", version = "0.1.1" }

[dependencies.rusqlite]
version = "0.16.0"
17 changes: 8 additions & 9 deletions native/benches/bench.rs
@@ -14,7 +14,6 @@ extern crate ssb_sql_napi;

use base64::{decode, encode};
use flumedb::flume_log::FlumeLog;
-use flumedb::offset_log::OffsetCodec;
use flumedb::offset_log::OffsetLogIter;
use itertools::Itertools;
use private_box::SecretKey;
@@ -30,9 +29,9 @@ fn create_test_db(num_entries: usize, offset_filename: &str, db_filename: &str)

let file = std::fs::File::open(offset_filename.to_string()).unwrap();

-OffsetLogIter::<u32, std::fs::File>::new(file)
+OffsetLogIter::<u32>::new(file)
.take(num_entries)
-.map(|data| (data.id, data.data_buffer))
+.map(|data| (data.offset, data.data))
.chunks(NUM_ENTRIES as usize)
.into_iter()
.for_each(|chunk| {
@@ -53,8 +52,8 @@ fn flume_view_sql_insert_piets_entire_log(c: &mut Criterion) {
let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap();

let file = std::fs::File::open(offset_filename.to_string()).unwrap();
-OffsetLogIter::<u32, std::fs::File>::new(file)
-.map(|data| (data.id, data.data_buffer))
+OffsetLogIter::<u32>::new(file)
+.map(|data| (data.offset, data.data))
.chunks(NUM_ENTRIES as usize)
.into_iter()
.for_each(|chunk| {
@@ -85,8 +84,8 @@ fn flume_view_sql_insert_piets_entire_log_with_decryption(c: &mut Criterion) {
let mut view = FlumeViewSql::new(db_filename, keys, "").unwrap();

let file = std::fs::File::open(offset_filename.to_string()).unwrap();
-OffsetLogIter::<u32, std::fs::File>::new(file)
-.map(|data| (data.id, data.data_buffer))
+OffsetLogIter::<u32>::new(file)
+.map(|data| (data.offset, data.data))
.chunks(NUM_ENTRIES as usize)
.into_iter()
.for_each(|chunk| {
@@ -111,9 +110,9 @@ fn flume_view_sql_insert(c: &mut Criterion) {

//TODO: this is ok for a benchmark but uses lots of memory.
//Better would be to create a transaction and then append in a for_each loop.
-OffsetLogIter::<u32, std::fs::File>::new(file)
+OffsetLogIter::<u32>::new(file)
.take(NUM_ENTRIES as usize)
-.map(|data| (data.id, data.data_buffer))
+.map(|data| (data.offset, data.data))
.chunks(NUM_ENTRIES as usize)
.into_iter()
.for_each(|chunk| {
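
The change repeated throughout this file (and again in queries.rs and lib.rs below) is the updated flumedb API: OffsetLogIter now takes a single type parameter, and its items expose offset and data instead of id and data_buffer. A minimal usage sketch of the new shape, for reference only; the file path is a placeholder and collecting into a Vec is purely illustrative:

// Sketch of the updated flumedb iterator API as used in this commit.
// The path is a placeholder; the item field names are taken from the diff above.
use flumedb::offset_log::OffsetLogIter;

fn main() {
    let file = std::fs::File::open("piets.offset").unwrap();
    let entries: Vec<_> = OffsetLogIter::<u32>::new(file)
        .map(|entry| (entry.offset, entry.data))
        .collect();
    println!("read {} entries from the offset log", entries.len());
}
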
33 changes: 23 additions & 10 deletions native/src/flume_view_sql/contacts.rs
@@ -25,6 +25,12 @@ pub fn insert_or_update_contacts(
is_decrypted: bool,
) {
if let Value::String(contact) = &message.value.content["contact"] {
+//Ok what should this do:
+// - if the record already exists
+// - delete it if the new state is zero (this should only happen when record already
+// exists because you can't unfollow someone you already don't follow.

black-puppydog commented on Feb 18, 2019:

%fZ8stm8O52Ah+LVAr6gtPJaugJXkcq1V6YG8ULBBzcI=.sha256

That's an unfollow of a not-followed feed. I don't see any DELETE in that file, so I guess that part isn't implemented yet? Might be worth it to catch this corner case, now that at least one example exists in the wild 😛

+// - update it if the new state is 1 or -1
+// - else create the new record. State should be a 1 or a -1
let is_blocking = message.value.content["blocking"].as_bool().unwrap_or(false);
let is_following = message.value.content["following"]
.as_bool()
@@ -38,20 +44,27 @@
};

let author_id = find_or_create_author(&connection, &message.value.author).unwrap();
-let mut insert_contacts_stmt = connection
-.prepare_cached("REPLACE INTO contacts_raw (author_id, contact_author_id, state, is_decrypted) VALUES (?, ?, ?, ?)")
-.unwrap();
let contact_author_id = find_or_create_author(&connection, contact).unwrap();

-insert_contacts_stmt
-.execute(&[
-&author_id,
-&contact_author_id,
-&state,
-&is_decrypted as &ToSql,
-])
+let mut stmt = connection.prepare_cached("SELECT id FROM contacts_raw WHERE author_id = ? AND contact_author_id = ? AND is_decrypted = ?").unwrap();
+
+stmt.query_row(&[&author_id, &contact_author_id, &is_decrypted as &ToSql], |row| row.get(0))
+.and_then(|id: i64|{
+//Row exists so update
+connection
+.prepare_cached("UPDATE contacts_raw SET state = ? WHERE id = ?")
+.map(|mut stmt| stmt.execute(&[&state, &id]))
+})
+.or_else(|_| {
+//Row didn't exist so insert
+connection
+.prepare_cached("INSERT INTO contacts_raw (author_id, contact_author_id, is_decrypted, state) VALUES (?, ?, ?, ?)")
+.map(|mut stmt| stmt.execute(&[&author_id, &contact_author_id, &is_decrypted as &ToSql, &state]))
+})
+.unwrap()
.unwrap();
}

}

pub fn create_contacts_indices(connection: &Connection) -> Result<usize, Error> {
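
As black-puppydog points out above, the "delete it if the new state is zero" case described in the new comment has no matching DELETE in this commit. Purely as an illustration of what that branch could look like, reusing the bindings already in scope in insert_or_update_contacts (the state == 0 condition and its placement are assumptions; the table and column names are taken from the queries above):

// Hypothetical sketch, not part of this commit: drop the existing row when the
// new contact state is 0, instead of falling through to the UPDATE/INSERT path.
if state == 0 {
    connection
        .prepare_cached("DELETE FROM contacts_raw WHERE author_id = ? AND contact_author_id = ? AND is_decrypted = ?")
        .unwrap()
        .execute(&[&author_id, &contact_author_id, &is_decrypted as &ToSql])
        .unwrap();
} else {
    // ...the SELECT / UPDATE / INSERT logic added in this commit...
}
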
1 change: 1 addition & 0 deletions native/src/flume_view_sql/mod.rs
@@ -258,6 +258,7 @@ fn append_item(

let message_key_id = find_or_create_key(&connection, &message.key).unwrap();

+// votes are a kind of backlink, but we want to put them in their own table.
match &message.value.content["type"] {
Value::String(type_string) if type_string == "vote" => {
insert_or_update_votes(connection, &message);
4 changes: 2 additions & 2 deletions native/src/flume_view_sql/queries.rs
@@ -103,9 +103,9 @@ mod test {

let file = std::fs::File::open(offset_filename.to_string()).unwrap();

-OffsetLogIter::<u32, std::fs::File>::new(file)
+OffsetLogIter::<u32>::new(file)
.take(num_entries)
-.map(|data| (data.id, data.data_buffer))
+.map(|data| (data.offset, data.data))
.chunks(1000 as usize)
.into_iter()
.for_each(|chunk| {
2 changes: 1 addition & 1 deletion native/src/flume_view_sql/votes.rs
@@ -23,7 +23,7 @@ pub fn insert_or_update_votes(connection: &Connection, message: &SsbMessage) {

if value.as_i64().unwrap() == 1 {
connection
.prepare_cached("REPLACE INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)")
.prepare_cached("INSERT INTO votes_raw (link_from_author_id, link_to_key_id) VALUES (?, ?)")
.unwrap()
.execute(&[&author_id, &link_to_key_id])
.unwrap();
4 changes: 2 additions & 2 deletions native/src/lib.rs
@@ -74,10 +74,10 @@ impl SsbQuery {
n => n as usize,
};

-OffsetLogIter::<u32, std::fs::File>::with_starting_offset(file, latest)
+OffsetLogIter::<u32>::with_starting_offset(file, latest)
.skip(num_to_skip)
.take(items_to_take)
-.map(|data| (data.id + latest, data.data_buffer)) //TODO log_latest might not be the right thing
+.map(|data| (data.offset + latest, data.data)) //TODO log_latest might not be the right thing
.chunks(1000)
.into_iter()
.for_each(|chunk| {
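
The TODO above carries over from the old API: offsets yielded after with_starting_offset are presumably relative to the starting point, which is why latest is added back. A minimal resume sketch under that assumption (the function name, the u64 offset type, and the file handling are placeholders):

// Sketch only: resume reading the offset log from the last processed position.
// Assumes entry.offset is relative to `latest`, as the mapping above implies.
use flumedb::offset_log::OffsetLogIter;

fn resume_from(offset_filename: &str, latest: u64) {
    let file = std::fs::File::open(offset_filename).unwrap();
    for entry in OffsetLogIter::<u32>::with_starting_offset(file, latest) {
        let absolute_offset = entry.offset + latest;
        let bytes = entry.data;
        // hand (absolute_offset, bytes) off to the SQL view here
        let _ = (absolute_offset, bytes);
    }
}
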
