Skip to content

Commit

Permalink
CR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelpereira committed May 8, 2024
1 parent 5f23ab3 commit 195cfdd
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 38 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## UNRELEASED
- Feature: Add consumer configuration, `backfill_associations` to import associated records of primary class prior to upserting primary records
- Feature: Add consumer configuration `save_associations_first` to save associated records of the primary class prior to upserting primary records. Foreign keys of the associated records are assigned to the record class prior to saving the record class.

# 1.24.2 - 2024-05-01
- Fix: Deprecation notice with Rails 7.
Expand Down
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ heartbeat_interval|10|Interval between heartbeats; must be less than the session
backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
replace_associations|nil| Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified the provided/default value from the `consumers` configuration will be used.
bulk_import_id_generator|nil| Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used.
save_associations_first|false|Whether to save associated records of the primary class prior to upserting primary records. Foreign keys of the associated records are assigned to the record class prior to saving the record class.

## Defining Database Pollers

Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def upsert_records(messages)
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator,
backfill_associations: self.class.backfill_associations,
save_associations_first: self.class.save_associations_first,
bulk_import_id_column: self.class.bulk_import_id_column)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: updater.mass_update(record_list),
Expand Down
41 changes: 24 additions & 17 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ def default_cols(klass)
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil, backfill_associations: false,
replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false,
bulk_import_id_column: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator
@backfill_associations = backfill_associations
@save_associations_first = save_associations_first
@bulk_import_id_column = bulk_import_id_column&.to_s

@key_cols = {}
Expand Down Expand Up @@ -87,50 +87,57 @@ def import_associations(record_list)
end
end

# rubocop:disable Metrics/AbcSize
# Upsert associated records prior to upserting primary records
# @param record_list [BatchRecordList]
def backfill_associations(record_list)
associations = {}
def save_associations_first(record_list)
associations = Hash.new([])
record_list.associations.each do |assoc|
col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column)
associations[[assoc.name, assoc.klass, col, assoc.foreign_key]] = []
associations[[assoc, col]] = []
end
record_list.batch_records.each do |primary_batch_record|
associations.each_key do |assoc, klass, col, foreign_key|
batch_record = BatchRecord.new(klass: klass,
attributes: primary_batch_record.associations[assoc],
associations.each_key do |assoc, col|
batch_record = BatchRecord.new(klass: assoc.klass,
attributes: primary_batch_record.associations[assoc.name],
bulk_import_column: col,
bulk_import_id_generator: @bulk_import_id_generator)
# Associate this associated batch record's record with the primary record to
# retrieve foreign_key after associated records have been saved and primary
# keys have been filled
primary_batch_record.record.send(:"#{assoc}=", batch_record.record)
associations[[assoc, klass, col, foreign_key]] << batch_record
primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record })
associations[[assoc, col]] << batch_record
end
end
associations.each_value do |records|
assoc_record_list = BatchRecordList.new(records)
save_records_to_database(assoc_record_list)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(assoc_record_list)
end
import_associations(assoc_record_list)
end
record_list.records.each do |record|
associations.each_key do |assoc, _, _, foreign_key|
record.send(:"#{foreign_key}=", record.send(assoc).id)
associations.each_key do |assoc, _|
record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id })
end
end
end
# rubocop:enable Metrics/AbcSize

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
if @backfill_associations
backfill_associations(record_list)

if @save_associations_first
save_associations_first(record_list)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
else
end
else
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
Expand Down
5 changes: 3 additions & 2 deletions lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ def replace_associations
config[:replace_associations]
end

def backfill_associations
config[:backfill_associations]
# @return [Boolean]
def save_associations_first
config[:save_associations_first]
end

# @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
Expand Down
7 changes: 5 additions & 2 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def self.configure_producer_or_consumer(kafka_config)
end,
bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
Deimos.config.consumers.bulk_import_id_generator,
backfill_associations: kafka_config.backfill_associations
save_associations_first: kafka_config.save_associations_first
)
end
end
Expand Down Expand Up @@ -477,7 +477,10 @@ def self.configure_producer_or_consumer(kafka_config)
# @return [Block]
setting :bulk_import_id_generator, nil

setting :backfill_associations, false
# If enabled, save associated records prior to saving the main record class.
# This will also set foreign keys for associated records.
# @return [Boolean]
setting :save_associations_first, false

# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
Expand Down
18 changes: 9 additions & 9 deletions spec/active_record_consume/mass_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@

