diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..2e724dea 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -165,7 +165,8 @@ def upsert_records(messages) updater = MassUpdater.new(@klass, key_col_proc: key_col_proc, col_proc: col_proc, - replace_associations: self.class.config[:replace_associations]) + replace_associations: self.class.replace_associations, + bulk_import_id_generator: self.class.bulk_import_id_generator) updater.mass_update(record_list) end diff --git a/lib/deimos/active_record_consume/batch_record.rb b/lib/deimos/active_record_consume/batch_record.rb index 19611a99..b8e909f0 100644 --- a/lib/deimos/active_record_consume/batch_record.rb +++ b/lib/deimos/active_record_consume/batch_record.rb @@ -17,16 +17,17 @@ class BatchRecord # @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`. attr_accessor :bulk_import_column - delegate :valid?, to: :record + delegate :valid?, :errors, :send, :attributes, to: :record # @param klass [Class < ActiveRecord::Base] # @param attributes [Hash] the full attribute list, including associations. # @param bulk_import_column [String] - def initialize(klass:, attributes:, bulk_import_column: nil) + # @param bulk_import_id_generator [Proc] + def initialize(klass:, attributes:, bulk_import_column: nil, bulk_import_id_generator: nil) @klass = klass if bulk_import_column self.bulk_import_column = bulk_import_column - self.bulk_import_id = SecureRandom.uuid + self.bulk_import_id = bulk_import_id_generator&.call attributes[bulk_import_column] = bulk_import_id end attributes = attributes.with_indifferent_access @@ -43,7 +44,7 @@ def validate_import_id! return if @klass.column_names.include?(self.bulk_import_column.to_s) raise "Create bulk_import_id on the #{@klass.table_name} table." \ - ' Run rails g deimos:bulk_import_id {table} to create the migration.' + ' Run rails g deimos:bulk_import_id {table} to create the migration.' end # @return [Class < ActiveRecord::Base] diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 5694dbb7..b0961474 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,9 +19,10 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true) + def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil) @klass = klass @replace_associations = replace_associations + @bulk_import_id_generator = bulk_import_id_generator || proc { SecureRandom.uuid } @key_cols = {} @key_col_proc = key_col_proc @@ -69,7 +70,7 @@ def save_records_to_database(record_list) def import_associations(record_list) record_list.fill_primary_keys! - import_id = @replace_associations ? SecureRandom.uuid : nil + import_id = @replace_associations ? @bulk_import_id_generator.call : nil record_list.associations.each do |assoc| sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten next unless sub_records.any? diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index 56911ab1..8fd7bc61 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -35,6 +35,16 @@ def bulk_import_id_column config[:bulk_import_id_column] end + # @return [Proc] + def bulk_import_id_generator + config[:bulk_import_id_generator] + end + + # @return [Boolean] + def replace_associations + config[:replace_associations] + end + # @param val [Boolean] Turn pre-compaction of the batch on or off. If true, # only the last message for each unique key in a batch is processed. # @return [void] diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 7677d43c..16cd115d 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -79,6 +79,7 @@ def self.validate_consumers # @!visibility private # @param kafka_config [FigTree::ConfigStruct] + # rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize def self.configure_producer_or_consumer(kafka_config) klass = kafka_config.class_name.constantize klass.class_eval do @@ -90,11 +91,18 @@ def self.configure_producer_or_consumer(kafka_config) if kafka_config.respond_to?(:bulk_import_id_column) # consumer klass.config.merge!( bulk_import_id_column: kafka_config.bulk_import_id_column, - replace_associations: kafka_config.replace_associations + replace_associations: if kafka_config.replace_associations.nil? + Deimos.config.consumers.replace_associations + else + kafka_config.replace_associations + end, + bulk_import_id_generator: kafka_config.bulk_import_id_generator || + Deimos.config.consumers.bulk_import_id_generator ) end end end + # rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize define_settings do @@ -242,6 +250,15 @@ def self.configure_producer_or_consumer(kafka_config) # Not needed if reraise_errors is set to true. # @return [Block] setting(:fatal_error, proc { false }) + + # The default function to generate a bulk ID for bulk consumers + # @return [Block] + setting(:bulk_import_id_generator, proc { SecureRandom.uuid }) + + # If true, multi-table consumers will blow away associations rather than appending to them. + # Applies to all consumers unless specified otherwise + # @return [Boolean] + setting :replace_associations, true end setting :producers do @@ -445,7 +462,13 @@ def self.configure_producer_or_consumer(kafka_config) setting :bulk_import_id_column, :bulk_import_id # If true, multi-table consumers will blow away associations rather than appending to them. # @return [Boolean] - setting :replace_associations, true + setting :replace_associations, nil + + # The default function to generate a bulk ID for this consumer + # Uses the consumers proc defined in the consumers config by default unless + # specified for individual consumers + # @return [Block] + setting :bulk_import_id_generator, nil # These are the phobos "listener" configs. See CONFIGURATION.md for more # info. diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index a3e82428..5a030369 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -39,6 +39,8 @@ end end + let(:bulk_id_generator) { proc { SecureRandom.uuid } } + before(:each) do stub_const('Widget', widget_class) stub_const('Detail', detail_class) @@ -52,20 +54,24 @@ Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator ), Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator ) ] ) end it 'should mass update the batch' do + allow(SecureRandom).to receive(:uuid).and_return('1', '2') described_class.new(Widget).mass_update(batch) expect(Widget.count).to eq(2) + expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2)) expect(Detail.count).to eq(2) expect(Widget.first.detail).not_to be_nil expect(Widget.last.detail).not_to be_nil diff --git a/spec/config/configuration_spec.rb b/spec/config/configuration_spec.rb index ee7c04b9..1cfd7ac1 100644 --- a/spec/config/configuration_spec.rb +++ b/spec/config/configuration_spec.rb @@ -91,7 +91,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil }, { topic: 'my_batch_consume_topic', group_id: 'my_batch_group_id', @@ -109,7 +110,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyBatchConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { @@ -261,7 +263,8 @@ def consume heartbeat_interval: 13, handler: 'MyConfigConsumer', use_schema_classes: false, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { @@ -279,4 +282,40 @@ def consume } ) end + + it 'should override global configurations' do + described_class.configure do + consumers.bulk_import_id_generator(-> { 'global' }) + consumers.replace_associations true + + consumer do + class_name 'MyConfigConsumer' + schema 'blah' + topic 'blah' + group_id 'myconsumerid' + bulk_import_id_generator(-> { 'consumer' }) + replace_associations false + end + + consumer do + class_name 'MyConfigConsumer2' + schema 'blah' + topic 'blah' + group_id 'myconsumerid' + end + end + + consumers = described_class.config.consumers + expect(consumers.replace_associations).to eq(true) + expect(consumers.bulk_import_id_generator.call).to eq('global') + + custom = MyConfigConsumer.config + expect(custom[:replace_associations]).to eq(false) + expect(custom[:bulk_import_id_generator].call).to eq('consumer') + + default = MyConfigConsumer2.config + expect(default[:replace_associations]).to eq(true) + expect(default[:bulk_import_id_generator].call).to eq('global') + + end end