From 58e12d2a8577b2384a641d7bafc864c378226ee9 Mon Sep 17 00:00:00 2001 From: Lionel Pereira <81594013+lionelpereira@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:16:12 -0500 Subject: [PATCH] CCOL-2039: Preprocess message before batch consumption (#206) * CCOL-2039: Preprocess message before batch consumption * CCOL-2039: Add specs * CCOL-2039: Rename spec * CCOL-2039: Add CHANGELOG entry & fix typo --- CHANGELOG.md | 1 + .../batch_consumption.rb | 8 ++++ spec/active_record_batch_consumer_spec.rb | 41 +++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60c31c65..7c08c053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## UNRELEASED - Fix: Fixed handler metric for status:received, status:success in batch consumption +- Feature: Allow pre processing of messages prior to bulk consumption # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..060597f9 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -169,9 +169,17 @@ def upsert_records(messages) updater.mass_update(record_list) end + # Process messages prior to saving to database + # @param _messages [Array] + # @return [Void] + def pre_process(_messages) + nil + end + # @param messages [Array] # @return [BatchRecordList] def build_records(messages) + pre_process(messages) records = messages.map do |m| attrs = if self.method(:record_attributes).parameters.size == 2 record_attributes(m.payload, m.key) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index d1558dd2..ad281306 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -493,5 +493,46 @@ def record_attributes(payload, key) end end + describe 'pre processing' do + context 'with uncompacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def pre_process(messages) + messages.each do |message| + message.payload[:some_int] = -message.payload[:some_int] + end + end + + end + end + + it 'should pre-process records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(-11) + expect(widget_two.some_int).to eq(-20) + end + end + + end + end end