Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2440: Add consumer configuration to save associated keys prior to saving primary record #217

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## UNRELEASED
- Feature: Enable `producers.persistent_connections` phobos setting
- Feature: Add consumer configuration, `save_associations_first` to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class

# 1.24.2 - 2024-05-01
- Fix: Deprecation notice with Rails 7.
Expand Down
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ heartbeat_interval|10|Interval between heartbeats; must be less than the session
backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
replace_associations|nil| Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified the provided/default value from the `consumers` configuration will be used.
bulk_import_id_generator|nil| Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used.
save_associations_first|false|Whether to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class

## Defining Database Pollers

Expand Down
6 changes: 4 additions & 2 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def record_key(key)
def deleted_query(records)
keys = records.
map { |m| record_key(m.key)[@klass.primary_key] }.
reject(&:nil?)
compact

@klass.unscoped.where(@klass.primary_key => keys)
end
Expand Down Expand Up @@ -168,7 +168,9 @@ def upsert_records(messages)
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
bulk_import_id_generator: self.class.bulk_import_id_generator,
save_associations_first: self.class.save_associations_first,
bulk_import_id_column: self.class.bulk_import_id_column)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: updater.mass_update(record_list),
consumer: self.class
Expand Down
63 changes: 59 additions & 4 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ def default_cols(klass)
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil)
replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false,
bulk_import_id_column: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator
@save_associations_first = save_associations_first
@bulk_import_id_column = bulk_import_id_column&.to_s

@key_cols = {}
@key_col_proc = key_col_proc
Expand Down Expand Up @@ -84,15 +87,67 @@ def import_associations(record_list)
end
end

# Assign associated records to corresponding primary records
# @param record_list [BatchRecordList] RecordList of primary records for this consumer
# @return [Hash]
def assign_associations(record_list)
associations_info = {}
record_list.associations.each do |assoc|
col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column)
associations_info[[assoc, col]] = []
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to remove lines 95-98.

Copy link
Collaborator Author

@lionelpereira lionelpereira May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still need these lines to determine the bulk_import_column instead of checking for each batch record

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - didn't make the connection between this and line 100. -_- Maybe rename associations to something else - it's mirroring record.associations but isn't actually recording the same thing.

In that case there's no need to use Hash.new([]).

record_list.batch_records.each do |primary_batch_record|
associations_info.each_key do |assoc, col|
batch_record = BatchRecord.new(klass: assoc.klass,
attributes: primary_batch_record.associations[assoc.name],
bulk_import_column: col,
bulk_import_id_generator: @bulk_import_id_generator)
# Associate this associated batch record's record with the primary record to
# retrieve foreign_key after associated records have been saved and primary
# keys have been filled
primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record })
associations_info[[assoc, col]] << batch_record
end
end
associations_info
end

# Save associated records and fill foreign keys on RecordList records
# @param record_list [BatchRecordList] RecordList of primary records for this consumer
# @param associations_info [Hash] Contains association info
def save_associations_first(record_list, associations_info)
associations_info.each_value do |records|
assoc_record_list = BatchRecordList.new(records)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(assoc_record_list)
end
import_associations(assoc_record_list)
end
record_list.records.each do |record|
associations_info.each_key do |assoc, _|
record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id })
end
end
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?

if @save_associations_first
associations_info = assign_associations(record_list)
save_associations_first(record_list, associations_info)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
end
else
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
end
record_list.records
end
Expand Down
5 changes: 5 additions & 0 deletions lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def replace_associations
config[:replace_associations]
end

# @return [Boolean]
def save_associations_first
config[:save_associations_first]
end

# @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
# only the last message for each unique key in a batch is processed.
# @return [void]
Expand Down
8 changes: 7 additions & 1 deletion lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def self.configure_producer_or_consumer(kafka_config)
kafka_config.replace_associations
end,
bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
Deimos.config.consumers.bulk_import_id_generator
Deimos.config.consumers.bulk_import_id_generator,
save_associations_first: kafka_config.save_associations_first
)
end
end
Expand Down Expand Up @@ -476,6 +477,11 @@ def self.configure_producer_or_consumer(kafka_config)
# @return [Block]
setting :bulk_import_id_generator, nil

# If enabled save associated records prior to saving the main record class
# This will also set foreign keys for associated records
# @return [Boolean]
setting :save_associations_first, false

# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
setting :group_id
Expand Down
137 changes: 137 additions & 0 deletions spec/active_record_consume/mass_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,142 @@

end

context 'with save_associations_first' do
before(:all) do
ActiveRecord::Base.connection.create_table(:fidgets, force: true) do |t|
t.string(:test_id)
t.integer(:some_int)
t.string(:bulk_import_id)
t.timestamps
end

ActiveRecord::Base.connection.create_table(:fidget_details, force: true) do |t|
t.string(:title)
t.string(:bulk_import_id)
t.belongs_to(:fidget)

t.index(%i(title), unique: true)
end

ActiveRecord::Base.connection.create_table(:widget_fidgets, force: true, id: false) do |t|
t.belongs_to(:fidget)
t.belongs_to(:widget)
t.string(:bulk_import_id)
t.string(:note)
t.index(%i(widget_id fidget_id), unique: true)
end
end

