CCOL-2039: Post process valid and invalid records from batch consumption
Lionel Pereira committed Nov 30, 2023
1 parent 92437aa commit f2386fa
Showing 5 changed files with 222 additions and 24 deletions.
78 changes: 55 additions & 23 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -31,16 +31,19 @@ def consume_batch(payloads, metadata)
tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
valid_upserts = []
invalid_upserts = []
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys]
  update_database(compact_messages(messages))
else
  uncompacted_update(messages)
end
end
post_process(valid_upserts, invalid_upserts)
end
end

@@ -114,50 +117,65 @@ def compact_messages(batch)
# All messages are split into slices containing only unique keys, and
# each slice is handled as its own batch (e.g. a batch keyed [1, 2, 1, 3]
# becomes the slices [1, 2, 3] and [1]).
# @param messages [Array<Message>] List of messages.
# @return [Array<Array<ActiveRecord::Base>, Array<BatchRecord>>]
def uncompacted_update(messages)
valid_records = []
invalid_records = []
BatchSlicer.
  slice(messages).each do |slice|
valid, invalid = update_database(slice)
valid_records.push(*valid) if valid.any?
invalid_records.push(*invalid) if invalid.any?
end
[valid_records, invalid_records]
end

# Perform database operations for a group of messages.
# All messages with payloads are passed to upsert_records.
# All tombstone messages are passed to remove_records.
# @param messages [Array<Message>] List of messages.
# @return [Array<Array<ActiveRecord::Base>, Array<BatchRecord>>]
def update_database(messages)
# Find all upserted records (i.e. that have a payload) and all
# deleted records (no payload)
removed, upserted = messages.partition(&:tombstone?)

max_db_batch_size = self.class.config[:max_db_batch_size]
valid_upserts = []
invalid_upserts = []
if upserted.any?
if max_db_batch_size
    upserted.each_slice(max_db_batch_size) do |group|
      valid, invalid = upsert_records(group)
      valid_upserts.push(*valid)
      invalid_upserts.push(*invalid)
    end
    valid_upserts.compact!
    invalid_upserts.compact!
  else
    valid_upserts, invalid_upserts = upsert_records(upserted)
  end
end

if removed.any?
  if max_db_batch_size
    removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
  else
    remove_records(removed)
  end
end
[valid_upserts, invalid_upserts]
end
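Both update_database and uncompacted_update now share the same return contract; a minimal sketch of how a caller consumes it:

# valid   - Array<ActiveRecord::Base>: rows that passed should_consume? and were upserted
# invalid - Array<BatchRecord>: entries filtered out before the upsert
valid, invalid = update_database(messages)
post_process(valid, invalid)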

# Upsert any non-deleted records
# @param messages [Array<Message>] List of messages for a group of
# records to either be updated or inserted.
# @return [Array<Array<ActiveRecord::Base>, Array<BatchRecord>>]
def upsert_records(messages)
record_list = build_records(messages)
invalid = filter_records(record_list)

return [[], invalid] if record_list.empty?

key_col_proc = self.method(:key_columns).to_proc
col_proc = self.method(:columns).to_proc
@@ -166,7 +184,13 @@ def upsert_records(messages)
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.config[:replace_associations])
[updater.mass_update(record_list), invalid]
end

# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>]
def filter_records(record_list)
record_list.partition!(self.method(:should_consume?).to_proc)
end

# @param messages [Array<Deimos::Message>]
@@ -194,6 +218,14 @@ def build_records(messages)
BatchRecordList.new(records.compact)
end

# Additional processing after records have been successfully upserted
# @param _valid_records [Array<ActiveRecord::Base>] Records that were successfully upserted
# @param _invalid_records [Array<BatchRecord>] Invalid records that were filtered out before the upsert
# @return [void]
def post_process(_valid_records, _invalid_records)
nil
end
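By default the hook does nothing; a consumer overrides it to react to the batch outcome. A minimal sketch using the same DSL as the specs below — DeadLetterProducer is a hypothetical class shown purely for illustration:

class MyConsumer < Deimos::ActiveRecordConsumer
  schema 'MySchema'
  namespace 'com.my-namespace'
  key_config plain: true
  record_class Widget

  def should_consume?(record)
    record.some_int.even?
  end

  def post_process(valid_records, invalid_records)
    # valid_records are the saved ActiveRecord objects
    valid_records.each { |widget| Rails.logger.info("Upserted Widget #{widget.id}") }
    # each invalid entry is a BatchRecord; #record exposes the unsaved model
    invalid_records.each { |batch_record| DeadLetterProducer.publish(batch_record.record.attributes) }
  end
end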

# Delete any records with a tombstone.
# @param messages [Array<Message>] List of messages for a group of
# deleted records.
23 changes: 23 additions & 0 deletions lib/deimos/active_record_consume/batch_record_list.rb
@@ -23,6 +23,29 @@ def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
end

# Partition the batch records with the given method or block, keeping the
# valid records on the list and returning the removed (invalid) ones.
# @param method [Proc]
# @param block [Proc]
# @return [Array<BatchRecord>]
def partition!(method=nil, &block)
valid, invalid = if method.nil?
self.batch_records.partition(&block)
else
case method.parameters.size
when 2
self.batch_records.partition do |record|
method.call(record.record, record.associations)
end
else
self.batch_records.partition do |record|
method.call(record.record)
end
end
end
self.batch_records = valid
invalid
end
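A rough usage sketch of partition! (the method names passed in are examples only): the block form yields each BatchRecord, a one-argument method receives the underlying record, and a two-argument method also receives its associations:

# Block form: yields the BatchRecord itself
invalid = record_list.partition! { |batch_record| batch_record.record.some_int.even? }

# Method form, arity 1: receives the ActiveRecord object
invalid = record_list.partition!(method(:should_consume?).to_proc)

# Method form, arity 2: receives the record and its associations
invalid = record_list.partition!(method(:should_consume_with_associations?).to_proc)

# Afterwards batch_records holds only the valid records; `invalid` holds the removed ones.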

# Get the original ActiveRecord objects.
# @return [Array<ActiveRecord::Base>]
def records
2 changes: 2 additions & 0 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -82,9 +82,11 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
record_list.records
end
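Since mass_update now returns the saved records, a caller can hand them straight to post-processing — a short usage sketch (Widget is just an example record class):

updater = Deimos::ActiveRecordConsume::MassUpdater.new(Widget)
saved = updater.mass_update(record_list)   # => Array<Widget>
saved.each { |widget| Rails.logger.debug("Mass-updated Widget #{widget.id}") }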

end
139 changes: 139 additions & 0 deletions spec/active_record_batch_consumer_spec.rb
@@ -493,5 +493,144 @@ def record_attributes(payload, key)
end
end

describe 'post processing' do

context 'with uncompacted messages' do
let(:consumer_class) do
Class.new(described_class) do
schema 'MySchema'
namespace 'com.my-namespace'
key_config plain: true
record_class Widget
compacted false

def should_consume?(record)
record.some_int.even?
end

def post_process(valid, invalid)
# Success
Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3))

# Invalid
Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any?
end

end
end

it 'should process successful and failed records' do
Widget.create!(id: 1, test_id: 'abc', some_int: 1)
Widget.create!(id: 2, test_id: 'def', some_int: 2)

publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 11 } },
{ key: 2,
payload: { test_id: 'def', some_int: 20 } }
]
)

widget_one, widget_two, widget_three, widget_four = Widget.all.to_a

expect(widget_one.some_int).to eq(1)
expect(widget_two.some_int).to eq(20)
expect(widget_three.some_int).to eq(2000)
expect(widget_three.test_id).to eq(widget_two.test_id)
expect(widget_four.some_int).to eq(11)
expect(widget_four.test_id).to eq(widget_one.test_id)
end
end

context 'with compacted messages' do
let(:consumer_class) do
Class.new(described_class) do
schema 'MySchema'
namespace 'com.my-namespace'
key_config plain: true
record_class Widget
compacted true

def should_consume?(record)
record.some_int.even?
end

def post_process(valid, invalid)
# Success
Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3))

# Invalid
Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any?
end

end
end

it 'should process successful and failed records' do
Widget.create!(id: 1, test_id: 'abc', some_int: 1)
Widget.create!(id: 2, test_id: 'def', some_int: 2)

publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 11 } },
{ key: 2,
payload: { test_id: 'def', some_int: 20 } }
]
)

widget_one, widget_two, widget_three, widget_four = Widget.all.to_a

expect(widget_one.some_int).to eq(1)
expect(widget_two.some_int).to eq(20)
expect(widget_three.some_int).to eq(2000)
expect(widget_three.test_id).to eq(widget_two.test_id)
expect(widget_four.some_int).to eq(11)
expect(widget_four.test_id).to eq(widget_one.test_id)
end
end

context 'with post processing errors' do
let(:consumer_class) do
Class.new(described_class) do
schema 'MySchema'
namespace 'com.my-namespace'
key_config plain: true
record_class Widget
compacted false

def post_process(_, _)
raise StandardError, 'Something went wrong'
end

end
end

it 'should save records if an exception occurs in post processing' do
Widget.create!(id: 1, test_id: 'abc', some_int: 1)
Widget.create!(id: 2, test_id: 'def', some_int: 2)

expect {
publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 11 } },
{ key: 2,
payload: { test_id: 'def', some_int: 20 } }
]
)
}.to raise_error(StandardError, 'Something went wrong')

widget_one, widget_two = Widget.all.to_a

expect(widget_one.some_int).to eq(11)
expect(widget_two.some_int).to eq(20)

end
end

end

end
end
4 changes: 3 additions & 1 deletion spec/active_record_consume/mass_updater_spec.rb
@@ -64,7 +64,9 @@
end

it 'should mass update the batch' do
results = described_class.new(Widget).mass_update(batch)
expect(results.count).to eq(2)
expect(results.map(&:test_id)).to match(%w(id1 id2))
expect(Widget.count).to eq(2)
expect(Detail.count).to eq(2)
expect(Widget.first.detail).not_to be_nil
