From 2d9e8dda16f2b22c8afb804eee655b1c2aafe6d1 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 4 Dec 2023 16:26:51 -0500 Subject: [PATCH] CCOL-2039: Process valid and invalid records via notification --- .../batch_consumption.rb | 19 +++++----- lib/deimos/instrumentation.rb | 5 --- spec/active_record_batch_consumer_spec.rb | 36 +++++++++++++++++-- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 2e9b8b7a..953736ac 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -42,10 +42,20 @@ def consume_batch(payloads, metadata) uncompacted_update(messages) end end - process_valid_records(@valid_active_records) + ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { + records: @valid_active_records, + consumer: self.class + }) end end + # Additional processing after records have been successfully upserted + # @param _valid_active_records [Array] Records to be post processed + # @return [void] + def self.process_valid_records(_valid_active_records) + nil + end + # Additional processing after records have been unsuccessfully upserted # @param _invalid_batch_records [Array] Invalid records to be processed # @return [void] @@ -225,13 +235,6 @@ def build_records(messages) BatchRecordList.new(records.compact) end - # Additional processing after records have been successfully upserted - # @param _valid_active_records [Array] Records to be post processed - # @return [void] - def process_valid_records(_valid_active_records) - nil - end - # Delete any records with a tombstone. # @param messages [Array] List of messages for a group of # deleted records. diff --git a/lib/deimos/instrumentation.rb b/lib/deimos/instrumentation.rb index a4cff165..7e32cc8e 100644 --- a/lib/deimos/instrumentation.rb +++ b/lib/deimos/instrumentation.rb @@ -80,9 +80,4 @@ def self.send_produce_error(event) event = ActiveSupport::Notifications::Event.new(*args) KafkaListener.send_produce_error(event) end - - ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| - payload = ActiveSupport::Notifications::Event.new(*args).payload - payload[:consumer].process_invalid_records(payload[:records]) - end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index dfa77dc3..5e1f4b77 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -621,6 +621,11 @@ def self.process_invalid_records(_) nil end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + end end @@ -668,7 +673,7 @@ def should_consume?(record) record.record.some_int.even? end - def process_valid_records(valid) + def self.process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) end @@ -678,6 +683,16 @@ def self.process_invalid_records(invalid) Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end @@ -718,7 +733,7 @@ def should_consume?(record) record.record.some_int.even? end - def process_valid_records(valid) + def self.process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) end @@ -728,6 +743,16 @@ def self.process_invalid_records(invalid) Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end @@ -764,10 +789,15 @@ def self.process_invalid_records(invalid) record_class Widget compacted false - def process_valid_records(_) + def self.process_valid_records(_) raise StandardError, 'Something went wrong' end + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end