diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..676dc04b 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -31,16 +31,19 @@ def consume_batch(payloads, metadata) tags = %W(topic:#{metadata[:topic]}) Deimos.instrument('ar_consumer.consume_batch', tags) do + valid_upserts = [] + invalid_upserts = [] # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) + end end + post_process(valid_upserts, invalid_upserts) end end @@ -114,50 +117,65 @@ def compact_messages(batch) # All messages are split into slices containing only unique keys, and # each slice is handles as its own batch. # @param messages [Array] List of messages. - # @return [void] + # @return [Array, Array>] def uncompacted_update(messages) + valid_records = [] + invalid_records = [] BatchSlicer. - slice(messages). - each(&method(:update_database)) + slice(messages).each do |slice| + valid, invalid = update_database(slice) + valid_records.push(*valid) if valid.any? + invalid_records.push(*invalid) if invalid.any? + end + [valid_records, invalid_records] end # Perform database operations for a group of messages. # All messages with payloads are passed to upsert_records. # All tombstones messages are passed to remove_records. # @param messages [Array] List of messages. - # @return [void] + # @return [Array, Array>] def update_database(messages) # Find all upserted records (i.e. that have a payload) and all # deleted record (no payload) removed, upserted = messages.partition(&:tombstone?) max_db_batch_size = self.class.config[:max_db_batch_size] + valid_upserts = [] + invalid_upserts = [] if upserted.any? + valid_upserts, invalid_upserts = if max_db_batch_size + upserted.each_slice(max_db_batch_size) do |group| + valid, invalid = upsert_records(group) + valid_upserts.push(*valid) + invalid_upserts.push(*invalid) + end + valid_upserts.compact! + invalid_upserts.compact! + else + upsert_records(upserted) + end + end + + if removed.any? if max_db_batch_size - upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } + removed.each_slice(max_db_batch_size) { |group| remove_records(group) } else - upsert_records(upserted) + remove_records(removed) end end - - return if removed.empty? - - if max_db_batch_size - removed.each_slice(max_db_batch_size) { |group| remove_records(group) } - else - remove_records(removed) - end + [valid_upserts, invalid_upserts] end # Upsert any non-deleted records # @param messages [Array] List of messages for a group of # records to either be updated or inserted. - # @return [void] + # @return [Array, Array] def upsert_records(messages) record_list = build_records(messages) - record_list.filter!(self.method(:should_consume?).to_proc) + invalid = filter_records(record_list) - return if record_list.empty? + return [[], invalid] if record_list.empty? key_col_proc = self.method(:key_columns).to_proc col_proc = self.method(:columns).to_proc @@ -166,7 +184,13 @@ def upsert_records(messages) key_col_proc: key_col_proc, col_proc: col_proc, replace_associations: self.class.config[:replace_associations]) - updater.mass_update(record_list) + [updater.mass_update(record_list), invalid] + end + + # @param record_list [BatchRecordList] + # @return [Array] + def filter_records(record_list) + record_list.partition!(self.method(:should_consume?).to_proc) end # @param messages [Array] @@ -194,6 +218,14 @@ def build_records(messages) BatchRecordList.new(records.compact) end + # Additional processing after records have been successfully upserted + # @param _valid_records [Array] Records to be post processed + # @param _invalid_records [Array] Invalid records to be processed + # @return [void] + def post_process(_valid_records, _invalid_records) + nil + end + # Delete any records with a tombstone. # @param messages [Array] List of messages for a group of # deleted records. diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 30f99d23..706ffad0 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -23,6 +23,29 @@ def filter!(method) self.batch_records.delete_if { |record| !method.call(record.record) } end + # Filter and return removed invalid batch records by the specified method or block + # @param method [Proc] + # @param block [Proc] + # @return [Array] + def partition!(method=nil, &block) + valid, invalid = if method.nil? + self.batch_records.partition(&block) + else + case method.parameters.size + when 2 + self.batch_records.partition do |record| + method.call(record.record, record.associations) + end + else + self.batch_records.partition do |record| + method.call(record.record) + end + end + end + self.batch_records = valid + invalid + end + # Get the original ActiveRecord objects. # @return [Array] def records diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 5694dbb7..e8fc3cc4 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -82,9 +82,11 @@ def import_associations(record_list) end # @param record_list [BatchRecordList] + # @return [Array] def mass_update(record_list) save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? + record_list.records end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index d1558dd2..1d04eb60 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -493,5 +493,144 @@ def record_attributes(payload, key) end end + describe 'post processing' do + + context 'with uncompacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def should_consume?(record) + record.some_int.even? + end + + def post_process(valid, invalid) + # Success + Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + + # Invalid + Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + + expect(widget_one.some_int).to eq(1) + expect(widget_two.some_int).to eq(20) + expect(widget_three.some_int).to eq(2000) + expect(widget_three.test_id).to eq(widget_two.test_id) + expect(widget_four.some_int).to eq(11) + expect(widget_four.test_id).to eq(widget_one.test_id) + end + end + + context 'with post processing' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted true + + def should_consume?(record) + record.some_int.even? + end + + def post_process(valid, invalid) + # Success + Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + + # Invalid + Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + + expect(widget_one.some_int).to eq(1) + expect(widget_two.some_int).to eq(20) + expect(widget_three.some_int).to eq(2000) + expect(widget_three.test_id).to eq(widget_two.test_id) + expect(widget_four.some_int).to eq(11) + expect(widget_four.test_id).to eq(widget_one.test_id) + end + end + + context 'with post processing errors' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def post_process(_, _) + raise StandardError, 'Something went wrong' + end + + end + end + + it 'should save records if an exception occurs in post processing' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + expect { + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + }.to raise_error(StandardError, 'Something went wrong') + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(20) + + end + end + + end + end end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index a3e82428..ad734f41 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -64,7 +64,9 @@ end it 'should mass update the batch' do - described_class.new(Widget).mass_update(batch) + results = described_class.new(Widget).mass_update(batch) + expect(results.count).to eq(2) + expect(results.map(&:test_id)).to match(%w(id1 id2)) expect(Widget.count).to eq(2) expect(Detail.count).to eq(2) expect(Widget.first.detail).not_to be_nil