-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CCOL-2039: Post process valid and invalid records from batch consumption #207
Changes from 15 commits
6553167
ca6a1e3
3b7461e
8d347e4
4a688a0
385e58b
4661b4f
8813fa4
d6181fd
2d9e8dd
2cb9d4d
6a750c8
eae600e
2b8aa2d
ac27466
1a5b7db
ea099a3
1ea3069
1e5d3a5
74364f6
90b5cf7
ceff055
b61a403
97bd093
1ecee75
2b4dfc2
1c4e6b4
4e4f774
e21bded
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,18 +28,13 @@ def consume_batch(payloads, metadata) | |
zip(metadata[:keys]). | ||
map { |p, k| Deimos::Message.new(p, nil, key: k) } | ||
|
||
tags = %W(topic:#{metadata[:topic]}) | ||
|
||
Deimos.instrument('ar_consumer.consume_batch', tags) do | ||
# The entire batch should be treated as one transaction so that if | ||
# any message fails, the whole thing is rolled back or retried | ||
# if there is deadlock | ||
Deimos::Utils::DeadlockRetry.wrap(tags) do | ||
if @compacted || self.class.config[:no_keys] | ||
update_database(compact_messages(messages)) | ||
else | ||
uncompacted_update(messages) | ||
end | ||
@tags = %W(topic:#{metadata[:topic]}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's weird to pass metric tags so far down into the workflow. Can we do something like
We'd have to add a |
||
|
||
Deimos.instrument('ar_consumer.consume_batch', @tags) do | ||
if @compacted || self.class.config[:no_keys] | ||
update_database(compact_messages(messages)) | ||
else | ||
uncompacted_update(messages) | ||
end | ||
end | ||
end | ||
|
@@ -93,8 +88,9 @@ def deleted_query(records) | |
end | ||
|
||
# @param _record [ActiveRecord::Base] | ||
# @param _associations [Hash] | ||
# @return [Boolean] | ||
def should_consume?(_record) | ||
def should_consume?(_record, _associations=nil) | ||
lionelpereira marked this conversation as resolved.
Show resolved
Hide resolved
|
||
true | ||
end | ||
|
||
|
@@ -155,19 +151,35 @@ def update_database(messages) | |
# @return [void] | ||
def upsert_records(messages) | ||
record_list = build_records(messages) | ||
record_list.filter!(self.method(:should_consume?).to_proc) | ||
invalid = filter_records(record_list) | ||
dorner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
unless invalid.blank? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to use positive conditions: |
||
ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', { | ||
dorner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
records: invalid, | ||
consumer: self.class | ||
}) | ||
|
||
end | ||
return if record_list.empty? | ||
|
||
key_col_proc = self.method(:key_columns).to_proc | ||
col_proc = self.method(:columns).to_proc | ||
|
||
updater = MassUpdater.new(@klass, | ||
@tags, | ||
key_col_proc: key_col_proc, | ||
col_proc: col_proc, | ||
replace_associations: self.class.replace_associations, | ||
bulk_import_id_generator: self.class.bulk_import_id_generator) | ||
updater.mass_update(record_list) | ||
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { | ||
records: updater.mass_update(record_list), | ||
consumer: self.class | ||
}) | ||
end | ||
|
||
# @param record_list [BatchRecordList] | ||
# @return [Array<BatchRecord>] | ||
dorner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def filter_records(record_list) | ||
record_list.filter!(self.method(:should_consume?).to_proc) | ||
dorner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
|
||
# Process messages prior to saving to database | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be added to the README.