CCOL-2039: Post process valid and invalid records from batch consumption #207

Merged 29 commits on Jan 3, 2024.
Changes shown from 25 of 29 commits.
6553167
CCOL-2039: Post process valid and invalid records from batch consumption
Nov 30, 2023
ca6a1e3
CCOL-2039: Lint fixes
Dec 1, 2023
3b7461e
CCOL-2039: Add optional associations for should_consume?
Dec 1, 2023
8d347e4
CCOL-2039: Linting changes
Dec 1, 2023
4a688a0
CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record c…
Dec 1, 2023
385e58b
CCOL-2039: Add consume_filter when filtering records
Dec 1, 2023
4661b4f
CCOL-2039: YARD fixes
Dec 1, 2023
8813fa4
CCOL-2039: Variable rename
Dec 1, 2023
d6181fd
CCOL-2039: Process invalid records via ActiveSupport notifications
Dec 4, 2023
2d9e8dd
CCOL-2039: Process valid and invalid records via notification
Dec 4, 2023
2cb9d4d
CCOL-2039: Spec update
Dec 4, 2023
6a750c8
CCOL-2039: Update association spec
Dec 4, 2023
eae600e
CCOL-2039: Update CHANGELOG entry
Dec 5, 2023
2b8aa2d
CCOL-2039: Move deadlocks to mass updater
Dec 5, 2023
ac27466
CCOL-2039: Maintain backwards compatibility for should_consume?
Dec 5, 2023
1a5b7db
CCOL-2039: Set and get topic tag
Dec 5, 2023
ea099a3
CCOL-2039: Add mock span to mock tracer
Dec 5, 2023
1ea3069
CCOL-2039: Update mock set_tag
Dec 5, 2023
1e5d3a5
CCOL-2039: Add retries spec for mass update and deletes
Dec 6, 2023
74364f6
CCOL-2039: Update datadog span tagging
Dec 6, 2023
90b5cf7
CCOL-2039: Update README
Dec 6, 2023
ceff055
CCOL-2039: Update CONFIGURATION docs
Dec 18, 2023
b61a403
CCOL-2039: Evaluate schemaclass schema and namespace when setting dec…
Dec 18, 2023
97bd093
CCOL-2039: Update specs with Schema classes
Dec 19, 2023
1ecee75
CCOL-2039: Update linting & spec cleanup
Dec 19, 2023
2b4dfc2
CCOL-2039: Add SchemaClasses context, move decoder logic to schema ba…
Jan 3, 2024
1c4e6b4
CCOL-2039: Use generate namespace folders in specs
Jan 3, 2024
4e4f774
CCOL-2039: Add overriden schema
Jan 3, 2024
e21bded
CCOL-2039: Rename updated schema class
Jan 3, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
BUNDLE_WITHOUT: development:test

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Ruby 2.7
uses: ruby/setup-ruby@v1
Expand All @@ -39,7 +39,7 @@ jobs:
ruby: [ '2.6', '2.7', '3.0', '3.1' ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_associations` value for all consumers
- Feature: Add individual `replace_associations` value for individual consumers
- Feature: `should_consume?` method accepts BatchRecord associations
- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion
- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications
Review comment (Member): This should be added to the README.


# 1.22.5 - 2023-07-18
- Fix: Fixed buffer overflow crash with DB producer.
Expand Down
6 changes: 6 additions & 0 deletions README.md
Expand Up @@ -189,6 +189,12 @@ produced by Phobos and RubyKafka):
* exception_object
* messages - the batch of messages (in the form of `Deimos::KafkaMessage`s)
that failed - this should have only a single message in the batch.
* `batch_consumption.valid_records` - sent when the consumer has successfully upserted records. Limited by `max_db_batch_size`.
* consumer: class of the consumer that upserted these records
* records: Records upserted into the DB (of type `ActiveRecord::Base`)
* `batch_consumption.invalid_records` - sent when the consumer has rejected records returned from `filter_records`. Limited by `max_db_batch_size`.
* consumer: class of the consumer that rejected these records
* records: Rejected records (of type `Deimos::ActiveRecordConsume::BatchRecord`)

Similarly:
```ruby
Expand Down
4 changes: 4 additions & 0 deletions docs/CONFIGURATION.md
Expand Up @@ -100,6 +100,8 @@ offset_commit_threshold|0|Number of messages that can be processed before their
offset_retention_time|nil|The time period that committed offsets will be retained, in seconds. Defaults to the broker setting.
heartbeat_interval|10|Interval between heartbeats; must be less than the session window.
backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
replace_associations|nil|Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified, the provided/default value from the `consumers` configuration will be used.
bulk_import_id_generator|nil|Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified, the provided/default block from the `consumers` configuration will be used.

## Defining Database Pollers

Expand Down Expand Up @@ -172,6 +174,8 @@ consumers.backoff|`(1000..60_000)`|Range representing the minimum and maximum nu
consumers.reraise_errors|false|Default behavior is to swallow uncaught exceptions and log to the metrics provider. Set this to true to instead raise all errors. Note that raising an error will ensure that the message cannot be processed - if there is a bad message which will always raise that error, your consumer will not be able to proceed past it and will be stuck forever until you fix your code. See also the `fatal_error` configuration. This is automatically set to true when using the `TestHelpers` module in RSpec.
consumers.report_lag|false|Whether to send the `consumer_lag` metric. This requires an extra thread per consumer.
consumers.fatal_error|`proc { false }`|Block taking an exception, payload and metadata and returning true if this should be considered a fatal error and false otherwise. E.g. you can use this to always fail if the database is unavailable. Not needed if reraise_errors is set to true.
consumers.replace_associations|true|Whether to delete existing associations for records during bulk consumption prior to inserting new associated records.
consumers.bulk_import_id_generator|`proc { SecureRandom.uuid }`|Block to determine the `bulk_import_id` generated during bulk consumption. Block will be used for all bulk consumers unless explicitly set for individual consumers.
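As a sketch, the global and per-consumer settings above might be combined like this, assuming Deimos's standard `configure` DSL (the consumer class name, topic, and proc bodies are hypothetical):

```ruby
Deimos.configure do
  # Defaults applied to every bulk consumer
  consumers.replace_associations true
  consumers.bulk_import_id_generator(proc { SecureRandom.uuid })

  consumer do
    class_name 'MyBatchConsumer' # hypothetical
    topic 'my-topic'             # hypothetical
    # Per-consumer overrides take precedence over the consumers.* defaults
    replace_associations false
    bulk_import_id_generator(proc { "import-#{SecureRandom.hex(4)}" })
  end
end
```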

## Producer Configuration

Expand Down
49 changes: 31 additions & 18 deletions lib/deimos/active_record_consume/batch_consumption.rb
Expand Up @@ -28,18 +28,14 @@ def consume_batch(payloads, metadata)
zip(metadata[:keys]).
map { |p, k| Deimos::Message.new(p, nil, key: k) }

tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
tag = metadata[:topic]
Deimos.config.tracer.active_span.set_tag('topic', tag)

Deimos.instrument('ar_consumer.consume_batch', tag) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
end
Expand Down Expand Up @@ -93,8 +89,9 @@ def deleted_query(records)
end

# @param _record [ActiveRecord::Base]
# @param _associations [Hash]
# @return [Boolean]
def should_consume?(_record)
def should_consume?(_record, _associations=nil)
true
end

Expand Down Expand Up @@ -155,8 +152,13 @@ def update_database(messages)
# @return [void]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)

invalid = filter_records(record_list)
if invalid.any?
ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', {
records: invalid,
consumer: self.class
})
end
return if record_list.empty?

key_col_proc = self.method(:key_columns).to_proc
Expand All @@ -167,7 +169,16 @@ def upsert_records(messages)
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: updater.mass_update(record_list),
consumer: self.class
})
end

# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>]
def filter_records(record_list)
record_list.filter!(self.method(:should_consume?).to_proc)
end

# Process messages prior to saving to database
Expand Down Expand Up @@ -209,9 +220,11 @@ def build_records(messages)
# deleted records.
# @return [void]
def remove_records(messages)
clause = deleted_query(messages)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
clause = deleted_query(messages)

clause.delete_all
clause.delete_all
end
end
end
end
Expand Down
13 changes: 11 additions & 2 deletions lib/deimos/active_record_consume/batch_record_list.rb
Expand Up @@ -17,10 +17,19 @@ def initialize(records)
self.bulk_import_column = records.first&.bulk_import_column&.to_sym
end

# Filter out any invalid records.
# Filter records using the given method, returning the invalid (removed) batch records.
# @param method [Proc]
# @return [Array<BatchRecord>]
def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
self.batch_records, invalid = self.batch_records.partition do |batch_record|
case method.parameters.size
when 2
method.call(batch_record.record, batch_record.associations)
else
method.call(batch_record.record)
end
end
invalid
end

# Get the original ActiveRecord objects.
Expand Down
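The `filter!` change above dispatches on the filter proc's declared parameter count so existing one-argument `should_consume?` overrides keep working while two-argument overrides also receive the associations. The mechanism in isolation, using stand-in data rather than Deimos classes:

```ruby
# Stand-in for a BatchRecord: a record value plus its associations.
Item = Struct.new(:record, :associations)

# Mirrors the filter! dispatch: call the filter with one or two
# arguments depending on how many parameters it declares.
def partition_valid(items, filter)
  items.partition do |item|
    case filter.parameters.size
    when 2
      filter.call(item.record, item.associations)
    else
      filter.call(item.record)
    end
  end
end

items = [Item.new(5, 'ok'), Item.new(15, 'ok'), Item.new(9, 'bad')]

# Old-style single-argument filter: only the record is inspected.
old_style = proc { |record| record <= 10 }
valid, invalid = partition_valid(items, old_style)
# valid records: 5 and 9; invalid: 15

# New-style two-argument filter: associations participate as well.
new_style = proc { |record, associations| record <= 10 && associations == 'ok' }
valid, invalid = partition_valid(items, new_style)
# valid records: 5 only; 15 and 9 are rejected
```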
14 changes: 11 additions & 3 deletions lib/deimos/active_record_consume/mass_updater.rb
Expand Up @@ -19,7 +19,8 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil)
def initialize(klass, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator
Expand Down Expand Up @@ -83,9 +84,16 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
record_list.records
end

end
Expand Down
13 changes: 11 additions & 2 deletions lib/deimos/consumer.rb
Expand Up @@ -19,8 +19,17 @@ class Consumer
class << self
# @return [Deimos::SchemaBackends::Base]
def decoder
@decoder ||= Deimos.schema_backend(schema: config[:schema],
namespace: config[:namespace])
return @decoder if @decoder

@decoder = if Utils::SchemaClass.use?(config.to_h)
# Initialize an instance of the provided schema
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = "Schemas::#{config[:schema]}".constantize.new
Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace)
Review comment (Member): Shouldn't this be moved into the schema_backend method rather than just used here?

else
Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace])
end
end

# @return [Deimos::SchemaBackends::Base]
Expand Down
1 change: 1 addition & 0 deletions lib/deimos/test_helpers.rb
Expand Up @@ -28,6 +28,7 @@ def unit_test!
deimos_config.kafka.seed_brokers ||= ['test_broker']
deimos_config.schema.backend = Deimos.schema_backend_class.mock_backend
deimos_config.producers.backend = :test
deimos_config.tracer = Deimos::Tracing::Mock.new
end
end

Expand Down
14 changes: 8 additions & 6 deletions lib/deimos/tracing/datadog.rb
Expand Up @@ -11,15 +11,12 @@ def initialize(config)
raise 'Tracing config must specify service_name' if config[:service_name].nil?

@service = config[:service_name]
@tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
Review comment (Member): This feels more like a method than an instance variable. It isn't actually state that needs to be stored.

end

# :nodoc:
def start(span_name, options={})
span = if ::Datadog.respond_to?(:tracer)
::Datadog.tracer.trace(span_name)
else
::Datadog::Tracing.trace(span_name)
end
span = @tracer.trace(span_name)
span.service = @service
span.resource = options[:resource]
span
Expand All @@ -32,7 +29,7 @@ def finish(span)

# :nodoc:
def active_span
::Datadog.tracer.active_span
@tracer.active_span
end

# :nodoc:
Expand All @@ -45,6 +42,11 @@ def set_tag(tag, value, span=nil)
(span || active_span).set_tag(tag, value)
end

# :nodoc:
def get_tag(tag)
active_span.get_tag(tag)
end

end
end
end
33 changes: 31 additions & 2 deletions lib/deimos/tracing/mock.rb
Expand Up @@ -10,6 +10,7 @@ class Mock < Tracing::Provider
def initialize(logger=nil)
@logger = logger || Logger.new(STDOUT)
@logger.info('MockTracingProvider initialized')
@active_span = MockSpan.new
end

# @param span_name [String]
Expand All @@ -32,12 +33,22 @@ def finish(span)

# :nodoc:
def active_span
nil
@active_span ||= MockSpan.new
end

# :nodoc:
def set_tag(tag, value, span=nil)
nil
if span
span.set_tag(tag, value)
else
active_span.set_tag(tag, value)
end
end

# Get the value of the specified tag from the active span.
# @param tag [String]
def get_tag(tag)
active_span.get_tag(tag)
end

# :nodoc:
Expand All @@ -47,5 +58,23 @@ def set_error(span, exception)
@logger.info("Mock span '#{name}' set an error: #{exception}")
end
end

# Mock Span class
class MockSpan
# :nodoc:
def initialize
@span = {}
end

# :nodoc:
def set_tag(tag, value)
@span[tag] = value
end

# :nodoc:
def get_tag(tag)
@span[tag]
end
end
end
end
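The MockSpan introduced above is just a hash of tags. A standalone copy showing the set/get round-trip the batch consumer relies on (the topic tag is set in `consume_batch` and read back in `mass_update` and `remove_records`):

```ruby
# Minimal copy of the MockSpan from the diff above: tags live in a Hash.
class MockSpan
  def initialize
    @span = {}
  end

  def set_tag(tag, value)
    @span[tag] = value
  end

  def get_tag(tag)
    @span[tag]
  end
end

span = MockSpan.new
span.set_tag('topic', 'my_topic')
span.get_tag('topic') # => "my_topic"
```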
6 changes: 6 additions & 0 deletions lib/deimos/tracing/provider.rb
Expand Up @@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil)
raise NotImplementedError
end

# Get the value of the specified tag from the active span.
# @param tag [String]
def get_tag(tag)
raise NotImplementedError
end
Review comment (Member): We need to implement this for datadog.


end
end
end
37 changes: 32 additions & 5 deletions spec/active_record_batch_consumer_association_spec.rb
Expand Up @@ -96,12 +96,17 @@ def publish_batch(messages)
key_config plain: true
record_class Widget

def should_consume?(record)
def should_consume?(record, associations)
if self.should_consume_proc
return self.should_consume_proc.call(record)
case self.should_consume_proc.parameters.size
when 2
self.should_consume_proc.call(record, associations)
else
self.should_consume_proc.call(record)
end
else
true
end

true
end

def record_attributes(payload, _key)
Expand Down Expand Up @@ -269,7 +274,7 @@ def columns(record_class)

context 'with invalid models' do
before(:each) do
consumer_class.should_consume_proc = proc { |val| val.some_int <= 10 }
consumer_class.should_consume_proc = proc { |record| record.some_int <= 10 }
end

it 'should only save valid models' do
Expand All @@ -280,5 +285,27 @@ def columns(record_class)
expect(Widget.count).to eq(2)
end
end

context 'with invalid associations' do

before(:each) do
consumer_class.should_consume_proc = proc { |record, associations|
record.some_int <= 10 && associations['detail']['title'] != 'invalid'
}
end

it 'should only save valid associations' do
publish_batch([
{ key: 2,
payload: { test_id: 'xyz', some_int: 5, title: 'valid' } },
{ key: 3,
payload: { test_id: 'abc', some_int: 15, title: 'valid' } },
{ key: 4,
payload: { test_id: 'abc', some_int: 9, title: 'invalid' } }
])
expect(Widget.count).to eq(2)
expect(Widget.second.some_int).to eq(5)
end
end
end
end