-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
post_process method outside batch consumption DB transaction #201
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ module BatchConsumption | |
# they are split | ||
# @param payloads [Array<Hash,Deimos::SchemaClass::Record>] Decoded payloads | ||
# @param metadata [Hash] Information about batch, including keys. | ||
# @return [void] | ||
# @return [Array<ActiveRecord::Base>] | ||
def consume_batch(payloads, metadata) | ||
messages = payloads. | ||
zip(metadata[:keys]). | ||
|
@@ -31,21 +31,30 @@ def consume_batch(payloads, metadata) | |
tags = %W(topic:#{metadata[:topic]}) | ||
|
||
Deimos.instrument('ar_consumer.consume_batch', tags) do | ||
upserts = [] | ||
# The entire batch should be treated as one transaction so that if | ||
# any message fails, the whole thing is rolled back or retried | ||
# if there is deadlock | ||
Deimos::Utils::DeadlockRetry.wrap(tags) do | ||
if @compacted || self.class.config[:no_keys] | ||
update_database(compact_messages(messages)) | ||
else | ||
uncompacted_update(messages) | ||
end | ||
upserts = if @compacted || self.class.config[:no_keys] | ||
update_database(compact_messages(messages)) | ||
else | ||
uncompacted_update(messages) | ||
end | ||
end | ||
post_process(upserts) | ||
end | ||
end | ||
|
||
protected | ||
|
||
# Takes the ActiveRecord objects created during batch consumption and use them after the transaction is complete. | ||
# @param [Array<ActiveRecord::Base>] | ||
# @return [void] | ||
def post_process(_records) | ||
nil | ||
end | ||
|
||
# Get the set of attribute names that uniquely identify messages in the | ||
# batch. Requires at least one record. | ||
# The parameters are mutually exclusive. records is used by default implementation. | ||
|
@@ -114,45 +123,48 @@ def compact_messages(batch) | |
# All messages are split into slices containing only unique keys, and | ||
# each slice is handles as its own batch. | ||
# @param messages [Array<Message>] List of messages. | ||
# @return [void] | ||
# @return [Array<ActiveRecord::Base>] | ||
def uncompacted_update(messages) | ||
BatchSlicer. | ||
slice(messages). | ||
each(&method(:update_database)) | ||
each(&method(:update_database)).flatten | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably should use |
||
end | ||
|
||
# Perform database operations for a group of messages. | ||
# All messages with payloads are passed to upsert_records. | ||
# All tombstones messages are passed to remove_records. | ||
# @param messages [Array<Message>] List of messages. | ||
# @return [void] | ||
# @return [Array<ActiveRecord::Base>] | ||
def update_database(messages) | ||
# Find all upserted records (i.e. that have a payload) and all | ||
# deleted record (no payload) | ||
removed, upserted = messages.partition(&:tombstone?) | ||
|
||
max_db_batch_size = self.class.config[:max_db_batch_size] | ||
upserts = [] | ||
if upserted.any? | ||
upserts = if max_db_batch_size | ||
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } | ||
else | ||
upsert_records(upserted) | ||
end | ||
end | ||
|
||
if removed.any? | ||
if max_db_batch_size | ||
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } | ||
removed.each_slice(max_db_batch_size) { |group| remove_records(group) } | ||
else | ||
upsert_records(upserted) | ||
remove_records(removed) | ||
end | ||
end | ||
|
||
return if removed.empty? | ||
|
||
if max_db_batch_size | ||
removed.each_slice(max_db_batch_size) { |group| remove_records(group) } | ||
else | ||
remove_records(removed) | ||
end | ||
upserts | ||
end | ||
|
||
# Upsert any non-deleted records | ||
# @param messages [Array<Message>] List of messages for a group of | ||
# records to either be updated or inserted. | ||
# @return [void] | ||
# @return [Array<ActiveRelation>] | ||
def upsert_records(messages) | ||
record_list = build_records(messages) | ||
record_list.filter!(self.method(:should_consume?).to_proc) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ | |
it 'should be called 1 time when record size == max batch size' do | ||
batch_consumer.class.config[:max_db_batch_size] = records.size | ||
expect(batch_consumer).to receive(:upsert_records).once | ||
|
||
batch_consumer.send(:update_database, records) | ||
end | ||
|
||
|
@@ -100,4 +99,24 @@ | |
end | ||
end | ||
end | ||
|
||
describe '#consume_batch' do | ||
describe 'post_process in compact mode' do | ||
let(:payload) do | ||
[ | ||
{ v: 1 }, { v: 2 }, { v: 3 }, { v: 4 }, { v: 5 } | ||
] | ||
end | ||
let(:metadata) do | ||
{ keys: [1, 2, 3, 4, 5] } | ||
end | ||
|
||
it 'should be called once with a flat array of records' do | ||
records = [Object, Object] # should be ActiveRecord object | ||
expect(batch_consumer).to receive(:post_process).once.with(records) | ||
expect(batch_consumer).to receive(:update_database) { records } | ||
batch_consumer.send(:consume_batch, payload, metadata) | ||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're probably missing some more tests - e.g. with and without max_db_batch_size, and with and without deletions. Probably can just add another |
||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wording is a bit awkward... maybe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have been used to
I am sorry I should not flex my grammar. I will use your words.I wanted to give the intent that these are not the active records returned after save. These are the records we created and passing to save the record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But... aren't they the same thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I misunderstood that rails adds IDs back to the objects for every save.
I tested in rails console and attribute ID is saved to the object only when
object.save!
is called. For other scenarios,object.save
doesnot really change the object attribute ID.