diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c08c053..94b8f184 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## UNRELEASED - Fix: Fixed handler metric for status:received, status:success in batch consumption - Feature: Allow pre processing of messages prior to bulk consumption +- Feature: Add global configuration for custom `bulk_import_id_generator` proc for all consumers +- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer +- Feature: Add global `replace_assocations` value for for all consumers +- Feature: Add individual `replace_assocations` value for for individual consumers # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 060597f9..01c3273e 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -165,7 +165,8 @@ def upsert_records(messages) updater = MassUpdater.new(@klass, key_col_proc: key_col_proc, col_proc: col_proc, - replace_associations: self.class.config[:replace_associations]) + replace_associations: self.class.replace_associations, + bulk_import_id_generator: self.class.bulk_import_id_generator) updater.mass_update(record_list) end @@ -197,7 +198,8 @@ def build_records(messages) BatchRecord.new(klass: @klass, attributes: attrs, - bulk_import_column: col) + bulk_import_column: col, + bulk_import_id_generator: self.class.bulk_import_id_generator) end BatchRecordList.new(records.compact) end diff --git a/lib/deimos/active_record_consume/batch_record.rb b/lib/deimos/active_record_consume/batch_record.rb index 19611a99..b8e909f0 100644 --- a/lib/deimos/active_record_consume/batch_record.rb +++ b/lib/deimos/active_record_consume/batch_record.rb @@ -17,16 +17,17 @@ class BatchRecord # @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`. attr_accessor :bulk_import_column - delegate :valid?, to: :record + delegate :valid?, :errors, :send, :attributes, to: :record # @param klass [Class < ActiveRecord::Base] # @param attributes [Hash] the full attribute list, including associations. # @param bulk_import_column [String] - def initialize(klass:, attributes:, bulk_import_column: nil) + # @param bulk_import_id_generator [Proc] + def initialize(klass:, attributes:, bulk_import_column: nil, bulk_import_id_generator: nil) @klass = klass if bulk_import_column self.bulk_import_column = bulk_import_column - self.bulk_import_id = SecureRandom.uuid + self.bulk_import_id = bulk_import_id_generator&.call attributes[bulk_import_column] = bulk_import_id end attributes = attributes.with_indifferent_access @@ -43,7 +44,7 @@ def validate_import_id! return if @klass.column_names.include?(self.bulk_import_column.to_s) raise "Create bulk_import_id on the #{@klass.table_name} table." \ - ' Run rails g deimos:bulk_import_id {table} to create the migration.' + ' Run rails g deimos:bulk_import_id {table} to create the migration.' end # @return [Class < ActiveRecord::Base] diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 5694dbb7..267fc002 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,9 +19,10 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true) + def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil) @klass = klass @replace_associations = replace_associations + @bulk_import_id_generator = bulk_import_id_generator @key_cols = {} @key_col_proc = key_col_proc @@ -69,7 +70,7 @@ def save_records_to_database(record_list) def import_associations(record_list) record_list.fill_primary_keys! - import_id = @replace_associations ? SecureRandom.uuid : nil + import_id = @replace_associations ? @bulk_import_id_generator&.call : nil record_list.associations.each do |assoc| sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten next unless sub_records.any? diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index 56911ab1..8fd7bc61 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -35,6 +35,16 @@ def bulk_import_id_column config[:bulk_import_id_column] end + # @return [Proc] + def bulk_import_id_generator + config[:bulk_import_id_generator] + end + + # @return [Boolean] + def replace_associations + config[:replace_associations] + end + # @param val [Boolean] Turn pre-compaction of the batch on or off. If true, # only the last message for each unique key in a batch is processed. # @return [void] diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 7677d43c..16cd115d 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -79,6 +79,7 @@ def self.validate_consumers # @!visibility private # @param kafka_config [FigTree::ConfigStruct] + # rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize def self.configure_producer_or_consumer(kafka_config) klass = kafka_config.class_name.constantize klass.class_eval do @@ -90,11 +91,18 @@ def self.configure_producer_or_consumer(kafka_config) if kafka_config.respond_to?(:bulk_import_id_column) # consumer klass.config.merge!( bulk_import_id_column: kafka_config.bulk_import_id_column, - replace_associations: kafka_config.replace_associations + replace_associations: if kafka_config.replace_associations.nil? + Deimos.config.consumers.replace_associations + else + kafka_config.replace_associations + end, + bulk_import_id_generator: kafka_config.bulk_import_id_generator || + Deimos.config.consumers.bulk_import_id_generator ) end end end + # rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize define_settings do @@ -242,6 +250,15 @@ def self.configure_producer_or_consumer(kafka_config) # Not needed if reraise_errors is set to true. # @return [Block] setting(:fatal_error, proc { false }) + + # The default function to generate a bulk ID for bulk consumers + # @return [Block] + setting(:bulk_import_id_generator, proc { SecureRandom.uuid }) + + # If true, multi-table consumers will blow away associations rather than appending to them. + # Applies to all consumers unless specified otherwise + # @return [Boolean] + setting :replace_associations, true end setting :producers do @@ -445,7 +462,13 @@ def self.configure_producer_or_consumer(kafka_config) setting :bulk_import_id_column, :bulk_import_id # If true, multi-table consumers will blow away associations rather than appending to them. # @return [Boolean] - setting :replace_associations, true + setting :replace_associations, nil + + # The default function to generate a bulk ID for this consumer + # Uses the consumers proc defined in the consumers config by default unless + # specified for individual consumers + # @return [Block] + setting :bulk_import_id_generator, nil # These are the phobos "listener" configs. See CONFIGURATION.md for more # info. diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index ad281306..54e7d4db 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -11,6 +11,7 @@ module ActiveRecordBatchConsumerTest t.string(:part_two) t.integer(:some_int) t.boolean(:deleted, default: false) + t.string(:bulk_import_id) t.timestamps t.index(%i(part_one part_two), unique: true) @@ -531,6 +532,74 @@ def pre_process(messages) expect(widget_two.some_int).to eq(-20) end end + end + + describe 'global configurations' do + + context 'with a global bulk_import_id_generator' do + + before(:each) do + Deimos.configure do + consumers.bulk_import_id_generator(proc { 'global' }) + end + end + + it 'should call the default bulk_import_id_generator proc' do + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 3 } } + ] + ) + + expect(all_widgets). + to match_array( + [ + have_attributes(id: 1, + test_id: 'abc', + some_int: 3, + updated_at: start, + created_at: start, + bulk_import_id: 'global') + ] + ) + + end + + end + + context 'with a class defined bulk_import_id_generator' do + + before(:each) do + Deimos.configure do + consumers.bulk_import_id_generator(proc { 'global' }) + end + consumer_class.config[:bulk_import_id_generator] = proc { 'custom' } + end + + it 'should call the default bulk_import_id_generator proc' do + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 3 } } + ] + ) + + expect(all_widgets). + to match_array( + [ + have_attributes(id: 1, + test_id: 'abc', + some_int: 3, + updated_at: start, + created_at: start, + bulk_import_id: 'custom') + ] + ) + + end + end end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index a3e82428..edae353c 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -39,6 +39,8 @@ end end + let(:bulk_id_generator) { proc { SecureRandom.uuid } } + before(:each) do stub_const('Widget', widget_class) stub_const('Detail', detail_class) @@ -52,20 +54,24 @@ Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator ), Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator ) ] ) end it 'should mass update the batch' do - described_class.new(Widget).mass_update(batch) + allow(SecureRandom).to receive(:uuid).and_return('1', '2') + described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) expect(Widget.count).to eq(2) + expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2)) expect(Detail.count).to eq(2) expect(Widget.first.detail).not_to be_nil expect(Widget.last.detail).not_to be_nil diff --git a/spec/config/configuration_spec.rb b/spec/config/configuration_spec.rb index ee7c04b9..1cfd7ac1 100644 --- a/spec/config/configuration_spec.rb +++ b/spec/config/configuration_spec.rb @@ -91,7 +91,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil }, { topic: 'my_batch_consume_topic', group_id: 'my_batch_group_id', @@ -109,7 +110,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyBatchConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { @@ -261,7 +263,8 @@ def consume heartbeat_interval: 13, handler: 'MyConfigConsumer', use_schema_classes: false, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { @@ -279,4 +282,40 @@ def consume } ) end + + it 'should override global configurations' do + described_class.configure do + consumers.bulk_import_id_generator(-> { 'global' }) + consumers.replace_associations true + + consumer do + class_name 'MyConfigConsumer' + schema 'blah' + topic 'blah' + group_id 'myconsumerid' + bulk_import_id_generator(-> { 'consumer' }) + replace_associations false + end + + consumer do + class_name 'MyConfigConsumer2' + schema 'blah' + topic 'blah' + group_id 'myconsumerid' + end + end + + consumers = described_class.config.consumers + expect(consumers.replace_associations).to eq(true) + expect(consumers.bulk_import_id_generator.call).to eq('global') + + custom = MyConfigConsumer.config + expect(custom[:replace_associations]).to eq(false) + expect(custom[:bulk_import_id_generator].call).to eq('consumer') + + default = MyConfigConsumer2.config + expect(default[:replace_associations]).to eq(true) + expect(default[:bulk_import_id_generator].call).to eq('global') + + end end