end

context 'with backfill_associations' do
context 'with save_associations_first' do
before(:all) do
ActiveRecord::Base.connection.create_table(:fidgets, force: true) do |t|
t.string(:test_id)
Expand Down Expand Up @@ -175,7 +175,7 @@
case klass.to_s
when 'Widget', 'Fidget'
%w(id)
when 'WidgetFidgets'
when 'WidgetFidget'
%w(widget_id fidget_id)
when 'FidgetDetail', 'Detail'
%w(title)
Expand All @@ -189,18 +189,18 @@
before(:each) do
stub_const('Fidget', fidget_class)
stub_const('FidgetDetail', fidget_detail_class)
stub_const('WidgetFidgets', widget_fidget_class)
stub_const('WidgetFidget', widget_fidget_class)
Widget.reset_column_information
Fidget.reset_column_information
WidgetFidgets.reset_column_information
WidgetFidget.reset_column_information
end

# rubocop:disable RSpec/MultipleExpectations, RSpec/ExampleLength
it 'should backfill the associations when upserting primary records' do
batch = Deimos::ActiveRecordConsume::BatchRecordList.new(
[
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: WidgetFidgets,
klass: WidgetFidget,
attributes: {
widget: { test_id: 'id1', some_int: 10, detail: { title: 'Widget Title 1' } },
fidget: { test_id: 'id1', some_int: 10, fidget_detail: { title: 'Fidget Title 1' } },
Expand All @@ -210,7 +210,7 @@
bulk_import_id_generator: bulk_id_generator
),
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: WidgetFidgets,
klass: WidgetFidget,
attributes: {
widget: { test_id: 'id2', some_int: 20, detail: { title: 'Widget Title 2' } },
fidget: { test_id: 'id2', some_int: 20, fidget_detail: { title: 'Fidget Title 2' } },
Expand All @@ -222,18 +222,18 @@
]
)

results = described_class.new(WidgetFidgets,
results = described_class.new(WidgetFidget,
bulk_import_id_generator: bulk_id_generator,
bulk_import_id_column: 'bulk_import_id',
key_col_proc: key_proc,
backfill_associations: true).mass_update(batch)
save_associations_first: true).mass_update(batch)
expect(results.count).to eq(2)
expect(Widget.count).to eq(2)
expect(Detail.count).to eq(2)
expect(Fidget.count).to eq(2)
expect(FidgetDetail.count).to eq(2)

WidgetFidgets.all.each_with_index do |widget_fidget, ind|
WidgetFidget.all.each_with_index do |widget_fidget, ind|
widget = Widget.find_by(id: widget_fidget.widget_id)
expect(widget.test_id).to eq("id#{ind + 1}")
expect(widget.some_int).to eq((ind + 1) * 10)
Expand Down
12 changes: 6 additions & 6 deletions spec/config/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def consume
use_schema_classes: nil,
max_db_batch_size: nil,
bulk_import_id_generator: nil,
backfill_associations: false
save_associations_first: false
}, {
topic: 'my_batch_consume_topic',
group_id: 'my_batch_group_id',
Expand All @@ -113,7 +113,7 @@ def consume
use_schema_classes: nil,
max_db_batch_size: nil,
bulk_import_id_generator: nil,
backfill_associations: false
save_associations_first: false
}
],
producer: {
Expand Down Expand Up @@ -267,7 +267,7 @@ def consume
use_schema_classes: false,
max_db_batch_size: nil,
bulk_import_id_generator: nil,
backfill_associations: false
save_associations_first: false
}
],
producer: {
Expand Down Expand Up @@ -298,7 +298,7 @@ def consume
group_id 'myconsumerid'
bulk_import_id_generator(-> { 'consumer' })
replace_associations false
backfill_associations true
save_associations_first true
end

consumer do
Expand All @@ -316,12 +316,12 @@ def consume
custom = MyConfigConsumer.config
expect(custom[:replace_associations]).to eq(false)
expect(custom[:bulk_import_id_generator].call).to eq('consumer')
expect(custom[:backfill_associations]).to eq(true)
expect(custom[:save_associations_first]).to eq(true)

default = MyConfigConsumer2.config
expect(default[:replace_associations]).to eq(true)
expect(default[:bulk_import_id_generator].call).to eq('global')
expect(default[:backfill_associations]).to eq(false)
expect(default[:save_associations_first]).to eq(false)

end
end

0 comments on commit 195cfdd

Please sign in to comment.