post_process method outside batch consumption DB transaction #201

Status: Closed · wants to merge 1 commit

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## UNRELEASED
 - Fix: Fixed handler metric for status:received, status:success in batch consumption
+- Feature: Adds post_process method for BatchConsumers to use the ActiveRecord objects after saving records to DB.

 # 1.22.5 - 2023-07-18
 - Fix: Fixed buffer overflow crash with DB producer.
5 changes: 5 additions & 0 deletions README.md
@@ -352,6 +352,11 @@ class MyBatchConsumer < Deimos::Consumer
       # Do something
     end
   end
+
+  def post_process(records)
+    # ActiveRecord objects that have been used to save to the database.
+    # They contain primary keys only if associations are involved.

Member:
The wording is a bit awkward... maybe:

    This method gets passed the ActiveRecord objects that were saved to the database.
    They contain primary keys only if associations were also saved via bulk_import_id.

Collaborator (author):
Re "have been used to": sorry, I should not flex my grammar. I will use your words.

I wanted to convey the intent that these are not the ActiveRecord objects returned after save; these are the records we created and passed in to save them.

Member:
But... aren't they the same thing?

Collaborator (author):
I had misunderstood that Rails adds IDs back to the objects on every save.

I tested in a Rails console: the ID attribute is set on the object only when object.save! is called. In the other scenarios, object.save does not actually change the object's ID attribute.
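For context, this is the kind of console check being described. Widget is a hypothetical stand-in model, and the note on bulk import is an assumption about how activerecord-import behaves on common adapters, not something asserted by this diff:

```ruby
# Hypothetical console session; Widget stands in for any ActiveRecord model.
w = Widget.new(name: 'x')
w.id      # => nil before persisting

w.save!   # raises on validation failure
w.id      # check whether the primary key was assigned back here

# The bulk path used by batch consumption behaves differently: with
# activerecord-import, Model.import does not assign generated ids back
# on every database adapter, so records saved in bulk may keep id: nil.
Widget.import([Widget.new(name: 'y')])
```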

+  end
 end
 ```
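To make the new hook concrete, here is a sketch of a consumer overriding it. Widget, the record_class wiring, and the logging are illustrative assumptions, not part of this PR:

```ruby
# Illustrative only; assumes the ActiveRecordConsumer setup from the README.
class MyBatchConsumer < Deimos::ActiveRecordConsumer
  record_class Widget

  def post_process(records)
    # Runs after the batch transaction has committed, so work done here
    # is not rolled back if it fails.
    records.each do |record|
      Rails.logger.info("Saved: #{record.attributes.inspect}")
    end
  end
end
```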
#### Saving data to Multiple Database tables
50 changes: 31 additions & 19 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -22,7 +22,7 @@ module BatchConsumption
   # they are split
   # @param payloads [Array<Hash,Deimos::SchemaClass::Record>] Decoded payloads
   # @param metadata [Hash] Information about batch, including keys.
-  # @return [void]
+  # @return [Array<ActiveRecord::Base>]
   def consume_batch(payloads, metadata)
     messages = payloads.
       zip(metadata[:keys]).
@@ -31,21 +31,30 @@ def consume_batch(payloads, metadata)
     tags = %W(topic:#{metadata[:topic]})

     Deimos.instrument('ar_consumer.consume_batch', tags) do
+      upserts = []
       # The entire batch should be treated as one transaction so that if
       # any message fails, the whole thing is rolled back or retried
       # if there is deadlock
       Deimos::Utils::DeadlockRetry.wrap(tags) do
-        if @compacted || self.class.config[:no_keys]
-          update_database(compact_messages(messages))
-        else
-          uncompacted_update(messages)
-        end
+        upserts = if @compacted || self.class.config[:no_keys]
+                    update_database(compact_messages(messages))
+                  else
+                    uncompacted_update(messages)
+                  end
       end
+      post_process(upserts)
     end
   end

   protected

+  # Takes the ActiveRecord objects created during batch consumption and uses them after the transaction is complete.
+  # @param _records [Array<ActiveRecord::Base>]
+  # @return [void]
+  def post_process(_records)
+    nil
+  end
+
   # Get the set of attribute names that uniquely identify messages in the
   # batch. Requires at least one record.
   # The parameters are mutually exclusive. records is used by default implementation.
@@ -114,45 +123,48 @@ def compact_messages(batch)
   # All messages are split into slices containing only unique keys, and
   # each slice is handled as its own batch.
   # @param messages [Array<Message>] List of messages.
-  # @return [void]
+  # @return [Array<ActiveRecord::Base>]
   def uncompacted_update(messages)
     BatchSlicer.
       slice(messages).
-      each(&method(:update_database))
+      each(&method(:update_database)).flatten
Member:
Probably should use map instead of each here since we're using the return value.
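The suggestion matters for the return value: each returns its receiver, so the records produced by update_database inside the block are discarded, while map collects them. A plain-Ruby illustration:

```ruby
slices = [[1, 2], [3]]

slices.each { |s| s.map { |n| n * 10 } }.flatten
# => [1, 2, 3]    each returns the original slices; block results are lost

slices.map { |s| s.map { |n| n * 10 } }.flatten
# => [10, 20, 30] map collects the per-slice return values
```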

   end

   # Perform database operations for a group of messages.
   # All messages with payloads are passed to upsert_records.
   # All tombstone messages are passed to remove_records.
   # @param messages [Array<Message>] List of messages.
-  # @return [void]
+  # @return [Array<ActiveRecord::Base>]
   def update_database(messages)
     # Find all upserted records (i.e. that have a payload) and all
     # deleted records (no payload)
     removed, upserted = messages.partition(&:tombstone?)

     max_db_batch_size = self.class.config[:max_db_batch_size]
+    upserts = []
     if upserted.any?
-      if max_db_batch_size
-        upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
-      else
-        upsert_records(upserted)
-      end
+      upserts = if max_db_batch_size
+                  upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
+                else
+                  upsert_records(upserted)
+                end
     end

-    return if removed.empty?
-
-    if max_db_batch_size
-      removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
-    else
-      remove_records(removed)
-    end
+    if removed.any?
+      if max_db_batch_size
+        removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
+      else
+        remove_records(removed)
+      end
+    end
+    upserts
   end
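For readers less familiar with the two Enumerable calls driving this method, a minimal plain-Ruby sketch of their behavior (not Deimos code; the hashes stand in for messages):

```ruby
messages = [
  { key: 1, payload: nil },   # a tombstone
  { key: 2, payload: 'a' },
  { key: 3, payload: 'b' }
]

# partition splits one array into [matching, non-matching]
removed, upserted = messages.partition { |m| m[:payload].nil? }
# removed  => [{ key: 1, payload: nil }]
# upserted => [{ key: 2, payload: 'a' }, { key: 3, payload: 'b' }]

# each_slice(n) with a block yields groups of at most n and returns nil,
# so only the non-batched branch above can assign records to upserts
upserted.each_slice(2) { |group| puts group.inspect }  # => nil
```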

   # Upsert any non-deleted records
   # @param messages [Array<Message>] List of messages for a group of
   # records to either be updated or inserted.
-  # @return [void]
+  # @return [Array<ActiveRelation>]
   def upsert_records(messages)
     record_list = build_records(messages)
     record_list.filter!(self.method(:should_consume?).to_proc)
1 change: 1 addition & 0 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -85,6 +85,7 @@ def import_associations(record_list)
   def mass_update(record_list)
     save_records_to_database(record_list)
     import_associations(record_list) if record_list.associations.any?
+    record_list.records
   end

 end
21 changes: 20 additions & 1 deletion spec/active_record_consume/batch_consumption_spec.rb
@@ -27,7 +27,6 @@
   it 'should be called 1 time when record size == max batch size' do
     batch_consumer.class.config[:max_db_batch_size] = records.size
     expect(batch_consumer).to receive(:upsert_records).once
-
     batch_consumer.send(:update_database, records)
   end

@@ -100,4 +99,24 @@
       end
     end
   end
+
+  describe '#consume_batch' do
+    describe 'post_process in compact mode' do
+      let(:payload) do
+        [
+          { v: 1 }, { v: 2 }, { v: 3 }, { v: 4 }, { v: 5 }
+        ]
+      end
+      let(:metadata) do
+        { keys: [1, 2, 3, 4, 5] }
+      end
+
+      it 'should be called once with a flat array of records' do
+        records = [Object, Object] # should be ActiveRecord objects
+        expect(batch_consumer).to receive(:post_process).once.with(records)
+        expect(batch_consumer).to receive(:update_database) { records }
+        batch_consumer.send(:consume_batch, payload, metadata)
+      end
Member:
I think we're probably missing some more tests - e.g. with and without max_db_batch_size, and with and without deletions. Probably can just add another expect onto existing tests showing that they return the expected values.
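A sketch of one such added expectation, reusing the spec file's existing records let and following the shape of its update_database examples; this is an assumption about structure, not a tested patch:

```ruby
it 'returns the saved records so consume_batch can pass them to post_process' do
  # No tombstones in records, so everything goes through the upsert branch.
  expect(batch_consumer).to receive(:upsert_records).and_return(records)
  expect(batch_consumer.send(:update_database, records)).to eq(records)
end
```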

+    end
+  end
 end