diff --git a/CHANGELOG.md b/CHANGELOG.md
index 60c31c65..a0c8d4e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## UNRELEASED
 - Fix: Fixed handler metric for status:received, status:success in batch consumption
+- Feature: Adds a post_process method for BatchConsumers to use the ActiveRecord objects after saving records to the DB.
 
 # 1.22.5 - 2023-07-18
 - Fix: Fixed buffer overflow crash with DB producer.
diff --git a/README.md b/README.md
index be9628ad..fd653f73 100644
--- a/README.md
+++ b/README.md
@@ -352,6 +352,11 @@ class MyBatchConsumer < Deimos::Consumer
     # Do something
   end
+
+  def post_process(records)
+    # records are the ActiveRecord objects that were saved to the database.
+    # They have primary keys populated only if associations are involved.
+  end
 end
 ```
 
 #### Saving data to Multiple Database tables
diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb
index dee466f8..7b57d885 100644
--- a/lib/deimos/active_record_consume/batch_consumption.rb
+++ b/lib/deimos/active_record_consume/batch_consumption.rb
@@ -22,7 +22,7 @@ module BatchConsumption
       # they are split
       # @param payloads [Array] Decoded payloads
       # @param metadata [Hash] Information about batch, including keys.
-      # @return [void]
+      # @return [Array]
      def consume_batch(payloads, metadata)
        messages = payloads.
          zip(metadata[:keys]).
@@ -31,21 +31,30 @@ def consume_batch(payloads, metadata)
        tags = %W(topic:#{metadata[:topic]})
 
        Deimos.instrument('ar_consumer.consume_batch', tags) do
+         upserts = []
          # The entire batch should be treated as one transaction so that if
          # any message fails, the whole thing is rolled back or retried
          # if there is deadlock
          Deimos::Utils::DeadlockRetry.wrap(tags) do
-           if @compacted || self.class.config[:no_keys]
-             update_database(compact_messages(messages))
-           else
-             uncompacted_update(messages)
-           end
+           upserts = if @compacted || self.class.config[:no_keys]
+                       update_database(compact_messages(messages))
+                     else
+                       uncompacted_update(messages)
+                     end
          end
+         post_process(upserts)
        end
      end
 
      protected
 
+     # Takes the ActiveRecord objects created during batch consumption and uses them after the transaction is complete.
+     # @param _records [Array]
+     # @return [void]
+     def post_process(_records)
+       nil
+     end
+
      # Get the set of attribute names that uniquely identify messages in the
      # batch. Requires at least one record.
      # The parameters are mutually exclusive. records is used by default implementation.
@@ -114,45 +123,48 @@ def compact_messages(batch)
      # All messages are split into slices containing only unique keys, and
      # each slice is handled as its own batch.
      # @param messages [Array] List of messages.
-     # @return [void]
+     # @return [Array]
      def uncompacted_update(messages)
        BatchSlicer.
          slice(messages).
-         each(&method(:update_database))
+         map(&method(:update_database)).flatten
      end
 
      # Perform database operations for a group of messages.
      # All messages with payloads are passed to upsert_records.
      # All tombstone messages are passed to remove_records.
      # @param messages [Array] List of messages.
-     # @return [void]
+     # @return [Array]
      def update_database(messages)
        # Find all upserted records (i.e. that have a payload) and all
        # deleted records (no payload)
        removed, upserted = messages.partition(&:tombstone?)
 
        max_db_batch_size = self.class.config[:max_db_batch_size]
+       upserts = []
        if upserted.any?
+         upserts = if max_db_batch_size
+                     upserted.each_slice(max_db_batch_size).map { |group| upsert_records(group) }.flatten
+                   else
+                     upsert_records(upserted)
+                   end
+       end
+
+       if removed.any?
          if max_db_batch_size
-           upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
+           removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
          else
-           upsert_records(upserted)
+           remove_records(removed)
          end
        end
 
-       return if removed.empty?
-
-       if max_db_batch_size
-         removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
-       else
-         remove_records(removed)
-       end
+       upserts
      end
 
      # Upsert any non-deleted records
      # @param messages [Array] List of messages for a group of
      # records to either be updated or inserted.
-     # @return [void]
+     # @return [Array]
      def upsert_records(messages)
        record_list = build_records(messages)
        record_list.filter!(self.method(:should_consume?).to_proc)
diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb
index 5694dbb7..ac61ab85 100644
--- a/lib/deimos/active_record_consume/mass_updater.rb
+++ b/lib/deimos/active_record_consume/mass_updater.rb
@@ -85,6 +85,7 @@ def import_associations(record_list)
 
    def mass_update(record_list)
      save_records_to_database(record_list)
      import_associations(record_list) if record_list.associations.any?
+     record_list.records
    end
  end
diff --git a/spec/active_record_consume/batch_consumption_spec.rb b/spec/active_record_consume/batch_consumption_spec.rb
index e1e89562..c4acaa9f 100644
--- a/spec/active_record_consume/batch_consumption_spec.rb
+++ b/spec/active_record_consume/batch_consumption_spec.rb
@@ -27,7 +27,6 @@
    it 'should be called 1 time when record size == max batch size' do
      batch_consumer.class.config[:max_db_batch_size] = records.size
      expect(batch_consumer).to receive(:upsert_records).once
-
      batch_consumer.send(:update_database, records)
    end
 
@@ -100,4 +99,24 @@
      end
    end
  end
+
+  describe '#consume_batch' do
+    describe 'post_process in compact mode' do
+      let(:payload) do
+        [
+          { v: 1 }, { v: 2 }, { v: 3 }, { v: 4 }, { v: 5 }
+        ]
+      end
+      let(:metadata) do
+        { keys: [1, 2, 3, 4, 5] }
+      end
+
+      it 'should be called once with a flat array of records' do
+        records = [Object, Object] # should be ActiveRecord objects
+        expect(batch_consumer).to receive(:post_process).once.with(records)
+        expect(batch_consumer).to receive(:update_database) { records }
+        batch_consumer.send(:consume_batch, payload, metadata)
+      end
+    end
+  end
 end
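Below is a minimal sketch of how a consumer might override the new `post_process` hook. The `Widget` record class and the `Rails.logger` call are illustrative assumptions, not part of this diff:

```ruby
# Hypothetical batch consumer using the post_process hook added in this
# change. Widget is an assumed ActiveRecord model; the logging is illustrative.
class MyWidgetConsumer < Deimos::ActiveRecordConsumer
  record_class Widget

  # Receives the ActiveRecord objects that were just saved; runs once per
  # batch, after the database transaction has completed.
  def post_process(records)
    records.each do |record|
      Rails.logger.info("Upserted widget: #{record.inspect}")
    end
  end
end
```

Because `post_process` runs after the `Deimos::Utils::DeadlockRetry.wrap` block returns, the upserts have already been committed by the time the hook fires; side effects raised here are not rolled back.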