-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CCOL-2440: Add consumer configuration to save associated keys prior to saving primary record #217
Changes from 3 commits
ce10ecd
d9fbe48
c1cbd9f
6df054e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,13 @@ def default_cols(klass) | |
# @param col_proc [Proc<Class < ActiveRecord::Base>] | ||
# @param replace_associations [Boolean] | ||
def initialize(klass, key_col_proc: nil, col_proc: nil, | ||
replace_associations: true, bulk_import_id_generator: nil) | ||
replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, | ||
bulk_import_id_column: nil) | ||
@klass = klass | ||
@replace_associations = replace_associations | ||
@bulk_import_id_generator = bulk_import_id_generator | ||
@save_associations_first = save_associations_first | ||
@bulk_import_id_column = bulk_import_id_column&.to_s | ||
|
||
@key_cols = {} | ||
@key_col_proc = key_col_proc | ||
|
@@ -84,15 +87,60 @@ def import_associations(record_list) | |
end | ||
end | ||
|
||
# rubocop:disable Metrics/AbcSize | ||
# Upsert associated records prior to upserting primary records | ||
# @param record_list [BatchRecordList] | ||
def save_associations_first(record_list) | ||
associations = Hash.new([]) | ||
record_list.associations.each do |assoc| | ||
col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column) | ||
associations[[assoc, col]] = [] | ||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to remove lines 95-98. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would still need these lines to determine the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah - didn't make the connection between this and line 100. -_- Maybe rename In that case there's no need to use |
||
record_list.batch_records.each do |primary_batch_record| | ||
associations.each_key do |assoc, col| | ||
batch_record = BatchRecord.new(klass: assoc.klass, | ||
attributes: primary_batch_record.associations[assoc.name], | ||
bulk_import_column: col, | ||
bulk_import_id_generator: @bulk_import_id_generator) | ||
# Associate this associated batch record's record with the primary record to | ||
# retrieve foreign_key after associated records have been saved and primary | ||
# keys have been filled | ||
primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record }) | ||
associations[[assoc, col]] << batch_record | ||
end | ||
end | ||
associations.each_value do |records| | ||
assoc_record_list = BatchRecordList.new(records) | ||
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do | ||
save_records_to_database(assoc_record_list) | ||
end | ||
import_associations(assoc_record_list) | ||
end | ||
record_list.records.each do |record| | ||
associations.each_key do |assoc, _| | ||
record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id }) | ||
end | ||
end | ||
end | ||
# rubocop:enable Metrics/AbcSize | ||
|
||
# @param record_list [BatchRecordList] | ||
# @return [Array<ActiveRecord::Base>] | ||
def mass_update(record_list) | ||
# The entire batch should be treated as one transaction so that if | ||
# any message fails, the whole thing is rolled back or retried | ||
# if there is deadlock | ||
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do | ||
save_records_to_database(record_list) | ||
import_associations(record_list) if record_list.associations.any? | ||
|
||
if @save_associations_first | ||
save_associations_first(record_list) | ||
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do | ||
save_records_to_database(record_list) | ||
end | ||
else | ||
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do | ||
save_records_to_database(record_list) | ||
import_associations(record_list) if record_list.associations.any? | ||
end | ||
end | ||
record_list.records | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not disable this - if it's complaining, the function is too big. Let's break it up.