Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2039: Global consumer configurations for replace_association & bulk_id_generation #205

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## UNRELEASED
- Fix: Fixed handler metric for status:received, status:success in batch consumption
- Feature: Allow pre processing of messages prior to bulk consumption
- Feature: Add global configuration for custom `bulk_import_id_generator` proc for all consumers
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_assocations` value for for all consumers
- Feature: Add individual `replace_assocations` value for for individual consumers

# 1.22.5 - 2023-07-18
- Fix: Fixed buffer overflow crash with DB producer.
Expand Down
6 changes: 4 additions & 2 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ def upsert_records(messages)
updater = MassUpdater.new(@klass,
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.config[:replace_associations])
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
end

Expand Down Expand Up @@ -197,7 +198,8 @@ def build_records(messages)

BatchRecord.new(klass: @klass,
attributes: attrs,
bulk_import_column: col)
bulk_import_column: col,
bulk_import_id_generator: self.class.bulk_import_id_generator)
end
BatchRecordList.new(records.compact)
end
Expand Down
9 changes: 5 additions & 4 deletions lib/deimos/active_record_consume/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ class BatchRecord
# @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`.
attr_accessor :bulk_import_column

delegate :valid?, to: :record
delegate :valid?, :errors, :send, :attributes, to: :record

# @param klass [Class < ActiveRecord::Base]
# @param attributes [Hash] the full attribute list, including associations.
# @param bulk_import_column [String]
def initialize(klass:, attributes:, bulk_import_column: nil)
# @param bulk_import_id_generator [Proc]
def initialize(klass:, attributes:, bulk_import_column: nil, bulk_import_id_generator: nil)
@klass = klass
if bulk_import_column
self.bulk_import_column = bulk_import_column
self.bulk_import_id = SecureRandom.uuid
self.bulk_import_id = bulk_import_id_generator&.call
attributes[bulk_import_column] = bulk_import_id
end
attributes = attributes.with_indifferent_access
Expand All @@ -43,7 +44,7 @@ def validate_import_id!
return if @klass.column_names.include?(self.bulk_import_column.to_s)

raise "Create bulk_import_id on the #{@klass.table_name} table." \
' Run rails g deimos:bulk_import_id {table} to create the migration.'
' Run rails g deimos:bulk_import_id {table} to create the migration.'
end

# @return [Class < ActiveRecord::Base]
Expand Down
5 changes: 3 additions & 2 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true)
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator

@key_cols = {}
@key_col_proc = key_col_proc
Expand Down Expand Up @@ -69,7 +70,7 @@ def save_records_to_database(record_list)
def import_associations(record_list)
record_list.fill_primary_keys!

import_id = @replace_associations ? SecureRandom.uuid : nil
import_id = @replace_associations ? @bulk_import_id_generator&.call : nil
record_list.associations.each do |assoc|
sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten
next unless sub_records.any?
Expand Down
10 changes: 10 additions & 0 deletions lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ def bulk_import_id_column
config[:bulk_import_id_column]
end

# @return [Proc]
def bulk_import_id_generator
config[:bulk_import_id_generator]
end

# @return [Boolean]
def replace_associations
config[:replace_associations]
end

# @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
# only the last message for each unique key in a batch is processed.
# @return [void]
Expand Down
27 changes: 25 additions & 2 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def self.validate_consumers

# @!visibility private
# @param kafka_config [FigTree::ConfigStruct]
# rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize
def self.configure_producer_or_consumer(kafka_config)
klass = kafka_config.class_name.constantize
klass.class_eval do
Expand All @@ -90,11 +91,18 @@ def self.configure_producer_or_consumer(kafka_config)
if kafka_config.respond_to?(:bulk_import_id_column) # consumer
klass.config.merge!(
bulk_import_id_column: kafka_config.bulk_import_id_column,
replace_associations: kafka_config.replace_associations
replace_associations: if kafka_config.replace_associations.nil?
Deimos.config.consumers.replace_associations
else
kafka_config.replace_associations
end,
bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
Deimos.config.consumers.bulk_import_id_generator
)
end
end
end
# rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize

define_settings do

Expand Down Expand Up @@ -242,6 +250,15 @@ def self.configure_producer_or_consumer(kafka_config)
# Not needed if reraise_errors is set to true.
# @return [Block]
setting(:fatal_error, proc { false })

# The default function to generate a bulk ID for bulk consumers
# @return [Block]
setting(:bulk_import_id_generator, proc { SecureRandom.uuid })

