
CCOL-2039: Post process valid and invalid records from batch consumption #207

Merged: 29 commits, Jan 3, 2024
Changes from 15 commits
6553167
CCOL-2039: Post process valid and invalid records from batch consumption
Nov 30, 2023
ca6a1e3
CCOL-2039: Lint fixes
Dec 1, 2023
3b7461e
CCOL-2039: Add optional associations for should_consume?
Dec 1, 2023
8d347e4
CCOL-2039: Linting changes
Dec 1, 2023
4a688a0
CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record c…
Dec 1, 2023
385e58b
CCOL-2039: Add consume_filter when filtering records
Dec 1, 2023
4661b4f
CCOL-2039: YARD fixes
Dec 1, 2023
8813fa4
CCOL-2039: Variable rename
Dec 1, 2023
d6181fd
CCOL-2039: Process invalid records via ActiveSupport notifications
Dec 4, 2023
2d9e8dd
CCOL-2039: Process valid and invalid records via notification
Dec 4, 2023
2cb9d4d
CCOL-2039: Spec update
Dec 4, 2023
6a750c8
CCOL-2039: Update association spec
Dec 4, 2023
eae600e
CCOL-2039: Update CHANGELOG entry
Dec 5, 2023
2b8aa2d
CCOL-2039: Move deadlocks to mass updater
Dec 5, 2023
ac27466
CCOL-2039: Maintain backwards compatibility for should_consume?
Dec 5, 2023
1a5b7db
CCOL-2039: Set and get topic tag
Dec 5, 2023
ea099a3
CCOL-2039: Add mock span to mock tracer
Dec 5, 2023
1ea3069
CCOL-2039: Update mock set_tag
Dec 5, 2023
1e5d3a5
CCOL-2039: Add retries spec for mass update and deletes
Dec 6, 2023
74364f6
CCOL-2039: Update datadog span tagging
Dec 6, 2023
90b5cf7
CCOL-2039: Update README
Dec 6, 2023
ceff055
CCOL-2039: Update CONFIGURATION docs
Dec 18, 2023
b61a403
CCOL-2039: Evaluate schemaclass schema and namespace when setting dec…
Dec 18, 2023
97bd093
CCOL-2039: Update specs with Schema classes
Dec 19, 2023
1ecee75
CCOL-2039: Update linting & spec cleanup
Dec 19, 2023
2b4dfc2
CCOL-2039: Add SchemaClasses context, move decoder logic to schema ba…
Jan 3, 2024
1c4e6b4
CCOL-2039: Use generate namespace folders in specs
Jan 3, 2024
4e4f774
CCOL-2039: Add overriden schema
Jan 3, 2024
e21bded
CCOL-2039: Rename updated schema class
Jan 3, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ jobs:
BUNDLE_WITHOUT: development:test

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Ruby 2.7
uses: ruby/setup-ruby@v1
@@ -39,7 +39,7 @@ jobs:
ruby: [ '2.6', '2.7', '3.0', '3.1' ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_associations` value for all consumers
- Feature: Add individual `replace_associations` value for individual consumers
- Feature: `should_consume?` method accepts BatchRecord associations
- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion
- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications
Member commented:
This should be added to the README.


# 1.22.5 - 2023-07-18
- Fix: Fixed buffer overflow crash with DB producer.
42 changes: 27 additions & 15 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -28,18 +28,13 @@ def consume_batch(payloads, metadata)
zip(metadata[:keys]).
map { |p, k| Deimos::Message.new(p, nil, key: k) }

tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
@tags = %W(topic:#{metadata[:topic]})
Member commented:
I think it's weird to pass metric tags so far down into the workflow. Can we do something like

Deimos.config.tracer.active_span.set_tag("topic", metadata[:topic])
...
Deimos.config.tracer.active_span.get_tag("topic")

We'd have to add a get_tag method to the provider class but it feels a lot better because we can separate out the tracing/metric logic from the actual business logic.
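A minimal sketch of the shape this reviewer suggests — a span object supporting both `set_tag` and `get_tag` — might look like the following (class names are illustrative, not Deimos's actual provider):

```ruby
# Sketch of a mock tracer whose active span supports both
# setting and reading tags, as the review comment suggests.
class MockSpan
  def initialize
    @tags = {}
  end

  # Store a tag on the span
  def set_tag(tag, value)
    @tags[tag] = value
  end

  # Read a previously stored tag (nil if unset)
  def get_tag(tag)
    @tags[tag]
  end
end

class MockTracer
  # Return a single long-lived span for simplicity
  def active_span
    @active_span ||= MockSpan.new
  end
end
```

With this shape, the consumer can call `tracer.active_span.set_tag('topic', metadata[:topic])` at the top of the workflow and read it back later with `get_tag('topic')`, keeping tracing concerns out of the business logic.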


Deimos.instrument('ar_consumer.consume_batch', @tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
end
@@ -93,8 +88,9 @@ def deleted_query(records)
end

# @param _record [ActiveRecord::Base]
# @param _associations [Hash]
# @return [Boolean]
def should_consume?(_record)
def should_consume?(_record, _associations=nil)
lionelpereira marked this conversation as resolved.
true
end
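A consumer override using the new optional associations argument could be sketched as follows (records are modeled as hashes so the example runs standalone; in Deimos the record is an `ActiveRecord::Base` instance, and `some_int`/`'detail'` mirror the specs below):

```ruby
# Sketch of a two-argument should_consume? override.
def should_consume?(record, associations = nil)
  return false unless record[:some_int] <= 10
  # associations stays optional for backwards compatibility
  return true if associations.nil?
  associations['detail']['title'] != 'invalid'
end
```

Existing one-argument overrides keep working because the default value makes the second parameter optional.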

@@ -155,19 +151,35 @@ def update_database(messages)
# @return [void]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)
invalid = filter_records(record_list)
dorner marked this conversation as resolved.
unless invalid.blank?
Member commented:
Better to use positive conditions: if invalid.any?

ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', {
dorner marked this conversation as resolved.
records: invalid,
consumer: self.class
})

end
return if record_list.empty?

key_col_proc = self.method(:key_columns).to_proc
col_proc = self.method(:columns).to_proc

updater = MassUpdater.new(@klass,
@tags,
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: updater.mass_update(record_list),
consumer: self.class
})
end

# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>]
dorner marked this conversation as resolved.
def filter_records(record_list)
record_list.filter!(self.method(:should_consume?).to_proc)
dorner marked this conversation as resolved.
end

# Process messages prior to saving to database
13 changes: 11 additions & 2 deletions lib/deimos/active_record_consume/batch_record_list.rb
@@ -17,10 +17,19 @@ def initialize(records)
self.bulk_import_column = records.first&.bulk_import_column&.to_sym
end

# Filter out any invalid records.
# Filter records with the given method, returning the removed (invalid) batch records
# @param method [Proc]
# @return [Array<BatchRecord>]
def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
self.batch_records, invalid = self.batch_records.partition do |batch_record|
case method.parameters.size
when 2
method.call(batch_record.record, batch_record.associations)
else
method.call(batch_record.record)
end
end
invalid
end
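The arity-based dispatch above can be sketched standalone (`BatchRecord` here is a simplified stand-in for Deimos's class, and `filter_batch!` is a hypothetical free-standing name for the method):

```ruby
# Simplified stand-in for Deimos::ActiveRecordConsume::BatchRecord
BatchRecord = Struct.new(:record, :associations)

# Keep the records the filter accepts; return the rejected ones.
# The filter proc may take (record) or (record, associations);
# Proc#parameters tells us which form to call.
def filter_batch!(batch_records, filter)
  kept, invalid = batch_records.partition do |br|
    if filter.parameters.size == 2
      filter.call(br.record, br.associations)
    else
      filter.call(br.record)
    end
  end
  batch_records.replace(kept)
  invalid
end
```

Dispatching on the proc's parameter count is what lets one-argument `should_consume?` implementations keep working unchanged.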

# Get the original ActiveRecord objects.
15 changes: 12 additions & 3 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -19,8 +19,10 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil)
def initialize(klass, tags, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
@tags = tags
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator

@@ -83,9 +85,16 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(@tags) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
record_list.records
end
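The transaction-and-retry wrapping that moved into `mass_update` could be sketched roughly as follows (a hedged approximation; the real `Deimos::Utils::DeadlockRetry.wrap` differs, e.g. it matches specific database deadlock errors and wraps a DB transaction):

```ruby
# Hypothetical sketch of a deadlock-retry wrapper in the spirit of
# Deimos::Utils::DeadlockRetry.wrap. Error matching is simplified.
module DeadlockRetry
  MAX_RETRIES = 3

  def self.wrap(_tags = [], max_retries: MAX_RETRIES)
    retries = 0
    begin
      yield
    rescue StandardError => e
      # Only retry what looks like a deadlock, up to the limit
      raise unless e.message.include?('deadlock') && retries < max_retries
      retries += 1
      retry
    end
  end
end
```

Because both the save and the association import run inside the same wrapped block, a deadlock on either step retries the whole unit rather than leaving the batch half-applied.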

end
37 changes: 32 additions & 5 deletions spec/active_record_batch_consumer_association_spec.rb
@@ -96,12 +96,17 @@ def publish_batch(messages)
key_config plain: true
record_class Widget

def should_consume?(record)
def should_consume?(record, associations)
if self.should_consume_proc
return self.should_consume_proc.call(record)
case self.should_consume_proc.parameters.size
when 2
self.should_consume_proc.call(record, associations)
else
self.should_consume_proc.call(record)
end
else
true
end

true
end

def record_attributes(payload, _key)
@@ -269,7 +274,7 @@ def columns(record_class)

context 'with invalid models' do
before(:each) do
consumer_class.should_consume_proc = proc { |val| val.some_int <= 10 }
consumer_class.should_consume_proc = proc { |record| record.some_int <= 10 }
end

it 'should only save valid models' do
@@ -280,5 +285,27 @@ def columns(record_class)
expect(Widget.count).to eq(2)
end
end

context 'with invalid associations' do

before(:each) do
consumer_class.should_consume_proc = proc { |record, associations|
record.some_int <= 10 && associations['detail']['title'] != 'invalid'
}
end

it 'should only save valid associations' do
publish_batch([
{ key: 2,
payload: { test_id: 'xyz', some_int: 5, title: 'valid' } },
{ key: 3,
payload: { test_id: 'abc', some_int: 15, title: 'valid' } },
{ key: 4,
payload: { test_id: 'abc', some_int: 9, title: 'invalid' } }
])
expect(Widget.count).to eq(2)
expect(Widget.second.some_int).to eq(5)
end
end
end
end