Skip to content

Commit

Permalink
CCOL-2039: Process valid and invalid records via notification
Browse files Browse the repository at this point in the history
  • Loading branch information
Lionel Pereira committed Dec 4, 2023
1 parent d6181fd commit 2d9e8dd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 16 deletions.
19 changes: 11 additions & 8 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,20 @@ def consume_batch(payloads, metadata)
uncompacted_update(messages)
end
end
process_valid_records(@valid_active_records)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: @valid_active_records,
consumer: self.class
})
end
end

# Additional processing after records have been successfully upserted
# @param _valid_active_records [Array<ActiveRecord>] Records to be post processed
# @return [void]
def self.process_valid_records(_valid_active_records)
nil
end

# Additional processing after records have been unsuccessfully upserted
# @param _invalid_batch_records [Array<BatchRecord>] Invalid records to be processed
# @return [void]
Expand Down Expand Up @@ -225,13 +235,6 @@ def build_records(messages)
BatchRecordList.new(records.compact)
end

# Additional processing after records have been successfully upserted
# @param _valid_active_records [Array<ActiveRecord>] Records to be post processed
# @return [void]
def process_valid_records(_valid_active_records)
nil
end

# Delete any records with a tombstone.
# @param messages [Array<Message>] List of messages for a group of
# deleted records.
Expand Down
5 changes: 0 additions & 5 deletions lib/deimos/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,4 @@ def self.send_produce_error(event)
event = ActiveSupport::Notifications::Event.new(*args)
KafkaListener.send_produce_error(event)
end

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_invalid_records(payload[:records])
end
end
36 changes: 33 additions & 3 deletions spec/active_record_batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,11 @@ def self.process_invalid_records(_)
nil
end

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_invalid_records(payload[:records])
end

end
end

Expand Down Expand Up @@ -668,7 +673,7 @@ def should_consume?(record)
record.record.some_int.even?
end

def process_valid_records(valid)
def self.process_valid_records(valid)
# Success
Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3))
end
Expand All @@ -678,6 +683,16 @@ def self.process_invalid_records(invalid)
Widget.create!(invalid.first.record.attributes.deep_merge(id: 4))
end

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_invalid_records(payload[:records])
end

ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_valid_records(payload[:records])
end

end
end

Expand Down Expand Up @@ -718,7 +733,7 @@ def should_consume?(record)
record.record.some_int.even?
end

def process_valid_records(valid)
def self.process_valid_records(valid)
# Success
Widget.create!(valid.first.attributes.deep_merge(some_int: 2000, id: 3))
end
Expand All @@ -728,6 +743,16 @@ def self.process_invalid_records(invalid)
Widget.create!(invalid.first.record.attributes.deep_merge(id: 4)) if invalid.any?
end

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_invalid_records(payload[:records])
end

ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_valid_records(payload[:records])
end

end
end

Expand Down Expand Up @@ -764,10 +789,15 @@ def self.process_invalid_records(invalid)
record_class Widget
compacted false

def process_valid_records(_)
def self.process_valid_records(_)
raise StandardError, 'Something went wrong'
end

ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_valid_records(payload[:records])
end

end
end

Expand Down

0 comments on commit 2d9e8dd

Please sign in to comment.