# If true, multi-table consumers will blow away associations rather than appending to them.
# Applies to all consumers unless specified otherwise
# @return [Boolean]
setting :replace_associations, true
end

setting :producers do
Expand Down Expand Up @@ -445,7 +462,13 @@ def self.configure_producer_or_consumer(kafka_config)
setting :bulk_import_id_column, :bulk_import_id
# If true, multi-table consumers will blow away associations rather than appending to them.
# @return [Boolean]
setting :replace_associations, true
setting :replace_associations, nil

# The default function to generate a bulk ID for this consumer
# Uses the consumers proc defined in the consumers config by default unless
# specified for individual consumers
# @return [Block]
setting :bulk_import_id_generator, nil

# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
Expand Down
69 changes: 69 additions & 0 deletions spec/active_record_batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module ActiveRecordBatchConsumerTest
t.string(:part_two)
t.integer(:some_int)
t.boolean(:deleted, default: false)
t.string(:bulk_import_id)
t.timestamps

t.index(%i(part_one part_two), unique: true)
Expand Down Expand Up @@ -531,6 +532,74 @@ def pre_process(messages)
expect(widget_two.some_int).to eq(-20)
end
end
end

describe 'global configurations' do

context 'with a global bulk_import_id_generator' do

before(:each) do
Deimos.configure do
consumers.bulk_import_id_generator(proc { 'global' })
end
end

it 'should call the default bulk_import_id_generator proc' do
publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 3 } }
]
)

expect(all_widgets).
to match_array(
[
have_attributes(id: 1,
test_id: 'abc',
some_int: 3,
updated_at: start,
created_at: start,
bulk_import_id: 'global')
]
)

end

end

context 'with a class defined bulk_import_id_generator' do

before(:each) do
Deimos.configure do
consumers.bulk_import_id_generator(proc { 'global' })
end
consumer_class.config[:bulk_import_id_generator] = proc { 'custom' }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should still define the global one here - this should override the global.

end

it 'should call the default bulk_import_id_generator proc' do

publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 3 } }
]
)

expect(all_widgets).
to match_array(
[
have_attributes(id: 1,
test_id: 'abc',
some_int: 3,
updated_at: start,
created_at: start,
bulk_import_id: 'custom')
]
)

end
end

end

Expand Down
12 changes: 9 additions & 3 deletions spec/active_record_consume/mass_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
end
end

let(:bulk_id_generator) { proc { SecureRandom.uuid } }

before(:each) do
stub_const('Widget', widget_class)
stub_const('Detail', detail_class)
Expand All @@ -52,20 +54,24 @@
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
),
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
)
]
)
end

it 'should mass update the batch' do
described_class.new(Widget).mass_update(batch)
allow(SecureRandom).to receive(:uuid).and_return('1', '2')
described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch)
expect(Widget.count).to eq(2)
expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2))
expect(Detail.count).to eq(2)
expect(Widget.first.detail).not_to be_nil
expect(Widget.last.detail).not_to be_nil
Expand Down
45 changes: 42 additions & 3 deletions spec/config/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}, {
topic: 'my_batch_consume_topic',
group_id: 'my_batch_group_id',
Expand All @@ -109,7 +110,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyBatchConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand Down Expand Up @@ -261,7 +263,8 @@ def consume
heartbeat_interval: 13,
handler: 'MyConfigConsumer',
use_schema_classes: false,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand All @@ -279,4 +282,40 @@ def consume
}
)
end

it 'should override global configurations' do
described_class.configure do
consumers.bulk_import_id_generator(-> { 'global' })
consumers.replace_associations true

consumer do
class_name 'MyConfigConsumer'
schema 'blah'
topic 'blah'
group_id 'myconsumerid'
bulk_import_id_generator(-> { 'consumer' })
replace_associations false
end

consumer do
class_name 'MyConfigConsumer2'
schema 'blah'
topic 'blah'
group_id 'myconsumerid'
end
end

consumers = described_class.config.consumers
expect(consumers.replace_associations).to eq(true)
expect(consumers.bulk_import_id_generator.call).to eq('global')

custom = MyConfigConsumer.config
expect(custom[:replace_associations]).to eq(false)
expect(custom[:bulk_import_id_generator].call).to eq('consumer')

default = MyConfigConsumer2.config
expect(default[:replace_associations]).to eq(true)
expect(default[:bulk_import_id_generator].call).to eq('global')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually test that the generator is called when expected (i.e. during the batch updates). Ideally you'd make this change in e.g. the mass updater spec.


end
end
Loading