after(:all) do
ActiveRecord::Base.connection.drop_table(:fidgets)
ActiveRecord::Base.connection.drop_table(:fidget_details)
ActiveRecord::Base.connection.drop_table(:widget_fidgets)
end

let(:fidget_detail_class) do
Class.new(ActiveRecord::Base) do
self.table_name = 'fidget_details'
belongs_to :fidget
end
end

let(:fidget_class) do
Class.new(ActiveRecord::Base) do
self.table_name = 'fidgets'
has_one :fidget_detail
end
end

let(:widget_fidget_class) do
Class.new(ActiveRecord::Base) do
self.table_name = 'widget_fidgets'
belongs_to :fidget
belongs_to :widget
end
end

let(:bulk_id_generator) { proc { SecureRandom.uuid } }

let(:key_proc) do
lambda do |klass|
case klass.to_s
when 'Widget', 'Fidget'
%w(id)
when 'WidgetFidget'
%w(widget_id fidget_id)
when 'FidgetDetail', 'Detail'
%w(title)
else
raise "Key Columns for #{klass} not defined"
end

end
end

before(:each) do
stub_const('Fidget', fidget_class)
stub_const('FidgetDetail', fidget_detail_class)
stub_const('WidgetFidget', widget_fidget_class)
Widget.reset_column_information
Fidget.reset_column_information
WidgetFidget.reset_column_information
end

# rubocop:disable RSpec/MultipleExpectations, RSpec/ExampleLength
it 'should backfill the associations when upserting primary records' do
batch = Deimos::ActiveRecordConsume::BatchRecordList.new(
[
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: WidgetFidget,
attributes: {
widget: { test_id: 'id1', some_int: 10, detail: { title: 'Widget Title 1' } },
fidget: { test_id: 'id1', some_int: 10, fidget_detail: { title: 'Fidget Title 1' } },
note: 'Stuff 1'
},
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
),
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: WidgetFidget,
attributes: {
widget: { test_id: 'id2', some_int: 20, detail: { title: 'Widget Title 2' } },
fidget: { test_id: 'id2', some_int: 20, fidget_detail: { title: 'Fidget Title 2' } },
note: 'Stuff 2'
},
bulk_import_column: 'bulk_import_id',
bulk_import_id_generator: bulk_id_generator
)
]
)

results = described_class.new(WidgetFidget,
bulk_import_id_generator: bulk_id_generator,
bulk_import_id_column: 'bulk_import_id',
key_col_proc: key_proc,
save_associations_first: true).mass_update(batch)
expect(results.count).to eq(2)
expect(Widget.count).to eq(2)
expect(Detail.count).to eq(2)
expect(Fidget.count).to eq(2)
expect(FidgetDetail.count).to eq(2)

WidgetFidget.all.each_with_index do |widget_fidget, ind|
widget = Widget.find_by(id: widget_fidget.widget_id)
expect(widget.test_id).to eq("id#{ind + 1}")
expect(widget.some_int).to eq((ind + 1) * 10)
detail = Detail.find_by(widget_id: widget_fidget.widget_id)
expect(detail.title).to eq("Widget Title #{ind + 1}")
fidget = Fidget.find_by(id: widget_fidget.fidget_id)
expect(fidget.test_id).to eq("id#{ind + 1}")
expect(fidget.some_int).to eq((ind + 1) * 10)
fidget_detail = FidgetDetail.find_by(fidget_id: widget_fidget.fidget_id)
expect(fidget_detail.title).to eq("Fidget Title #{ind + 1}")
expect(widget_fidget.note).to eq("Stuff #{ind + 1}")
end
end
# rubocop:enable RSpec/MultipleExpectations, RSpec/ExampleLength

end

end
end
12 changes: 9 additions & 3 deletions spec/config/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def consume
handler: 'ConsumerTest::MyConsumer',
use_schema_classes: nil,
max_db_batch_size: nil,
bulk_import_id_generator: nil
bulk_import_id_generator: nil,
save_associations_first: false
}, {
topic: 'my_batch_consume_topic',
group_id: 'my_batch_group_id',
Expand All @@ -111,7 +112,8 @@ def consume
handler: 'ConsumerTest::MyBatchConsumer',
use_schema_classes: nil,
max_db_batch_size: nil,
bulk_import_id_generator: nil
bulk_import_id_generator: nil,
save_associations_first: false
}
],
producer: {
Expand Down Expand Up @@ -265,7 +267,8 @@ def consume
handler: 'MyConfigConsumer',
use_schema_classes: false,
max_db_batch_size: nil,
bulk_import_id_generator: nil
bulk_import_id_generator: nil,
save_associations_first: false
}
],
producer: {
Expand Down Expand Up @@ -297,6 +300,7 @@ def consume
group_id 'myconsumerid'
bulk_import_id_generator(-> { 'consumer' })
replace_associations false
save_associations_first true
end

consumer do
Expand All @@ -314,10 +318,12 @@ def consume
custom = MyConfigConsumer.config
expect(custom[:replace_associations]).to eq(false)
expect(custom[:bulk_import_id_generator].call).to eq('consumer')
expect(custom[:save_associations_first]).to eq(true)

default = MyConfigConsumer2.config
expect(default[:replace_associations]).to eq(true)
expect(default[:bulk_import_id_generator].call).to eq('global')
expect(default[:save_associations_first]).to eq(false)

end
end
Loading