Skip to content

Commit

Permalink
CCOL-2039: All Global consumer configuration for replace_association …
Browse files Browse the repository at this point in the history
…& bulk_id_generation
  • Loading branch information
Lionel Pereira committed Nov 30, 2023
1 parent 92437aa commit fe22821
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 14 deletions.
3 changes: 2 additions & 1 deletion lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ def upsert_records(messages)
updater = MassUpdater.new(@klass,
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.config[:replace_associations])
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
end

Expand Down
9 changes: 5 additions & 4 deletions lib/deimos/active_record_consume/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ class BatchRecord
# @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`.
attr_accessor :bulk_import_column

delegate :valid?, to: :record
delegate :valid?, :errors, :send, :attributes, to: :record

# @param klass [Class < ActiveRecord::Base]
# @param attributes [Hash] the full attribute list, including associations.
# @param bulk_import_column [String]
def initialize(klass:, attributes:, bulk_import_column: nil)
# @param bulk_import_id_generator [Proc]
def initialize(klass:, attributes:, bulk_import_column: nil, bulk_import_id_generator: nil)
@klass = klass
if bulk_import_column
self.bulk_import_column = bulk_import_column
self.bulk_import_id = SecureRandom.uuid
self.bulk_import_id = bulk_import_id_generator&.call
attributes[bulk_import_column] = bulk_import_id
end
attributes = attributes.with_indifferent_access
Expand All @@ -43,7 +44,7 @@ def validate_import_id!
return if @klass.column_names.include?(self.bulk_import_column.to_s)

raise "Create bulk_import_id on the #{@klass.table_name} table." \
' Run rails g deimos:bulk_import_id {table} to create the migration.'
' Run rails g deimos:bulk_import_id {table} to create the migration.'
end

# @return [Class < ActiveRecord::Base]
Expand Down
5 changes: 3 additions & 2 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true)
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator || proc { SecureRandom.uuid }

@key_cols = {}
@key_col_proc = key_col_proc
Expand Down Expand Up @@ -69,7 +70,7 @@ def save_records_to_database(record_list)
def import_associations(record_list)
record_list.fill_primary_keys!

import_id = @replace_associations ? SecureRandom.uuid : nil
import_id = @replace_associations ? @bulk_import_id_generator.call : nil
record_list.associations.each do |assoc|
sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten
next unless sub_records.any?
Expand Down
10 changes: 10 additions & 0 deletions lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ def bulk_import_id_column
config[:bulk_import_id_column]
end

# @return [Proc]
def bulk_import_id_generator
config[:bulk_import_id_generator]
end

# @return [Boolean]
def replace_associations
config[:replace_associations]
end

# @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
# only the last message for each unique key in a batch is processed.
# @return [void]
Expand Down
27 changes: 25 additions & 2 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def self.validate_consumers

# @!visibility private
# @param kafka_config [FigTree::ConfigStruct]
# rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize
def self.configure_producer_or_consumer(kafka_config)
klass = kafka_config.class_name.constantize
klass.class_eval do
Expand All @@ -90,11 +91,18 @@ def self.configure_producer_or_consumer(kafka_config)
if kafka_config.respond_to?(:bulk_import_id_column) # consumer
klass.config.merge!(
bulk_import_id_column: kafka_config.bulk_import_id_column,
replace_associations: kafka_config.replace_associations
replace_associations: if kafka_config.replace_associations.nil?
Deimos.config.consumers.replace_associations
else
kafka_config.replace_associations
end,
bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
Deimos.config.consumers.bulk_import_id_generator
)
end
end
end
# rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize

define_settings do

Expand Down Expand Up @@ -242,6 +250,15 @@ def self.configure_producer_or_consumer(kafka_config)
# Not needed if reraise_errors is set to true.
# @return [Block]
setting(:fatal_error, proc { false })

# The default function to generate a bulk ID for bulk consumers
# @return [Block]
setting(:bulk_import_id_generator, proc { SecureRandom.uuid })

# If true, multi-table consumers will blow away associations rather than appending to them.
# Applies to all consumers unless specified otherwise
# @return [Boolean]
setting :replace_associations, true
end

setting :producers do
Expand Down Expand Up @@ -445,7 +462,13 @@ def self.configure_producer_or_consumer(kafka_config)
setting :bulk_import_id_column, :bulk_import_id
# If true, multi-table consumers will blow away associations rather than appending to them.
# @return [Boolean]
setting :replace_associations, true
setting :replace_associations, nil

# The default function to generate a bulk ID for this consumer
# Uses the consumers proc defined in the consumers config by default unless
# specified for individual consumers
# @return [Block]
setting :bulk_import_id_generator, nil

# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
Expand Down
10 changes: 8 additions & 2 deletions spec/active_record_consume/mass_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
end
end

let(:bulk_id_generator) { proc { SecureRandom.uuid } }

before(:each) do
stub_const('Widget', widget_class)
stub_const('Detail', detail_class)
Expand All @@ -52,20 +54,24 @@
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
),
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
)
]
)
end

it 'should mass update the batch' do
allow(SecureRandom).to receive(:uuid).and_return('1', '2')
described_class.new(Widget).mass_update(batch)
expect(Widget.count).to eq(2)
expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2))
expect(Detail.count).to eq(2)
expect(Widget.first.detail).not_to be_nil
expect(Widget.last.detail).not_to be_nil
Expand Down
45 changes: 42 additions & 3 deletions spec/config/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}, {
topic: 'my_batch_consume_topic',
group_id: 'my_batch_group_id',
Expand All @@ -109,7 +110,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyBatchConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand Down Expand Up @@ -261,7 +263,8 @@ def consume
heartbeat_interval: 13,
handler: 'MyConfigConsumer',
use_schema_classes: false,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand All @@ -279,4 +282,40 @@ def consume
}
)
end

it 'should override global configurations' do
described_class.configure do
consumers.bulk_import_id_generator(-> { 'global' })
consumers.replace_associations true

consumer do
class_name 'MyConfigConsumer'
schema 'blah'
topic 'blah'
group_id 'myconsumerid'
bulk_import_id_generator(-> { 'consumer' })
replace_associations false
end

consumer do
class_name 'MyConfigConsumer2'
schema 'blah'
topic 'blah'
group_id 'myconsumerid'
end
end

consumers = described_class.config.consumers
expect(consumers.replace_associations).to eq(true)
expect(consumers.bulk_import_id_generator.call).to eq('global')

custom = MyConfigConsumer.config
expect(custom[:replace_associations]).to eq(false)
expect(custom[:bulk_import_id_generator].call).to eq('consumer')

default = MyConfigConsumer2.config
expect(default[:replace_associations]).to eq(true)
expect(default[:bulk_import_id_generator].call).to eq('global')

end
end

0 comments on commit fe22821

Please sign in to comment.