Skip to content

Commit

Permalink
CCOL-2039: Return successful and failed records for further processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Lionel Pereira committed Nov 27, 2023
1 parent ac0c4fd commit 0c68990
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 19 deletions.
62 changes: 44 additions & 18 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ def consume_batch(payloads, metadata)
tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
valid_upserts = []
invalid_upserts = []
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
post_process(valid_upserts, invalid_upserts)
end
end

Expand Down Expand Up @@ -132,30 +135,39 @@ def update_database(messages)
removed, upserted = messages.partition(&:tombstone?)

max_db_batch_size = self.class.config[:max_db_batch_size]
valid_upserts = []
invalid_upserts = []
if upserted.any?
valid_upserts, invalid_upserts = if max_db_batch_size
upserted.each_slice(max_db_batch_size) do |group|
valid, invalid = upsert_records(group)
valid_upserts.push(valid)
invalid_upserts.push(invalid)
end
valid_upserts.compact!
invalid_upserts.compact!
else
upsert_records(upserted)
end
end

unless removed.empty?
if max_db_batch_size
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
else
upsert_records(upserted)
remove_records(removed)
end
end

return if removed.empty?

if max_db_batch_size
removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
else
remove_records(removed)
end
[valid_upserts, invalid_upserts]
end

# Upsert any non-deleted records
# @param messages [Array<Message>] List of messages for a group of
# records to either be updated or inserted.
# @return [void]
# @return [Array<Array<ActiveRecord::Base>, Array<BatchRecord>]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)
invalid = filter_records(record_list)

return if record_list.empty?

Expand All @@ -167,7 +179,13 @@ def upsert_records(messages)
col_proc: col_proc,
replace_associations: self.class.replace_associations,
batch_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
[updater.mass_update(record_list), invalid]
end

# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>]
def filter_records(record_list)
record_list.reject!(self.method(:should_consume?).to_proc)
end

# Returns a lookup entity to be used during record attributes
Expand Down Expand Up @@ -216,6 +234,14 @@ def remove_records(messages)

clause.delete_all
end

# Additional processing after records have been successfully upserted
# @param _valid_records [Array<ActiveRecord>] Records to be post processed
# @param _invalid_records [Array<BatchRecord>] Invalid records to be processed
# @return [void]
def post_process(_valid_records, _invalid_records)
nil
end
end
end
end
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consume/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class BatchRecord
# @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`.
attr_accessor :bulk_import_column

delegate :valid?, to: :record
delegate :valid?, :errors, :send, :attributes, to: :record

# @param klass [Class < ActiveRecord::Base]
# @param attributes [Hash] the full attribute list, including associations.
Expand Down
18 changes: 18 additions & 0 deletions lib/deimos/active_record_consume/batch_record_list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
end

# @param method [Proc]
# @param block [Block]
# @return [Array<BatchRecord>]
def reject!(method=nil, &block)
if method.nil?
self.batch_records.reject!(&block)
else
case method.parameters.size
when 2
self.batch_records.select! do |record|
method.call(record.record, record.associations)
end
else
self.batch_records.select! { |record| method.call(record.record) }
end
end
end

# Get the original ActiveRecord objects.
# @return [Array<ActiveRecord::Base>]
def records
Expand Down
2 changes: 2 additions & 0 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
record_list.records
end

end
Expand Down

0 comments on commit 0c68990

Please sign in to comment.