Skip to content

Commit

Permalink
CCOL-2039: Preprocess message before batch consumption (#206)
Browse files Browse the repository at this point in the history
* CCOL-2039: Preprocess message before batch consumption

* CCOL-2039: Add specs

* CCOL-2039: Rename spec

* CCOL-2039: Add CHANGELOG entry & fix typo
  • Loading branch information
lionelpereira authored Dec 1, 2023
1 parent 92437aa commit 58e12d2
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## UNRELEASED
- Fix: Fixed handler metric for status:received, status:success in batch consumption
- Feature: Allow pre processing of messages prior to bulk consumption

# 1.22.5 - 2023-07-18
- Fix: Fixed buffer overflow crash with DB producer.
Expand Down
8 changes: 8 additions & 0 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,17 @@ def upsert_records(messages)
updater.mass_update(record_list)
end

# Process messages prior to saving to database
# @param _messages [Array<Deimos::Message>]
# @return [Void]
def pre_process(_messages)
nil
end

# @param messages [Array<Deimos::Message>]
# @return [BatchRecordList]
def build_records(messages)
pre_process(messages)
records = messages.map do |m|
attrs = if self.method(:record_attributes).parameters.size == 2
record_attributes(m.payload, m.key)
Expand Down
41 changes: 41 additions & 0 deletions spec/active_record_batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -493,5 +493,46 @@ def record_attributes(payload, key)
end
end

describe 'pre processing' do
context 'with uncompacted messages' do
let(:consumer_class) do
Class.new(described_class) do
schema 'MySchema'
namespace 'com.my-namespace'
key_config plain: true
record_class Widget
compacted false

def pre_process(messages)
messages.each do |message|
message.payload[:some_int] = -message.payload[:some_int]
end
end

end
end

it 'should pre-process records' do
Widget.create!(id: 1, test_id: 'abc', some_int: 1)
Widget.create!(id: 2, test_id: 'def', some_int: 2)

publish_batch(
[
{ key: 1,
payload: { test_id: 'abc', some_int: 11 } },
{ key: 2,
payload: { test_id: 'def', some_int: 20 } }
]
)

widget_one, widget_two = Widget.all.to_a

expect(widget_one.some_int).to eq(-11)
expect(widget_two.some_int).to eq(-20)
end
end

end

end
end

0 comments on commit 58e12d2

Please sign in to comment.