From 65531672a470739b0c3c6fcadc58ab8f824051ab Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Thu, 30 Nov 2023 15:09:00 -0500 Subject: [PATCH 01/29] CCOL-2039: Post process valid and invalid records from batch consumption --- .../batch_consumption.rb | 77 +++++++--- .../batch_record_list.rb | 23 +++ .../active_record_consume/mass_updater.rb | 2 + spec/active_record_batch_consumer_spec.rb | 139 ++++++++++++++++++ .../mass_updater_spec.rb | 4 +- 5 files changed, 222 insertions(+), 23 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 01c3273e..227bcb57 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -31,16 +31,19 @@ def consume_batch(payloads, metadata) tags = %W(topic:#{metadata[:topic]}) Deimos.instrument('ar_consumer.consume_batch', tags) do + valid_upserts = [] + invalid_upserts = [] # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) + end end + post_process(valid_upserts, invalid_upserts) end end @@ -114,50 +117,65 @@ def compact_messages(batch) # All messages are split into slices containing only unique keys, and # each slice is handles as its own batch. # @param messages [Array] List of messages. - # @return [void] + # @return [Array, Array>] def uncompacted_update(messages) + valid_records = [] + invalid_records = [] BatchSlicer. - slice(messages). - each(&method(:update_database)) + slice(messages).each do |slice| + valid, invalid = update_database(slice) + valid_records.push(*valid) if valid.any? + invalid_records.push(*invalid) if invalid.any? + end + [valid_records, invalid_records] end # Perform database operations for a group of messages. # All messages with payloads are passed to upsert_records. # All tombstones messages are passed to remove_records. # @param messages [Array] List of messages. - # @return [void] + # @return [Array, Array>] def update_database(messages) # Find all upserted records (i.e. that have a payload) and all # deleted record (no payload) removed, upserted = messages.partition(&:tombstone?) max_db_batch_size = self.class.config[:max_db_batch_size] + valid_upserts = [] + invalid_upserts = [] if upserted.any? + valid_upserts, invalid_upserts = if max_db_batch_size + upserted.each_slice(max_db_batch_size) do |group| + valid, invalid = upsert_records(group) + valid_upserts.push(*valid) + invalid_upserts.push(*invalid) + end + valid_upserts.compact! + invalid_upserts.compact! + else + upsert_records(upserted) + end + end + + if removed.any? if max_db_batch_size - upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } + removed.each_slice(max_db_batch_size) { |group| remove_records(group) } else - upsert_records(upserted) + remove_records(removed) end end - - return if removed.empty? - - if max_db_batch_size - removed.each_slice(max_db_batch_size) { |group| remove_records(group) } - else - remove_records(removed) - end + [valid_upserts, invalid_upserts] end # Upsert any non-deleted records # @param messages [Array] List of messages for a group of # records to either be updated or inserted. - # @return [void] + # @return [Array, Array] def upsert_records(messages) record_list = build_records(messages) - record_list.filter!(self.method(:should_consume?).to_proc) + invalid = filter_records(record_list) - return if record_list.empty? + return [[], invalid] if record_list.empty? key_col_proc = self.method(:key_columns).to_proc col_proc = self.method(:columns).to_proc @@ -167,6 +185,13 @@ def upsert_records(messages) col_proc: col_proc, replace_associations: self.class.replace_associations, bulk_import_id_generator: self.class.bulk_import_id_generator) + [updater.mass_update(record_list), invalid] + end + + # @param record_list [BatchRecordList] + # @return [Array] + def filter_records(record_list) + record_list.partition!(self.method(:should_consume?).to_proc) updater.mass_update(record_list) end @@ -204,6 +229,14 @@ def build_records(messages) BatchRecordList.new(records.compact) end + # Additional processing after records have been successfully upserted + # @param _valid_records [Array] Records to be post processed + # @param _invalid_records [Array] Invalid records to be processed + # @return [void] + def post_process(_valid_records, _invalid_records) + nil + end + # Delete any records with a tombstone. # @param messages [Array] List of messages for a group of # deleted records. diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 30f99d23..706ffad0 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -23,6 +23,29 @@ def filter!(method) self.batch_records.delete_if { |record| !method.call(record.record) } end + # Filter and return removed invalid batch records by the specified method or block + # @param method [Proc] + # @param block [Proc] + # @return [Array] + def partition!(method=nil, &block) + valid, invalid = if method.nil? + self.batch_records.partition(&block) + else + case method.parameters.size + when 2 + self.batch_records.partition do |record| + method.call(record.record, record.associations) + end + else + self.batch_records.partition do |record| + method.call(record.record) + end + end + end + self.batch_records = valid + invalid + end + # Get the original ActiveRecord objects. # @return [Array] def records diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 267fc002..b9559280 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -83,9 +83,11 @@ def import_associations(record_list) end # @param record_list [BatchRecordList] + # @return [Array] def mass_update(record_list) save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? + record_list.records end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 54e7d4db..b1e2256a 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -603,5 +603,144 @@ def pre_process(messages) end + describe 'post processing' do + + context 'with uncompacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def should_consume?(record) + record.some_int.even? + end + + def post_process(valid, invalid) + # Success + Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + + # Invalid + Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + + expect(widget_one.some_int).to eq(1) + expect(widget_two.some_int).to eq(20) + expect(widget_three.some_int).to eq(2000) + expect(widget_three.test_id).to eq(widget_two.test_id) + expect(widget_four.some_int).to eq(11) + expect(widget_four.test_id).to eq(widget_one.test_id) + end + end + + context 'with post processing' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted true + + def should_consume?(record) + record.some_int.even? + end + + def post_process(valid, invalid) + # Success + Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + + # Invalid + Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + + expect(widget_one.some_int).to eq(1) + expect(widget_two.some_int).to eq(20) + expect(widget_three.some_int).to eq(2000) + expect(widget_three.test_id).to eq(widget_two.test_id) + expect(widget_four.some_int).to eq(11) + expect(widget_four.test_id).to eq(widget_one.test_id) + end + end + + context 'with post processing errors' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def post_process(_, _) + raise StandardError, 'Something went wrong' + end + + end + end + + it 'should save records if an exception occurs in post processing' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + expect { + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + }.to raise_error(StandardError, 'Something went wrong') + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(20) + + end + end + + end + end end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index edae353c..59728635 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -69,7 +69,9 @@ it 'should mass update the batch' do allow(SecureRandom).to receive(:uuid).and_return('1', '2') - described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + expect(results.count).to eq(2) + expect(results.map(&:test_id)).to match(%w(id1 id2)) expect(Widget.count).to eq(2) expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2)) expect(Detail.count).to eq(2) From ca6a1e38203263c11437559ac154c1d9661e88c5 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 09:32:20 -0500 Subject: [PATCH 02/29] CCOL-2039: Lint fixes --- lib/deimos/active_record_consume/batch_consumption.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 227bcb57..98761e99 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -121,8 +121,7 @@ def compact_messages(batch) def uncompacted_update(messages) valid_records = [] invalid_records = [] - BatchSlicer. - slice(messages).each do |slice| + BatchSlicer.slice(messages).each do |slice| valid, invalid = update_database(slice) valid_records.push(*valid) if valid.any? invalid_records.push(*invalid) if invalid.any? From 3b7461e897b16bceca8f42472d598e6015fa287e Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 11:53:44 -0500 Subject: [PATCH 03/29] CCOL-2039: Add optional associations for should_consume? --- .../batch_consumption.rb | 2 +- ..._record_batch_consumer_association_spec.rb | 36 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 98761e99..68f6679b 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -97,7 +97,7 @@ def deleted_query(records) # @param _record [ActiveRecord::Base] # @return [Boolean] - def should_consume?(_record) + def should_consume?(_record, _associations=nil) true end diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index c6af0f52..eb77bb95 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -91,17 +91,20 @@ def publish_batch(messages) klass = Class.new(described_class) do cattr_accessor :record_attributes_proc cattr_accessor :should_consume_proc + cattr_accessor :should_consume_association_proc schema 'MySchema' namespace 'com.my-namespace' key_config plain: true record_class Widget - def should_consume?(record) - if self.should_consume_proc + def should_consume?(record, associations=nil) + if associations && self.should_consume_association_proc + return self.should_consume_association_proc.call(record, associations) + elsif self.should_consume_proc return self.should_consume_proc.call(record) + else + true end - - true end def record_attributes(payload, _key) @@ -280,5 +283,28 @@ def columns(record_class) expect(Widget.count).to eq(2) end end - end + + context 'with invalid associations' do + + before(:each) do + consumer_class.should_consume_association_proc = proc { |record, associations| + record.some_int <= 10 && associations['detail']['title'] != 'invalid' + } + end + + it 'should only save valid associations' do + publish_batch([{ + key: 2, + payload: { test_id: 'xyz', some_int: 5, title: 'valid' } }, + { key: 3, + payload: { test_id: 'abc', some_int: 15, title: 'valid' } }, + { key: 4, + payload: { test_id: 'abc', some_int: 9, title: 'invalid' } } + ]) + expect(Widget.count).to eq(2) + expect(Widget.second.some_int).to eq(5) + end + end + + end end From 8d347e48cec63bb42c4e4c931ac58830ec822690 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 13:22:32 -0500 Subject: [PATCH 04/29] CCOL-2039: Linting changes --- ..._record_batch_consumer_association_spec.rb | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index eb77bb95..93a75583 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -99,9 +99,9 @@ def publish_batch(messages) def should_consume?(record, associations=nil) if associations && self.should_consume_association_proc - return self.should_consume_association_proc.call(record, associations) + self.should_consume_association_proc.call(record, associations) elsif self.should_consume_proc - return self.should_consume_proc.call(record) + self.should_consume_proc.call(record) else true end @@ -293,18 +293,17 @@ def columns(record_class) end it 'should only save valid associations' do - publish_batch([{ - key: 2, - payload: { test_id: 'xyz', some_int: 5, title: 'valid' } }, - { key: 3, - payload: { test_id: 'abc', some_int: 15, title: 'valid' } }, - { key: 4, - payload: { test_id: 'abc', some_int: 9, title: 'invalid' } } + publish_batch([ + { key: 2, + payload: { test_id: 'xyz', some_int: 5, title: 'valid' } }, + { key: 3, + payload: { test_id: 'abc', some_int: 15, title: 'valid' } }, + { key: 4, + payload: { test_id: 'abc', some_int: 9, title: 'invalid' } } ]) expect(Widget.count).to eq(2) expect(Widget.second.some_int).to eq(5) end end - - end + end end From 4a688a02a314d37da67ee8f792de56d4aafacb45 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 15:34:17 -0500 Subject: [PATCH 05/29] CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record collection --- CHANGELOG.md | 2 + .../batch_consumption.rb | 69 ++++++++----------- spec/active_record_batch_consumer_spec.rb | 2 +- 3 files changed, 30 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94b8f184..f86b5fd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer - Feature: Add global `replace_assocations` value for for all consumers - Feature: Add individual `replace_assocations` value for for individual consumers +- Feature: `should_consume?` method accepts associations hash of record for validation of associations +- Feature: Return valid and invalid records saved during consumption for further processing in `post_process` method # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 68f6679b..eab2ec1c 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -29,21 +29,21 @@ def consume_batch(payloads, metadata) map { |p, k| Deimos::Message.new(p, nil, key: k) } tags = %W(topic:#{metadata[:topic]}) + @valid_active_records = [] + @invalid_batch_records = [] Deimos.instrument('ar_consumer.consume_batch', tags) do - valid_upserts = [] - invalid_upserts = [] # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock Deimos::Utils::DeadlockRetry.wrap(tags) do - valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) + end end - post_process(valid_upserts, invalid_upserts) + post_process(@valid_active_records, @invalid_batch_records) end end @@ -117,64 +117,50 @@ def compact_messages(batch) # All messages are split into slices containing only unique keys, and # each slice is handles as its own batch. # @param messages [Array] List of messages. - # @return [Array, Array>] + # @return [Void] def uncompacted_update(messages) - valid_records = [] - invalid_records = [] - BatchSlicer.slice(messages).each do |slice| - valid, invalid = update_database(slice) - valid_records.push(*valid) if valid.any? - invalid_records.push(*invalid) if invalid.any? - end - [valid_records, invalid_records] + BatchSlicer. + slice(messages). + each(&method(:update_database)) end # Perform database operations for a group of messages. # All messages with payloads are passed to upsert_records. # All tombstones messages are passed to remove_records. # @param messages [Array] List of messages. - # @return [Array, Array>] + # @return [Void] def update_database(messages) # Find all upserted records (i.e. that have a payload) and all # deleted record (no payload) removed, upserted = messages.partition(&:tombstone?) max_db_batch_size = self.class.config[:max_db_batch_size] - valid_upserts = [] - invalid_upserts = [] if upserted.any? - valid_upserts, invalid_upserts = if max_db_batch_size - upserted.each_slice(max_db_batch_size) do |group| - valid, invalid = upsert_records(group) - valid_upserts.push(*valid) - invalid_upserts.push(*invalid) - end - valid_upserts.compact! - invalid_upserts.compact! - else - upsert_records(upserted) - end - end - - if removed.any? if max_db_batch_size - removed.each_slice(max_db_batch_size) { |group| remove_records(group) } + upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } else - remove_records(removed) + upsert_records(upserted) end end - [valid_upserts, invalid_upserts] + + return if removed.empty? + + if max_db_batch_size + removed.each_slice(max_db_batch_size) { |group| remove_records(group) } + else + remove_records(removed) + end end # Upsert any non-deleted records # @param messages [Array] List of messages for a group of # records to either be updated or inserted. - # @return [Array, Array] + # @return [Void] def upsert_records(messages) record_list = build_records(messages) - invalid = filter_records(record_list) + @invalid_batch_records.concat(filter_records(record_list)) - return [[], invalid] if record_list.empty? + return if record_list.empty? key_col_proc = self.method(:key_columns).to_proc col_proc = self.method(:columns).to_proc @@ -184,14 +170,13 @@ def upsert_records(messages) col_proc: col_proc, replace_associations: self.class.replace_associations, bulk_import_id_generator: self.class.bulk_import_id_generator) - [updater.mass_update(record_list), invalid] + @valid_active_records.concat(updater.mass_update(record_list)) end # @param record_list [BatchRecordList] # @return [Array] def filter_records(record_list) record_list.partition!(self.method(:should_consume?).to_proc) - updater.mass_update(record_list) end # Process messages prior to saving to database diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index b1e2256a..ed9502f3 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -653,7 +653,7 @@ def post_process(valid, invalid) end end - context 'with post processing' do + context 'with compacted messages' do let(:consumer_class) do Class.new(described_class) do schema 'MySchema' From 385e58b4c7850ef6fa0ac0ad5a20bd828b47a602 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 17:20:26 -0500 Subject: [PATCH 06/29] CCOL-2039: Add consume_filter when filtering records --- CHANGELOG.md | 3 +- .../batch_consumption.rb | 12 +++- .../batch_record_list.rb | 28 ++------- ..._record_batch_consumer_association_spec.rb | 13 ++--- spec/active_record_batch_consumer_spec.rb | 58 +++++++++++++++++-- 5 files changed, 75 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f86b5fd4..2cc57488 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer - Feature: Add global `replace_assocations` value for for all consumers - Feature: Add individual `replace_assocations` value for for individual consumers -- Feature: `should_consume?` method accepts associations hash of record for validation of associations +- ***BREAKING CHANGE***: `should_consume?` method uses record of type `BatchRecord` instead of `ActiveRecord` when determining records to consume +- Feature: `consume_lookup` to build lookup entity when determining which records to consume in `should_consume?` - Feature: Return valid and invalid records saved during consumption for further processing in `post_process` method # 1.22.5 - 2023-07-18 diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index eab2ec1c..d3d3cf61 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -95,9 +95,10 @@ def deleted_query(records) @klass.unscoped.where(@klass.primary_key => keys) end - # @param _record [ActiveRecord::Base] + # @param _batch_record [BatchRecord] + # @param _filter [NilClass, Hash, ActiveRecord::Relation, Set] # @return [Boolean] - def should_consume?(_record, _associations=nil) + def should_consume?(_batch_record, _filter=nil) true end @@ -176,7 +177,12 @@ def upsert_records(messages) # @param record_list [BatchRecordList] # @return [Array] def filter_records(record_list) - record_list.partition!(self.method(:should_consume?).to_proc) + record_list.filter!(self.method(:should_consume?).to_proc, consume_filter) + end + + # @return [NilClass,ActiveRecord::Relation,Hash,Set] + def consume_filter + nil end # Process messages prior to saving to database diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 706ffad0..7876898d 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -17,32 +17,14 @@ def initialize(records) self.bulk_import_column = records.first&.bulk_import_column&.to_sym end - # Filter out any invalid records. - # @param method [Proc] - def filter!(method) - self.batch_records.delete_if { |record| !method.call(record.record) } - end - # Filter and return removed invalid batch records by the specified method or block # @param method [Proc] - # @param block [Proc] + # @param consume_filter [NilClass, Hash, ActiveRecord::Relation, Set] # @return [Array] - def partition!(method=nil, &block) - valid, invalid = if method.nil? - self.batch_records.partition(&block) - else - case method.parameters.size - when 2 - self.batch_records.partition do |record| - method.call(record.record, record.associations) - end - else - self.batch_records.partition do |record| - method.call(record.record) - end - end - end - self.batch_records = valid + def filter!(method, consume_filter=nil) + self.batch_records, invalid = self.batch_records.partition do |record| + method.call(record, consume_filter) + end invalid end diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index 93a75583..b36b630c 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -91,17 +91,14 @@ def publish_batch(messages) klass = Class.new(described_class) do cattr_accessor :record_attributes_proc cattr_accessor :should_consume_proc - cattr_accessor :should_consume_association_proc schema 'MySchema' namespace 'com.my-namespace' key_config plain: true record_class Widget - def should_consume?(record, associations=nil) - if associations && self.should_consume_association_proc - self.should_consume_association_proc.call(record, associations) - elsif self.should_consume_proc - self.should_consume_proc.call(record) + def should_consume?(batch_record, _) + if self.should_consume_proc + self.should_consume_proc.call(batch_record) else true end @@ -287,8 +284,8 @@ def columns(record_class) context 'with invalid associations' do before(:each) do - consumer_class.should_consume_association_proc = proc { |record, associations| - record.some_int <= 10 && associations['detail']['title'] != 'invalid' + consumer_class.should_consume_proc = proc { |record, _| + record.record.some_int <= 10 && record.associations['detail']['title'] != 'invalid' } end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index ed9502f3..a5d2dcdc 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -603,6 +603,56 @@ def pre_process(messages) end + describe 'should_consume?' do + + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def should_consume?(record, lookup) + lookup.find_by(test_id: record.record.test_id, id: record.record.id).nil? + end + + def consume_filter + Widget.where(id: 2, test_id: 'def') + end + + end + end + + it 'should skip records in the consume filter' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + expect(Widget.count).to eq(2) + expect(Widget.all.to_a).to match_array([ + have_attributes(id: 1, + test_id: 'abc', + some_int: 11, + updated_at: start, + created_at: start), + have_attributes(id: 2, + test_id: 'def', + some_int: 2, + updated_at: start, + created_at: start) + ]) + end + + end + describe 'post processing' do context 'with uncompacted messages' do @@ -614,8 +664,8 @@ def pre_process(messages) record_class Widget compacted false - def should_consume?(record) - record.some_int.even? + def should_consume?(record, _) + record.record.some_int.even? end def post_process(valid, invalid) @@ -662,8 +712,8 @@ def post_process(valid, invalid) record_class Widget compacted true - def should_consume?(record) - record.some_int.even? + def should_consume?(record, _) + record.record.some_int.even? end def post_process(valid, invalid) From 4661b4f54488bf29801fbb6380d07c7bf88ff91f Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 17:23:10 -0500 Subject: [PATCH 07/29] CCOL-2039: YARD fixes --- lib/deimos/active_record_consume/batch_consumption.rb | 6 +++--- lib/deimos/active_record_consume/batch_record_list.rb | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index d3d3cf61..f57d3c0b 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -118,7 +118,7 @@ def compact_messages(batch) # All messages are split into slices containing only unique keys, and # each slice is handles as its own batch. # @param messages [Array] List of messages. - # @return [Void] + # @return [void] def uncompacted_update(messages) BatchSlicer. slice(messages). @@ -129,7 +129,7 @@ def uncompacted_update(messages) # All messages with payloads are passed to upsert_records. # All tombstones messages are passed to remove_records. # @param messages [Array] List of messages. - # @return [Void] + # @return [void] def update_database(messages) # Find all upserted records (i.e. that have a payload) and all # deleted record (no payload) @@ -156,7 +156,7 @@ def update_database(messages) # Upsert any non-deleted records # @param messages [Array] List of messages for a group of # records to either be updated or inserted. - # @return [Void] + # @return [void] def upsert_records(messages) record_list = build_records(messages) @invalid_batch_records.concat(filter_records(record_list)) diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 7876898d..fc32a879 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -17,7 +17,7 @@ def initialize(records) self.bulk_import_column = records.first&.bulk_import_column&.to_sym end - # Filter and return removed invalid batch records by the specified method or block + # Filter and return removed invalid batch records by the specified method # @param method [Proc] # @param consume_filter [NilClass, Hash, ActiveRecord::Relation, Set] # @return [Array] From 8813fa48d8a2c5f8a40ffa36680875d62f5d5ebb Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 17:36:53 -0500 Subject: [PATCH 08/29] CCOL-2039: Variable rename --- lib/deimos/active_record_consume/batch_consumption.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index f57d3c0b..6f722952 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -180,9 +180,9 @@ def filter_records(record_list) record_list.filter!(self.method(:should_consume?).to_proc, consume_filter) end - # @return [NilClass,ActiveRecord::Relation,Hash,Set] + # @return [ActiveRecord::Relation,Hash,Set] def consume_filter - nil + { } end # Process messages prior to saving to database @@ -220,10 +220,10 @@ def build_records(messages) end # Additional processing after records have been successfully upserted - # @param _valid_records [Array] Records to be post processed - # @param _invalid_records [Array] Invalid records to be processed + # @param _valid_active_records [Array] Records to be post processed + # @param _invalid_batch_records [Array] Invalid records to be processed # @return [void] - def post_process(_valid_records, _invalid_records) + def post_process(_valid_active_records, _invalid_batch_records) nil end From d6181fde1e489c9f75f17c993d9527c5acf39478 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 4 Dec 2023 15:18:54 -0500 Subject: [PATCH 09/29] CCOL-2039: Process invalid records via ActiveSupport notifications --- .../batch_consumption.rb | 33 +++++++++++-------- .../batch_record_list.rb | 5 ++- lib/deimos/instrumentation.rb | 5 +++ spec/active_record_batch_consumer_spec.rb | 26 ++++++++------- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 6f722952..2e9b8b7a 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -30,7 +30,6 @@ def consume_batch(payloads, metadata) tags = %W(topic:#{metadata[:topic]}) @valid_active_records = [] - @invalid_batch_records = [] Deimos.instrument('ar_consumer.consume_batch', tags) do # The entire batch should be treated as one transaction so that if @@ -43,10 +42,17 @@ def consume_batch(payloads, metadata) uncompacted_update(messages) end end - post_process(@valid_active_records, @invalid_batch_records) + process_valid_records(@valid_active_records) end end + # Additional processing after records have been unsuccessfully upserted + # @param _invalid_batch_records [Array] Invalid records to be processed + # @return [void] + def self.process_invalid_records(_invalid_batch_records) + nil + end + protected # Get the set of attribute names that uniquely identify messages in the @@ -96,9 +102,8 @@ def deleted_query(records) end # @param _batch_record [BatchRecord] - # @param _filter [NilClass, Hash, ActiveRecord::Relation, Set] # @return [Boolean] - def should_consume?(_batch_record, _filter=nil) + def should_consume?(_batch_record) true end @@ -159,8 +164,14 @@ def update_database(messages) # @return [void] def upsert_records(messages) record_list = build_records(messages) - @invalid_batch_records.concat(filter_records(record_list)) + invalid = filter_records(record_list) + if invalid.any? + ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', { + records: invalid, + consumer: self.class + }) + end return if record_list.empty? key_col_proc = self.method(:key_columns).to_proc @@ -175,14 +186,9 @@ def upsert_records(messages) end # @param record_list [BatchRecordList] - # @return [Array] + # @return [void] def filter_records(record_list) - record_list.filter!(self.method(:should_consume?).to_proc, consume_filter) - end - - # @return [ActiveRecord::Relation,Hash,Set] - def consume_filter - { } + record_list.filter!(self.method(:should_consume?).to_proc) end # Process messages prior to saving to database @@ -221,9 +227,8 @@ def build_records(messages) # Additional processing after records have been successfully upserted # @param _valid_active_records [Array] Records to be post processed - # @param _invalid_batch_records [Array] Invalid records to be processed # @return [void] - def post_process(_valid_active_records, _invalid_batch_records) + def process_valid_records(_valid_active_records) nil end diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index fc32a879..c0d0bde6 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -19,11 +19,10 @@ def initialize(records) # Filter and return removed invalid batch records by the specified method # @param method [Proc] - # @param consume_filter [NilClass, Hash, ActiveRecord::Relation, Set] # @return [Array] - def filter!(method, consume_filter=nil) + def filter!(method) self.batch_records, invalid = self.batch_records.partition do |record| - method.call(record, consume_filter) + method.call(record) end invalid end diff --git a/lib/deimos/instrumentation.rb b/lib/deimos/instrumentation.rb index 7e32cc8e..a4cff165 100644 --- a/lib/deimos/instrumentation.rb +++ b/lib/deimos/instrumentation.rb @@ -80,4 +80,9 @@ def self.send_produce_error(event) event = ActiveSupport::Notifications::Event.new(*args) KafkaListener.send_produce_error(event) end + + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index a5d2dcdc..dfa77dc3 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -613,18 +613,18 @@ def pre_process(messages) record_class Widget compacted false - def should_consume?(record, lookup) - lookup.find_by(test_id: record.record.test_id, id: record.record.id).nil? + def should_consume?(record) + record.record.test_id != 'def' end - def consume_filter - Widget.where(id: 2, test_id: 'def') + def self.process_invalid_records(_) + nil end end end - it 'should skip records in the consume filter' do + it "should skip records that shouldn't be consumed" do Widget.create!(id: 1, test_id: 'abc', some_int: 1) Widget.create!(id: 2, test_id: 'def', some_int: 2) publish_batch( @@ -664,16 +664,18 @@ def consume_filter record_class Widget compacted false - def should_consume?(record, _) + def should_consume?(record) record.record.some_int.even? end - def post_process(valid, invalid) + def process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + end + def self.process_invalid_records(invalid) # Invalid - Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) end end @@ -712,14 +714,16 @@ def post_process(valid, invalid) record_class Widget compacted true - def should_consume?(record, _) + def should_consume?(record) record.record.some_int.even? end - def post_process(valid, invalid) + def process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + end + def self.process_invalid_records(invalid) # Invalid Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? end @@ -760,7 +764,7 @@ def post_process(valid, invalid) record_class Widget compacted false - def post_process(_, _) + def process_valid_records(_) raise StandardError, 'Something went wrong' end From 2d9e8dda16f2b22c8afb804eee655b1c2aafe6d1 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 4 Dec 2023 16:26:51 -0500 Subject: [PATCH 10/29] CCOL-2039: Process valid and invalid records via notification --- .../batch_consumption.rb | 19 +++++----- lib/deimos/instrumentation.rb | 5 --- spec/active_record_batch_consumer_spec.rb | 36 +++++++++++++++++-- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 2e9b8b7a..953736ac 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -42,10 +42,20 @@ def consume_batch(payloads, metadata) uncompacted_update(messages) end end - process_valid_records(@valid_active_records) + ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { + records: @valid_active_records, + consumer: self.class + }) end end + # Additional processing after records have been successfully upserted + # @param _valid_active_records [Array] Records to be post processed + # @return [void] + def self.process_valid_records(_valid_active_records) + nil + end + # Additional processing after records have been unsuccessfully upserted # @param _invalid_batch_records [Array] Invalid records to be processed # @return [void] @@ -225,13 +235,6 @@ def build_records(messages) BatchRecordList.new(records.compact) end - # Additional processing after records have been successfully upserted - # @param _valid_active_records [Array] Records to be post processed - # @return [void] - def process_valid_records(_valid_active_records) - nil - end - # Delete any records with a tombstone. # @param messages [Array] List of messages for a group of # deleted records. diff --git a/lib/deimos/instrumentation.rb b/lib/deimos/instrumentation.rb index a4cff165..7e32cc8e 100644 --- a/lib/deimos/instrumentation.rb +++ b/lib/deimos/instrumentation.rb @@ -80,9 +80,4 @@ def self.send_produce_error(event) event = ActiveSupport::Notifications::Event.new(*args) KafkaListener.send_produce_error(event) end - - ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| - payload = ActiveSupport::Notifications::Event.new(*args).payload - payload[:consumer].process_invalid_records(payload[:records]) - end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index dfa77dc3..5e1f4b77 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -621,6 +621,11 @@ def self.process_invalid_records(_) nil end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + end end @@ -668,7 +673,7 @@ def should_consume?(record) record.record.some_int.even? end - def process_valid_records(valid) + def self.process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) end @@ -678,6 +683,16 @@ def self.process_invalid_records(invalid) Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end @@ -718,7 +733,7 @@ def should_consume?(record) record.record.some_int.even? end - def process_valid_records(valid) + def self.process_valid_records(valid) # Success Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) end @@ -728,6 +743,16 @@ def self.process_invalid_records(invalid) Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? end + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end @@ -764,10 +789,15 @@ def self.process_invalid_records(invalid) record_class Widget compacted false - def process_valid_records(_) + def self.process_valid_records(_) raise StandardError, 'Something went wrong' end + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + end end From 2cb9d4d5258bd9d946f78250147db15ae89fa0d1 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 4 Dec 2023 16:45:15 -0500 Subject: [PATCH 11/29] CCOL-2039: Spec update --- .../batch_consumption.rb | 14 -------- spec/active_record_batch_consumer_spec.rb | 32 ++++++++----------- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 953736ac..e1361328 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -49,20 +49,6 @@ def consume_batch(payloads, metadata) end end - # Additional processing after records have been successfully upserted - # @param _valid_active_records [Array] Records to be post processed - # @return [void] - def self.process_valid_records(_valid_active_records) - nil - end - - # Additional processing after records have been unsuccessfully upserted - # @param _invalid_batch_records [Array] Invalid records to be processed - # @return [void] - def self.process_invalid_records(_invalid_batch_records) - nil - end - protected # Get the set of attribute names that uniquely identify messages in the diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 5e1f4b77..293b26c5 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -675,12 +675,14 @@ def should_consume?(record) def self.process_valid_records(valid) # Success - Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + attrs = valid.first.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: 2000) end def self.process_invalid_records(invalid) # Invalid - Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) + attrs = invalid.first.record.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: attrs['some_int']) end ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| @@ -709,14 +711,10 @@ def self.process_invalid_records(invalid) ] ) - widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + widget_one, widget_two = Widget.all.to_a - expect(widget_one.some_int).to eq(1) - expect(widget_two.some_int).to eq(20) - expect(widget_three.some_int).to eq(2000) - expect(widget_three.test_id).to eq(widget_two.test_id) - expect(widget_four.some_int).to eq(11) - expect(widget_four.test_id).to eq(widget_one.test_id) + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(2000) end end @@ -735,12 +733,14 @@ def should_consume?(record) def self.process_valid_records(valid) # Success - Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3)) + attrs = valid.first.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: 2000) end def self.process_invalid_records(invalid) # Invalid - Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any? + attrs = invalid.first.record.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: attrs['some_int']) end ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| @@ -769,14 +769,10 @@ def self.process_invalid_records(invalid) ] ) - widget_one, widget_two, widget_three, widget_four = Widget.all.to_a + widget_one, widget_two = Widget.all.to_a - expect(widget_one.some_int).to eq(1) - expect(widget_two.some_int).to eq(20) - expect(widget_three.some_int).to eq(2000) - expect(widget_three.test_id).to eq(widget_two.test_id) - expect(widget_four.some_int).to eq(11) - expect(widget_four.test_id).to eq(widget_one.test_id) + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(2000) end end From 6a750c88d411d93c02d21a6340f9ea119d0f9a12 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 4 Dec 2023 17:26:06 -0500 Subject: [PATCH 12/29] CCOL-2039: Update association spec --- spec/active_record_batch_consumer_association_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index b36b630c..e2795b09 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -96,7 +96,7 @@ def publish_batch(messages) key_config plain: true record_class Widget - def should_consume?(batch_record, _) + def should_consume?(batch_record) if self.should_consume_proc self.should_consume_proc.call(batch_record) else @@ -269,7 +269,7 @@ def columns(record_class) context 'with invalid models' do before(:each) do - consumer_class.should_consume_proc = proc { |val| val.some_int <= 10 } + consumer_class.should_consume_proc = proc { |val| val.record.some_int <= 10 } end it 'should only save valid models' do @@ -284,7 +284,7 @@ def columns(record_class) context 'with invalid associations' do before(:each) do - consumer_class.should_consume_proc = proc { |record, _| + consumer_class.should_consume_proc = proc { |record| record.record.some_int <= 10 && record.associations['detail']['title'] != 'invalid' } end From eae600eae63c95d2061302639e7b89726bf7c604 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 09:37:21 -0500 Subject: [PATCH 13/29] CCOL-2039: Update CHANGELOG entry --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cc57488..b6c9f895 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Feature: Add global `replace_assocations` value for for all consumers - Feature: Add individual `replace_assocations` value for for individual consumers - ***BREAKING CHANGE***: `should_consume?` method uses record of type `BatchRecord` instead of `ActiveRecord` when determining records to consume -- Feature: `consume_lookup` to build lookup entity when determining which records to consume in `should_consume?` -- Feature: Return valid and invalid records saved during consumption for further processing in `post_process` method +- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion +- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. From 2b8aa2d2f0c6d77cf50cada790ebed3d533aa67a Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 10:58:06 -0500 Subject: [PATCH 14/29] CCOL-2039: Move deadlocks to mass updater --- .../batch_consumption.rb | 34 +++++------ .../active_record_consume/mass_updater.rb | 13 ++++- spec/active_record_batch_consumer_spec.rb | 58 ------------------- .../mass_updater_spec.rb | 3 +- 4 files changed, 26 insertions(+), 82 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index e1361328..572f555d 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -28,24 +28,14 @@ def consume_batch(payloads, metadata) zip(metadata[:keys]). map { |p, k| Deimos::Message.new(p, nil, key: k) } - tags = %W(topic:#{metadata[:topic]}) - @valid_active_records = [] - - Deimos.instrument('ar_consumer.consume_batch', tags) do - # The entire batch should be treated as one transaction so that if - # any message fails, the whole thing is rolled back or retried - # if there is deadlock - Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + @tags = %W(topic:#{metadata[:topic]}) + + Deimos.instrument('ar_consumer.consume_batch', @tags) do + if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) end - ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { - records: @valid_active_records, - consumer: self.class - }) end end @@ -161,7 +151,7 @@ def update_database(messages) def upsert_records(messages) record_list = build_records(messages) invalid = filter_records(record_list) - if invalid.any? + unless invalid.blank? ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', { records: invalid, consumer: self.class @@ -174,15 +164,19 @@ def upsert_records(messages) col_proc = self.method(:columns).to_proc updater = MassUpdater.new(@klass, + @tags, key_col_proc: key_col_proc, col_proc: col_proc, replace_associations: self.class.replace_associations, bulk_import_id_generator: self.class.bulk_import_id_generator) - @valid_active_records.concat(updater.mass_update(record_list)) + ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { + records: updater.mass_update(record_list), + consumer: self.class + }) end # @param record_list [BatchRecordList] - # @return [void] + # @return [Array] def filter_records(record_list) record_list.filter!(self.method(:should_consume?).to_proc) end diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index b9559280..3782c0ab 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,8 +19,10 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil) + def initialize(klass, tags, key_col_proc: nil, col_proc: nil, + replace_associations: true, bulk_import_id_generator: nil) @klass = klass + @tags = tags @replace_associations = replace_associations @bulk_import_id_generator = bulk_import_id_generator @@ -85,8 +87,13 @@ def import_associations(record_list) # @param record_list [BatchRecordList] # @return [Array] def mass_update(record_list) - save_records_to_database(record_list) - import_associations(record_list) if record_list.associations.any? + # The entire batch should be treated as one transaction so that if + # any message fails, the whole thing is rolled back or retried + # if there is deadlock + Deimos::Utils::DeadlockRetry.wrap(@tags) do + save_records_to_database(record_list) + import_associations(record_list) if record_list.associations.any? + end record_list.records end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 293b26c5..e19b32f6 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -255,64 +255,6 @@ def deleted_query(_records) end end - describe 'batch atomicity' do - it 'should roll back if there was an exception while deleting' do - Widget.create!(id: 1, test_id: 'abc', some_int: 2) - - travel 1.day - - expect(Widget.connection).to receive(:delete).and_raise('Some error') - - expect { - publish_batch( - [ - { key: 1, - payload: { test_id: 'def', some_int: 3 } }, - { key: 1, - payload: nil } - ] - ) - }.to raise_error('Some error') - - expect(all_widgets). - to match_array( - [ - have_attributes(id: 1, test_id: 'abc', some_int: 2, updated_at: start, created_at: start) - ] - ) - end - - it 'should roll back if there was an invalid instance while upserting' do - Widget.create!(id: 1, test_id: 'abc', some_int: 2) # Updated but rolled back - Widget.create!(id: 3, test_id: 'ghi', some_int: 3) # Removed but rolled back - - travel 1.day - - expect { - publish_batch( - [ - { key: 1, - payload: { test_id: 'def', some_int: 3 } }, - { key: 2, - payload: nil }, - { key: 2, - payload: { test_id: '', some_int: 4 } }, # Empty string is not valid for test_id - { key: 3, - payload: nil } - ] - ) - }.to raise_error(ActiveRecord::RecordInvalid) - - expect(all_widgets). - to match_array( - [ - have_attributes(id: 1, test_id: 'abc', some_int: 2, updated_at: start, created_at: start), - have_attributes(id: 3, test_id: 'ghi', some_int: 3, updated_at: start, created_at: start) - ] - ) - end - end - describe 'compound keys' do let(:consumer_class) do Class.new(described_class) do diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index 59728635..9371c1d8 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -69,7 +69,8 @@ it 'should mass update the batch' do allow(SecureRandom).to receive(:uuid).and_return('1', '2') - results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + results = described_class.new(Widget, %w(topic:my-topic), + bulk_import_id_generator: bulk_id_generator).mass_update(batch) expect(results.count).to eq(2) expect(results.map(&:test_id)).to match(%w(id1 id2)) expect(Widget.count).to eq(2) From ac27466ad620268dcde0a2f42505eade74fbc1e4 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 11:37:46 -0500 Subject: [PATCH 15/29] CCOL-2039: Maintain backwards compatibility for should_consume? --- .github/workflows/ci.yml | 4 ++-- CHANGELOG.md | 2 +- .../active_record_consume/batch_consumption.rb | 5 +++-- .../active_record_consume/batch_record_list.rb | 9 +++++++-- ...tive_record_batch_consumer_association_spec.rb | 15 ++++++++++----- spec/active_record_batch_consumer_spec.rb | 6 +++--- 6 files changed, 26 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df29bd52..b05e7f59 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: BUNDLE_WITHOUT: development:test steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Ruby 2.7 uses: ruby/setup-ruby@v1 @@ -39,7 +39,7 @@ jobs: ruby: [ '2.6', '2.7', '3.0', '3.1' ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} diff --git a/CHANGELOG.md b/CHANGELOG.md index b6c9f895..a68598ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer - Feature: Add global `replace_assocations` value for for all consumers - Feature: Add individual `replace_assocations` value for for individual consumers -- ***BREAKING CHANGE***: `should_consume?` method uses record of type `BatchRecord` instead of `ActiveRecord` when determining records to consume +- Feature: `should_consume?` method accepts BatchRecord associations - Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion - Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 572f555d..4dfa4810 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -87,9 +87,10 @@ def deleted_query(records) @klass.unscoped.where(@klass.primary_key => keys) end - # @param _batch_record [BatchRecord] + # @param _record [ActiveRecord::Base] + # @param _associations [Hash] # @return [Boolean] - def should_consume?(_batch_record) + def should_consume?(_record, _associations=nil) true end diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index c0d0bde6..f8e85de9 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -21,8 +21,13 @@ def initialize(records) # @param method [Proc] # @return [Array] def filter!(method) - self.batch_records, invalid = self.batch_records.partition do |record| - method.call(record) + self.batch_records, invalid = self.batch_records.partition do |batch_record| + case method.parameters.size + when 2 + method.call(batch_record.record, batch_record.associations) + else + method.call(batch_record.record) + end end invalid end diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index e2795b09..99b08aa1 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -96,9 +96,14 @@ def publish_batch(messages) key_config plain: true record_class Widget - def should_consume?(batch_record) + def should_consume?(record, associations) if self.should_consume_proc - self.should_consume_proc.call(batch_record) + case self.should_consume_proc.parameters.size + when 2 + self.should_consume_proc.call(record, associations) + else + self.should_consume_proc.call(record) + end else true end @@ -269,7 +274,7 @@ def columns(record_class) context 'with invalid models' do before(:each) do - consumer_class.should_consume_proc = proc { |val| val.record.some_int <= 10 } + consumer_class.should_consume_proc = proc { |record| record.some_int <= 10 } end it 'should only save valid models' do @@ -284,8 +289,8 @@ def columns(record_class) context 'with invalid associations' do before(:each) do - consumer_class.should_consume_proc = proc { |record| - record.record.some_int <= 10 && record.associations['detail']['title'] != 'invalid' + consumer_class.should_consume_proc = proc { |record, associations| + record.some_int <= 10 && associations['detail']['title'] != 'invalid' } end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index e19b32f6..13911f0f 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -556,7 +556,7 @@ def pre_process(messages) compacted false def should_consume?(record) - record.record.test_id != 'def' + record.test_id != 'def' end def self.process_invalid_records(_) @@ -612,7 +612,7 @@ def self.process_invalid_records(_) compacted false def should_consume?(record) - record.record.some_int.even? + record.some_int.even? end def self.process_valid_records(valid) @@ -670,7 +670,7 @@ def self.process_invalid_records(invalid) compacted true def should_consume?(record) - record.record.some_int.even? + record.some_int.even? end def self.process_valid_records(valid) From 1a5b7db350e627fea0720bbe5d8c63d2451c2349 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 15:57:58 -0500 Subject: [PATCH 16/29] CCOL-2039: Set and get topic tag --- .../active_record_consume/batch_consumption.rb | 17 +++++++++-------- .../active_record_consume/mass_updater.rb | 5 ++--- lib/deimos/tracing/mock.rb | 6 ++++++ lib/deimos/tracing/provider.rb | 6 ++++++ spec/active_record_batch_consumer_spec.rb | 5 +++++ spec/active_record_consume/mass_updater_spec.rb | 4 ++-- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 4dfa4810..5e5bcbf2 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -28,15 +28,16 @@ def consume_batch(payloads, metadata) zip(metadata[:keys]). map { |p, k| Deimos::Message.new(p, nil, key: k) } - @tags = %W(topic:#{metadata[:topic]}) + Deimos.config.tracer.active_span.set_tag('topic', metadata[:topic]) - Deimos.instrument('ar_consumer.consume_batch', @tags) do + Deimos.instrument('ar_consumer.consume_batch', + Deimos.config.tracer.active_span.get_tag('topic')) do if @compacted || self.class.config[:no_keys] update_database(compact_messages(messages)) else uncompacted_update(messages) end - end + end end protected @@ -152,12 +153,11 @@ def update_database(messages) def upsert_records(messages) record_list = build_records(messages) invalid = filter_records(record_list) - unless invalid.blank? + if invalid.any? ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', { records: invalid, consumer: self.class }) - end return if record_list.empty? @@ -165,7 +165,6 @@ def upsert_records(messages) col_proc = self.method(:columns).to_proc updater = MassUpdater.new(@klass, - @tags, key_col_proc: key_col_proc, col_proc: col_proc, replace_associations: self.class.replace_associations, @@ -221,9 +220,11 @@ def build_records(messages) # deleted records. # @return [void] def remove_records(messages) - clause = deleted_query(messages) + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + clause = deleted_query(messages) - clause.delete_all + clause.delete_all + end end end end diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 3782c0ab..854eaf7f 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,10 +19,9 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, tags, key_col_proc: nil, col_proc: nil, + def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil) @klass = klass - @tags = tags @replace_associations = replace_associations @bulk_import_id_generator = bulk_import_id_generator @@ -90,7 +89,7 @@ def mass_update(record_list) # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock - Deimos::Utils::DeadlockRetry.wrap(@tags) do + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? end diff --git a/lib/deimos/tracing/mock.rb b/lib/deimos/tracing/mock.rb index df1a6593..5263d809 100644 --- a/lib/deimos/tracing/mock.rb +++ b/lib/deimos/tracing/mock.rb @@ -40,6 +40,12 @@ def set_tag(tag, value, span=nil) nil end + # Get a tag from a span with the specified tag. + # @param tag [String] + def get_tag(tag) + nil + end + # :nodoc: def set_error(span, exception) span[:exception] = exception diff --git a/lib/deimos/tracing/provider.rb b/lib/deimos/tracing/provider.rb index f46643d8..f1791a47 100644 --- a/lib/deimos/tracing/provider.rb +++ b/lib/deimos/tracing/provider.rb @@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil) raise NotImplementedError end + # Get a tag from a span with the specified tag. + # @param tag [String] + def get_tag(tag) + raise NotImplementedError + end + end end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 13911f0f..bc6646c7 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -37,6 +37,11 @@ class Widget < ActiveRecord::Base consumer_class.config[:bulk_import_id_column] = :bulk_import_id # default end + before(:each) do + allow(Deimos.config.tracer.active_span).to receive(:set_tag) + allow(Deimos.config.tracer.active_span).to receive(:get_tag).with('topic').and_return(%w(topic:mytopic)) + end + around(:each) do |ex| # Set and freeze example time travel_to start do diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index 9371c1d8..c9a19d16 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -45,6 +45,7 @@ stub_const('Widget', widget_class) stub_const('Detail', detail_class) Widget.reset_column_information + allow(Deimos.config.tracer.active_span).to receive(:get_tag).with('topic').and_return(%w(topic:mytopic)) end describe '#mass_update' do @@ -69,8 +70,7 @@ it 'should mass update the batch' do allow(SecureRandom).to receive(:uuid).and_return('1', '2') - results = described_class.new(Widget, %w(topic:my-topic), - bulk_import_id_generator: bulk_id_generator).mass_update(batch) + results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) expect(results.count).to eq(2) expect(results.map(&:test_id)).to match(%w(id1 id2)) expect(Widget.count).to eq(2) From ea099a37e96dd3120e0bdbb38a7151053c77ce89 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 16:54:24 -0500 Subject: [PATCH 17/29] CCOL-2039: Add mock span to mock tracer --- lib/deimos/test_helpers.rb | 1 + lib/deimos/tracing/mock.rb | 29 +++++++++++++++++-- spec/active_record_batch_consumer_spec.rb | 5 ---- .../mass_updater_spec.rb | 1 - 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/lib/deimos/test_helpers.rb b/lib/deimos/test_helpers.rb index c1afcd7e..4e41a8c4 100644 --- a/lib/deimos/test_helpers.rb +++ b/lib/deimos/test_helpers.rb @@ -28,6 +28,7 @@ def unit_test! deimos_config.kafka.seed_brokers ||= ['test_broker'] deimos_config.schema.backend = Deimos.schema_backend_class.mock_backend deimos_config.producers.backend = :test + deimos_config.tracer = Deimos::Tracing::Mock.new end end diff --git a/lib/deimos/tracing/mock.rb b/lib/deimos/tracing/mock.rb index 5263d809..a599d162 100644 --- a/lib/deimos/tracing/mock.rb +++ b/lib/deimos/tracing/mock.rb @@ -10,6 +10,7 @@ class Mock < Tracing::Provider def initialize(logger=nil) @logger = logger || Logger.new(STDOUT) @logger.info('MockTracingProvider initialized') + @active_span = MockSpan.new end # @param span_name [String] @@ -32,18 +33,22 @@ def finish(span) # :nodoc: def active_span - nil + @active_span ||= MockSpan.new end # :nodoc: def set_tag(tag, value, span=nil) - nil + if span + span.set_tag(tag, value) + else + @span.set_tag(tag, value) + end end # Get a tag from a span with the specified tag. # @param tag [String] def get_tag(tag) - nil + @span.get_tag(tag) end # :nodoc: @@ -53,5 +58,23 @@ def set_error(span, exception) @logger.info("Mock span '#{name}' set an error: #{exception}") end end + + # Mock Span class + class MockSpan + # :nodoc: + def initialize + @span = {} + end + + # :nodoc: + def set_tag(tag, value) + @span[tag] = value + end + + # :nodoc: + def get_tag(tag) + @span[tag] + end + end end end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index bc6646c7..13911f0f 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -37,11 +37,6 @@ class Widget < ActiveRecord::Base consumer_class.config[:bulk_import_id_column] = :bulk_import_id # default end - before(:each) do - allow(Deimos.config.tracer.active_span).to receive(:set_tag) - allow(Deimos.config.tracer.active_span).to receive(:get_tag).with('topic').and_return(%w(topic:mytopic)) - end - around(:each) do |ex| # Set and freeze example time travel_to start do diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index c9a19d16..59728635 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -45,7 +45,6 @@ stub_const('Widget', widget_class) stub_const('Detail', detail_class) Widget.reset_column_information - allow(Deimos.config.tracer.active_span).to receive(:get_tag).with('topic').and_return(%w(topic:mytopic)) end describe '#mass_update' do From 1ea30696b4a7219f2713fdd2593d48f41e69a982 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 5 Dec 2023 17:09:36 -0500 Subject: [PATCH 18/29] CCOL-2039: Update mock set_tag --- lib/deimos/tracing/mock.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/deimos/tracing/mock.rb b/lib/deimos/tracing/mock.rb index a599d162..7be50666 100644 --- a/lib/deimos/tracing/mock.rb +++ b/lib/deimos/tracing/mock.rb @@ -41,7 +41,7 @@ def set_tag(tag, value, span=nil) if span span.set_tag(tag, value) else - @span.set_tag(tag, value) + active_span.set_tag(tag, value) end end From 1e5d3a57322b82313b75bf1f89acc6666a377089 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 6 Dec 2023 09:40:21 -0500 Subject: [PATCH 19/29] CCOL-2039: Add retries spec for mass update and deletes --- spec/active_record_batch_consumer_spec.rb | 42 +++++++++++++++++++ .../mass_updater_spec.rb | 35 ++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 13911f0f..fba9b630 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -206,6 +206,48 @@ def publish_batch(messages) ] ) end + + it 'should handle deletes with deadlock retries' do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + allow(instance_double(ActiveRecord::Relation)).to receive(:delete_all).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ).twice.ordered + + Widget.create!(id: 1, test_id: 'abc', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: nil }, + { key: 1, + payload: nil } + ] + ) + + expect(all_widgets).to be_empty + end + + it 'should not delete after multiple deadlock retries' do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + allow(instance_double(ActiveRecord::Relation)).to receive(:delete_all).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ).exactly(3).times + + Widget.create!(id: 1, test_id: 'abc', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: nil }, + { key: 1, + payload: nil } + ] + ) + + expect(Widget.count).to eq(0) + + end + end end end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index 59728635..7fcb961c 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -79,5 +79,40 @@ expect(Widget.last.detail).not_to be_nil end + context 'with deadlock retries' do + before(:each) do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + end + + it 'should upsert rows after deadlocks' do + allow(Widget).to receive(:import!).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ).twice.ordered + allow(Widget).to receive(:import!).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ).once.and_call_original + + results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + expect(results.count).to eq(2) + expect(results.map(&:test_id)).to match(%w(id1 id2)) + expect(Widget.count).to eq(2) + expect(Detail.count).to eq(2) + expect(Widget.first.detail).not_to be_nil + expect(Widget.last.detail).not_to be_nil + end + + it 'should not upsert after encountering multiple deadlocks' do + allow(Widget).to receive(:import!).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ).exactly(3).times + expect { + described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + }.to raise_error(ActiveRecord::Deadlocked) + expect(Widget.count).to eq(0) + expect(Detail.count).to eq(0) + end + + end + end end From 74364f624754dc8af6b8b6ca1c381fe8e59e2e3b Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 6 Dec 2023 10:58:30 -0500 Subject: [PATCH 20/29] CCOL-2039: Update datadog span tagging --- lib/deimos/active_record_consume/batch_consumption.rb | 8 ++++---- lib/deimos/tracing/datadog.rb | 5 +++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 5e5bcbf2..007e2a3a 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -28,16 +28,16 @@ def consume_batch(payloads, metadata) zip(metadata[:keys]). map { |p, k| Deimos::Message.new(p, nil, key: k) } - Deimos.config.tracer.active_span.set_tag('topic', metadata[:topic]) + tag = metadata[:topic] + Deimos.config.tracer.active_span.set_tag('topic', tag) - Deimos.instrument('ar_consumer.consume_batch', - Deimos.config.tracer.active_span.get_tag('topic')) do + Deimos.instrument('ar_consumer.consume_batch', tag) do if @compacted || self.class.config[:no_keys] update_database(compact_messages(messages)) else uncompacted_update(messages) end - end + end end protected diff --git a/lib/deimos/tracing/datadog.rb b/lib/deimos/tracing/datadog.rb index 932404d4..cc2503d1 100644 --- a/lib/deimos/tracing/datadog.rb +++ b/lib/deimos/tracing/datadog.rb @@ -45,6 +45,11 @@ def set_tag(tag, value, span=nil) (span || active_span).set_tag(tag, value) end + # :nodoc: + def get_tag(tag) + active_span.get_tag(tag) + end + end end end From 90b5cf7c4ef7621cd6f78c4603cbff1c48e0fff3 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 6 Dec 2023 11:14:37 -0500 Subject: [PATCH 21/29] CCOL-2039: Update README --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index be9628ad..f44c7c2a 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,12 @@ produced by Phobos and RubyKafka): * exception_object * messages - the batch of messages (in the form of `Deimos::KafkaMessage`s) that failed - this should have only a single message in the batch. +* `batch_consumption.valid_records` - sent when the consumer has successfully upserted records. Limited by `max_db_batch_size`. + * consumer: class of the consumer that upserted these records + * records: Records upserted into the DB (of type `ActiveRecord::Base`) +* `batch_consumption.invalid_records` - sent when the consumer has rejected records returned from `filtered_records`. Limited by `max_db_batch_size`. + * consumer: class of the consumer that rejected these records + * records: Rejected records (of type `Deimos::ActiveRecordConsume::BatchRecord`) Similarly: ```ruby From ceff0558fd482f823dcfe55420a8f0f965291ff4 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 18 Dec 2023 14:15:50 -0500 Subject: [PATCH 22/29] CCOL-2039: Update CONFIGURATION docs --- docs/CONFIGURATION.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 6a3ed559..728d947d 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -100,6 +100,8 @@ offset_commit_threshold|0|Number of messages that can be processed before their offset_retention_time|nil|The time period that committed offsets will be retained, in seconds. Defaults to the broker setting. heartbeat_interval|10|Interval between heartbeats; must be less than the session window. backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error. +replace_associations|nil| Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified the provided/default value from the `consumers` configuration will be used. +bulk_import_id_generator|nil| Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used. ## Defining Database Pollers @@ -172,6 +174,8 @@ consumers.backoff|`(1000..60_000)`|Range representing the minimum and maximum nu consumers.reraise_errors|false|Default behavior is to swallow uncaught exceptions and log to the metrics provider. Set this to true to instead raise all errors. Note that raising an error will ensure that the message cannot be processed - if there is a bad message which will always raise that error, your consumer will not be able to proceed past it and will be stuck forever until you fix your code. See also the `fatal_error` configuration. This is automatically set to true when using the `TestHelpers` module in RSpec. consumers.report_lag|false|Whether to send the `consumer_lag` metric. This requires an extra thread per consumer. consumers.fatal_error|`proc { false }`|Block taking an exception, payload and metadata and returning true if this should be considered a fatal error and false otherwise. E.g. you can use this to always fail if the database is available. Not needed if reraise_errors is set to true. +consumers.replace_associations|true|Whether to delete existing associations for records during bulk consumption prior to inserting new associated records +consumers.bulk_import_id_generator|`proc { SecureRandom.uuid }`| Block to determine the `bulk_import_id` generated during bulk consumption. Block will be used for all bulk consumers unless explicitly set for individual consumers ## Producer Configuration From b61a40359ae567ae08471c5759b90ebad81eb8f2 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Mon, 18 Dec 2023 17:45:27 -0500 Subject: [PATCH 23/29] CCOL-2039: Evaluate schemaclass schema and namespace when setting decoder --- lib/deimos/consumer.rb | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb index f9932bc3..cc4d9228 100644 --- a/lib/deimos/consumer.rb +++ b/lib/deimos/consumer.rb @@ -19,8 +19,16 @@ class Consumer class << self # @return [Deimos::SchemaBackends::Base] def decoder - @decoder ||= Deimos.schema_backend(schema: config[:schema], - namespace: config[:namespace]) + # @decoder ||= Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) + + return @decoder if @decoder + + @decoder = if Utils::SchemaClass.use?(config.to_h) + schema_class = "Schemas::#{config[:schema]}".constantize.new + Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace) + else + Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) + end end # @return [Deimos::SchemaBackends::Base] From 97bd093f3ca63c9abe7ef6f9a6143a21bcd570cf Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 19 Dec 2023 09:50:12 -0500 Subject: [PATCH 24/29] CCOL-2039: Update specs with Schema classes --- lib/deimos/consumer.rb | 3 + lib/deimos/tracing/datadog.rb | 9 +-- spec/active_record_batch_consumer_spec.rb | 29 ++++++++++ spec/active_record_consumer_spec.rb | 70 +++++++++++++++++++++++ spec/batch_consumer_spec.rb | 29 ++++++++++ spec/consumer_spec.rb | 29 ++++++++++ 6 files changed, 163 insertions(+), 6 deletions(-) diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb index cc4d9228..2f5586e1 100644 --- a/lib/deimos/consumer.rb +++ b/lib/deimos/consumer.rb @@ -24,6 +24,9 @@ def decoder return @decoder if @decoder @decoder = if Utils::SchemaClass.use?(config.to_h) + # Initialize an instance of the provided schema + # in the event the schema class is an override, the inherited + # schema and namespace will be applied schema_class = "Schemas::#{config[:schema]}".constantize.new Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace) else diff --git a/lib/deimos/tracing/datadog.rb b/lib/deimos/tracing/datadog.rb index cc2503d1..baf72b8a 100644 --- a/lib/deimos/tracing/datadog.rb +++ b/lib/deimos/tracing/datadog.rb @@ -11,15 +11,12 @@ def initialize(config) raise 'Tracing config must specify service_name' if config[:service_name].nil? @service = config[:service_name] + @tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing end # :nodoc: def start(span_name, options={}) - span = if ::Datadog.respond_to?(:tracer) - ::Datadog.tracer.trace(span_name) - else - ::Datadog::Tracing.trace(span_name) - end + span = @tracer.trace(span_name) span.service = @service span.resource = options[:resource] span @@ -32,7 +29,7 @@ def finish(span) # :nodoc: def active_span - ::Datadog.tracer.active_span + @tracer.active_span end # :nodoc: diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index fba9b630..436de79a 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -35,6 +35,35 @@ class Widget < ActiveRecord::Base stub_const('MyBatchConsumer', consumer_class) stub_const('ConsumerTest::MyBatchConsumer', consumer_class) consumer_class.config[:bulk_import_id_column] = :bulk_import_id # default + schema_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + + def initialize(test_id: nil, + some_int: nil) + self.test_id = test_id + self.some_int = some_int + end + + def as_json + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + end + stub_const('Schemas::MySchema', schema_class) end around(:each) do |ex| diff --git a/spec/active_record_consumer_spec.rb b/spec/active_record_consumer_spec.rb index 3d9f2a05..e2881d20 100644 --- a/spec/active_record_consumer_spec.rb +++ b/spec/active_record_consumer_spec.rb @@ -66,6 +66,76 @@ def fetch_record(klass, payload, _key) stub_const('MyCustomFetchConsumer', consumer_class) Time.zone = 'Eastern Time (US & Canada)' + + schema_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + + def initialize(test_id: nil, + some_int: nil) + self.test_id = test_id + self.some_int = some_int + end + + def as_json + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + end + stub_const('Schemas::MySchema', schema_class) + + schema_datetime_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchemaWithDateTimes' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + attr_accessor :updated_at + attr_accessor :some_datetime_int + attr_accessor :timestamp + + def initialize(test_id: nil, + some_int: nil, + updated_at: nil, + some_datetime_int: nil, + timestamp: nil) + self.test_id = test_id + self.some_int = some_int + self.updated_at = updated_at + self.some_datetime_int = some_datetime_int + self.timestamp = timestamp + end + + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'updated_at' => @updated_at, + 'some_datetime_int' => @some_datetime_int, + 'timestamp' => @timestamp, + 'payload_key' => @payload_key&.as_json + } + end + end + stub_const('Schemas::MySchemaWithDateTimes', schema_datetime_class) end describe 'consume' do diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index 5fb4af84..f59f932f 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -16,6 +16,35 @@ def consume_batch(_payloads, _metadata) end end stub_const('ConsumerTest::MyBatchConsumer', consumer_class) + schema_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + + def initialize(test_id: nil, + some_int: nil) + self.test_id = test_id + self.some_int = some_int + end + + def as_json + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + end + stub_const('Schemas::MySchema', schema_class) end let(:batch) do diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 07f47306..4fd2d10c 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -21,6 +21,35 @@ def consume(_payload, _metadata) end end stub_const('ConsumerTest::MyConsumer', consumer_class) + schema_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + + def initialize(test_id: nil, + some_int: nil) + self.test_id = test_id + self.some_int = some_int + end + + def as_json + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + end + stub_const('Schemas::MySchema', schema_class) end describe 'consume' do From 1ecee754e6351041f63ee57986c147646d11680c Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Tue, 19 Dec 2023 11:47:54 -0500 Subject: [PATCH 25/29] CCOL-2039: Update linting & spec cleanup --- lib/deimos/consumer.rb | 2 -- spec/active_record_batch_consumer_spec.rb | 14 ++++++-------- spec/active_record_consumer_spec.rb | 16 ++++++++-------- spec/batch_consumer_spec.rb | 14 ++++++-------- spec/consumer_spec.rb | 16 ++++++++-------- 5 files changed, 28 insertions(+), 34 deletions(-) diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb index 2f5586e1..530f10ae 100644 --- a/lib/deimos/consumer.rb +++ b/lib/deimos/consumer.rb @@ -19,8 +19,6 @@ class Consumer class << self # @return [Deimos::SchemaBackends::Base] def decoder - # @decoder ||= Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) - return @decoder if @decoder @decoder = if Utils::SchemaClass.use?(config.to_h) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 436de79a..c8be86a5 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -53,14 +53,12 @@ def initialize(test_id: nil, self.some_int = some_int end - def as_json - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } end end stub_const('Schemas::MySchema', schema_class) diff --git a/spec/active_record_consumer_spec.rb b/spec/active_record_consumer_spec.rb index e2881d20..b97a6f84 100644 --- a/spec/active_record_consumer_spec.rb +++ b/spec/active_record_consumer_spec.rb @@ -3,6 +3,7 @@ require 'date' # Wrapped in a module to prevent class leakage +# rubocop:disable Metrics/ModuleLength module ActiveRecordConsumerTest describe Deimos::ActiveRecordConsumer, 'Message Consumer' do before(:all) do @@ -85,14 +86,12 @@ def initialize(test_id: nil, self.some_int = some_int end - def as_json - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } end end stub_const('Schemas::MySchema', schema_class) @@ -252,3 +251,4 @@ def as_json(_opts={}) end end end +# rubocop:enable Metrics/ModuleLength diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index f59f932f..cc6b785a 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -34,14 +34,12 @@ def initialize(test_id: nil, self.some_int = some_int end - def as_json - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } end end stub_const('Schemas::MySchema', schema_class) diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 4fd2d10c..530c46f8 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true # :nodoc: +# rubocop:disable Metrics/ModuleLength module ConsumerTest describe Deimos::Consumer, 'Message Consumer' do prepend_before(:each) do @@ -39,14 +40,12 @@ def initialize(test_id: nil, self.some_int = some_int end - def as_json - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } end end stub_const('Schemas::MySchema', schema_class) @@ -247,3 +246,4 @@ def consume(_payload, _metadata) end end end +# rubocop:enable Metrics/ModuleLength From 2b4dfc2001c850f5eb1849f705a48b76134e6c90 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 3 Jan 2024 13:40:11 -0500 Subject: [PATCH 26/29] CCOL-2039: Add SchemaClasses context, move decoder logic to schema backend --- lib/deimos.rb | 15 ++++- lib/deimos/consumer.rb | 13 +--- lib/deimos/tracing/datadog.rb | 10 ++- lib/deimos/utils/schema_class.rb | 12 +++- spec/active_record_batch_consumer_spec.rb | 29 +-------- spec/batch_consumer_spec.rb | 28 +-------- spec/consumer_spec.rb | 74 ++++++++++++++--------- spec/spec_helper.rb | 66 ++++++++++++++++++++ 8 files changed, 149 insertions(+), 98 deletions(-) diff --git a/lib/deimos.rb b/lib/deimos.rb index 1ac0e24b..e278b622 100644 --- a/lib/deimos.rb +++ b/lib/deimos.rb @@ -57,7 +57,20 @@ def schema_backend_class # @param namespace [String] # @return [Deimos::SchemaBackends::Base] def schema_backend(schema:, namespace:) - schema_backend_class.new(schema: schema, namespace: namespace) + if Utils::SchemaClass.use?(config.to_h) + # Initialize an instance of the provided schema + # in the event the schema class is an override, the inherited + # schema and namespace will be applied + schema_class = Utils::SchemaClass.klass(schema, namespace) + if schema_class.nil? + schema_backend_class.new(schema: schema, namespace: namespace) + else + schema_instance = schema_class.new + schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace) + end + else + schema_backend_class.new(schema: schema, namespace: namespace) + end end # @param schema [String] diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb index 530f10ae..f9932bc3 100644 --- a/lib/deimos/consumer.rb +++ b/lib/deimos/consumer.rb @@ -19,17 +19,8 @@ class Consumer class << self # @return [Deimos::SchemaBackends::Base] def decoder - return @decoder if @decoder - - @decoder = if Utils::SchemaClass.use?(config.to_h) - # Initialize an instance of the provided schema - # in the event the schema class is an override, the inherited - # schema and namespace will be applied - schema_class = "Schemas::#{config[:schema]}".constantize.new - Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace) - else - Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) - end + @decoder ||= Deimos.schema_backend(schema: config[:schema], + namespace: config[:namespace]) end # @return [Deimos::SchemaBackends::Base] diff --git a/lib/deimos/tracing/datadog.rb b/lib/deimos/tracing/datadog.rb index baf72b8a..597a6104 100644 --- a/lib/deimos/tracing/datadog.rb +++ b/lib/deimos/tracing/datadog.rb @@ -11,12 +11,11 @@ def initialize(config) raise 'Tracing config must specify service_name' if config[:service_name].nil? @service = config[:service_name] - @tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing end # :nodoc: def start(span_name, options={}) - span = @tracer.trace(span_name) + span = tracer.trace(span_name) span.service = @service span.resource = options[:resource] span @@ -27,9 +26,14 @@ def finish(span) span.finish end + # :nodoc: + def tracer + @tracer ||= ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing + end + # :nodoc: def active_span - @tracer.active_span + tracer.active_span end # :nodoc: diff --git a/lib/deimos/utils/schema_class.rb b/lib/deimos/utils/schema_class.rb index 26c08909..220a507a 100644 --- a/lib/deimos/utils/schema_class.rb +++ b/lib/deimos/utils/schema_class.rb @@ -25,13 +25,21 @@ def modules_for(namespace) def instance(payload, schema, namespace='') return payload if payload.is_a?(Deimos::SchemaClass::Base) - constants = modules_for(namespace) + [schema.underscore.camelize.singularize] - klass = constants.join('::').safe_constantize + klass = klass(schema, namespace) return payload if klass.nil? || payload.nil? klass.new(**payload.symbolize_keys) end + # Determine and return the SchemaClass with the provided schema and namespace + # @param schema [String] + # @param namespace [String] + # @return [Deimos::SchemaClass] + def klass(schema, namespace) + constants = modules_for(namespace) + [schema.underscore.camelize.singularize] + constants.join('::').safe_constantize + end + # @param config [Hash] Producer or Consumer config # @return [Boolean] def use?(config) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index c8be86a5..8f714a88 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -35,33 +35,6 @@ class Widget < ActiveRecord::Base stub_const('MyBatchConsumer', consumer_class) stub_const('ConsumerTest::MyBatchConsumer', consumer_class) consumer_class.config[:bulk_import_id_column] = :bulk_import_id # default - schema_class = Class.new(Deimos::SchemaClass::Record) do - def schema - 'MySchema' - end - - def namespace - 'com.my-namespace' - end - - attr_accessor :test_id - attr_accessor :some_int - - def initialize(test_id: nil, - some_int: nil) - self.test_id = test_id - self.some_int = some_int - end - - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end - end - stub_const('Schemas::MySchema', schema_class) end around(:each) do |ex| @@ -101,6 +74,8 @@ def publish_batch(messages) describe 'consume_batch' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do + include_context('with SchemaClasses') + before(:each) do Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } end diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index cc6b785a..063eca9e 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -16,33 +16,6 @@ def consume_batch(_payloads, _metadata) end end stub_const('ConsumerTest::MyBatchConsumer', consumer_class) - schema_class = Class.new(Deimos::SchemaClass::Record) do - def schema - 'MySchema' - end - - def namespace - 'com.my-namespace' - end - - attr_accessor :test_id - attr_accessor :some_int - - def initialize(test_id: nil, - some_int: nil) - self.test_id = test_id - self.some_int = some_int - end - - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end - end - stub_const('Schemas::MySchema', schema_class) end let(:batch) do @@ -59,6 +32,7 @@ def as_json(_opts={}) describe 'consume_batch' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do + include_context('with SchemaClasses') let(:schema_class_batch) do batch.map do |p| diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 530c46f8..f7305b2c 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -22,38 +22,13 @@ def consume(_payload, _metadata) end end stub_const('ConsumerTest::MyConsumer', consumer_class) - schema_class = Class.new(Deimos::SchemaClass::Record) do - def schema - 'MySchema' - end - - def namespace - 'com.my-namespace' - end - - attr_accessor :test_id - attr_accessor :some_int - - def initialize(test_id: nil, - some_int: nil) - self.test_id = test_id - self.some_int = some_int - end - - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end - end - stub_const('Schemas::MySchema', schema_class) end describe 'consume' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do + include_context('with SchemaClasses') + before(:each) do Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } end @@ -155,6 +130,51 @@ def as_json(_opts={}) end end end + + context 'with overriden schema classes' do + include_context('with SchemaClasses') + + before(:each) do + Deimos.configure { |config| config.schema.use_schema_classes = true } + end + + prepend_before(:each) do + schema_class = Class.new(Schemas::MySchema) do + + attr_accessor :super_int + + def initialize(test_id: nil, + some_int: nil) + super + self.super_int = some_int.nil? ? 10 : some_int * 9000 + end + end + stub_const('Schemas::MyUpdatedSchema', schema_class) + + consumer_class = Class.new(described_class) do + schema 'MyUpdatedSchema' + namespace 'com.my-namespace' + key_config field: 'test_id' + + # :nodoc: + def consume(_payload, _metadata) + raise 'This should not be called unless call_original is set' + end + end + stub_const('ConsumerTest::MyConsumer', consumer_class) + end + + it 'should consume messages' do + test_consume_message('my_consume_topic', + { 'test_id' => 'foo', + 'some_int' => 1 }) do |payload, _metadata| + expect(payload['test_id']).to eq('foo') + expect(payload['some_int']).to eq(1) + expect(payload['super_int']).to eq(9000) + end + end + + end end describe 'decode_key' do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 16beb134..0cdc1351 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -240,6 +240,72 @@ def generated_id end end +RSpec.shared_context('with SchemaClasses') do + # rubocop:disable RSpec/InstanceVariable + class Schemas::MySchema < Deimos::SchemaClass::Record + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id, :some_int + + def initialize(test_id: nil, some_int: nil) + super + self.test_id = test_id + self.some_int = some_int + end + + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + + class Schemas::MySchemaWithDateTimes < Deimos::SchemaClass::Record + def schema + 'MySchemaWithDateTimes' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id, :some_int, :updated_at, :some_datetime_int, :timestamp + + def initialize(test_id: nil, + some_int: nil, + updated_at: nil, + some_datetime_int: nil, + timestamp: nil) + super + self.test_id = test_id + self.some_int = some_int + self.updated_at = updated_at + self.some_datetime_int = some_datetime_int + self.timestamp = timestamp + end + + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'updated_at' => @updated_at, + 'some_datetime_int' => @some_datetime_int, + 'timestamp' => @timestamp, + 'payload_key' => @payload_key&.as_json + } + end + end + # rubocop:enable RSpec/InstanceVariable +end + RSpec.shared_context('with DB') do before(:all) do setup_db(self.class.metadata[:db_config] || DbConfigs::DB_OPTIONS.last) From 1c4e6b45c8d6a24030e1632c3cdd23ed43268004 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 3 Jan 2024 14:07:56 -0500 Subject: [PATCH 27/29] CCOL-2039: Use generate namespace folders in specs --- spec/active_record_batch_consumer_spec.rb | 6 ++- spec/active_record_consumer_spec.rb | 5 +- spec/active_record_producer_spec.rb | 5 +- spec/batch_consumer_spec.rb | 6 ++- spec/consumer_spec.rb | 24 +++------ spec/spec_helper.rb | 66 ----------------------- 6 files changed, 24 insertions(+), 88 deletions(-) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 8f714a88..3777e8a4 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -74,10 +74,12 @@ def publish_batch(messages) describe 'consume_batch' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do - include_context('with SchemaClasses') before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should handle an empty batch' do diff --git a/spec/active_record_consumer_spec.rb b/spec/active_record_consumer_spec.rb index b97a6f84..fae7807c 100644 --- a/spec/active_record_consumer_spec.rb +++ b/spec/active_record_consumer_spec.rb @@ -141,7 +141,10 @@ def as_json(_opts={}) SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should receive events correctly' do diff --git a/spec/active_record_producer_spec.rb b/spec/active_record_producer_spec.rb index 07776009..ec576630 100644 --- a/spec/active_record_producer_spec.rb +++ b/spec/active_record_producer_spec.rb @@ -69,7 +69,10 @@ def self.post_process(batch) SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should send events correctly' do diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index 063eca9e..1b4de1a8 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -32,7 +32,6 @@ def consume_batch(_payloads, _metadata) describe 'consume_batch' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do - include_context('with SchemaClasses') let(:schema_class_batch) do batch.map do |p| @@ -41,7 +40,10 @@ def consume_batch(_payloads, _metadata) end before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should provide backwards compatibility for BatchConsumer class' do diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index f7305b2c..d5cd3962 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -27,10 +27,12 @@ def consume(_payload, _metadata) describe 'consume' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do - include_context('with SchemaClasses') before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should consume a message' do @@ -132,25 +134,15 @@ def consume(_payload, _metadata) end context 'with overriden schema classes' do - include_context('with SchemaClasses') before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = true } + Deimos.configure do |config| + config.schema.use_schema_classes = true + config.schema.generate_namespace_folders = true + end end prepend_before(:each) do - schema_class = Class.new(Schemas::MySchema) do - - attr_accessor :super_int - - def initialize(test_id: nil, - some_int: nil) - super - self.super_int = some_int.nil? ? 10 : some_int * 9000 - end - end - stub_const('Schemas::MyUpdatedSchema', schema_class) - consumer_class = Class.new(described_class) do schema 'MyUpdatedSchema' namespace 'com.my-namespace' diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 0cdc1351..16beb134 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -240,72 +240,6 @@ def generated_id end end -RSpec.shared_context('with SchemaClasses') do - # rubocop:disable RSpec/InstanceVariable - class Schemas::MySchema < Deimos::SchemaClass::Record - def schema - 'MySchema' - end - - def namespace - 'com.my-namespace' - end - - attr_accessor :test_id, :some_int - - def initialize(test_id: nil, some_int: nil) - super - self.test_id = test_id - self.some_int = some_int - end - - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'payload_key' => @payload_key&.as_json - } - end - end - - class Schemas::MySchemaWithDateTimes < Deimos::SchemaClass::Record - def schema - 'MySchemaWithDateTimes' - end - - def namespace - 'com.my-namespace' - end - - attr_accessor :test_id, :some_int, :updated_at, :some_datetime_int, :timestamp - - def initialize(test_id: nil, - some_int: nil, - updated_at: nil, - some_datetime_int: nil, - timestamp: nil) - super - self.test_id = test_id - self.some_int = some_int - self.updated_at = updated_at - self.some_datetime_int = some_datetime_int - self.timestamp = timestamp - end - - def as_json(_opts={}) - { - 'test_id' => @test_id, - 'some_int' => @some_int, - 'updated_at' => @updated_at, - 'some_datetime_int' => @some_datetime_int, - 'timestamp' => @timestamp, - 'payload_key' => @payload_key&.as_json - } - end - end - # rubocop:enable RSpec/InstanceVariable -end - RSpec.shared_context('with DB') do before(:all) do setup_db(self.class.metadata[:db_config] || DbConfigs::DB_OPTIONS.last) From 4e4f774585e9334a3e980840d557fa3cdbfdb20d Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 3 Jan 2024 14:48:15 -0500 Subject: [PATCH 28/29] CCOL-2039: Add overriden schema --- spec/schemas/my_namespace/my_schema_updated.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 spec/schemas/my_namespace/my_schema_updated.rb diff --git a/spec/schemas/my_namespace/my_schema_updated.rb b/spec/schemas/my_namespace/my_schema_updated.rb new file mode 100644 index 00000000..2ece1dd9 --- /dev/null +++ b/spec/schemas/my_namespace/my_schema_updated.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +# This file is autogenerated by Deimos, Do NOT modify +module Schemas; module MyNamespace + ### Primary Schema Class ### + # Autogenerated Schema for Record at com.my-namespace.MySchema + class MyUpdatedSchema < Schemas::MyNamespace::MySchema + + attr_accessor :super_int + + def initialize(test_id: nil, + some_int: nil) + super + self.super_int = some_int.nil? ? 10 : some_int * 9000 + end + end +end +end From e21bdedff021d29acd92888374d9650c1e07e641 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 3 Jan 2024 15:00:04 -0500 Subject: [PATCH 29/29] CCOL-2039: Rename updated schema class --- .../my_namespace/{my_schema_updated.rb => my_updated_schema.rb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spec/schemas/my_namespace/{my_schema_updated.rb => my_updated_schema.rb} (100%) diff --git a/spec/schemas/my_namespace/my_schema_updated.rb b/spec/schemas/my_namespace/my_updated_schema.rb similarity index 100% rename from spec/schemas/my_namespace/my_schema_updated.rb rename to spec/schemas/my_namespace/my_updated_schema.rb