
Commit

CCOL-2039: Post process valid and invalid records from batch consumption (#207)

* CCOL-2039: Post process valid and invalid records from batch consumption

* CCOL-2039: Lint fixes

* CCOL-2039: Add optional associations for should_consume?

* CCOL-2039: Linting changes

* CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record collection

* CCOL-2039: Add consume_filter when filtering records

* CCOL-2039: YARD fixes

* CCOL-2039: Variable rename

* CCOL-2039: Process invalid records via ActiveSupport notifications

* CCOL-2039: Process valid and invalid records via notification

* CCOL-2039: Spec update

* CCOL-2039: Update association spec

* CCOL-2039: Update CHANGELOG entry

* CCOL-2039: Move deadlocks to mass updater

* CCOL-2039: Maintain backwards compatibility for should_consume?

* CCOL-2039: Set and get topic tag

* CCOL-2039: Add mock span to mock tracer

* CCOL-2039: Update mock set_tag

* CCOL-2039: Add retries spec for mass update and deletes

* CCOL-2039: Update datadog span tagging

* CCOL-2039: Update README

* CCOL-2039: Update CONFIGURATION docs

* CCOL-2039: Evaluate SchemaClass schema and namespace when setting decoder

* CCOL-2039: Update specs with Schema classes

* CCOL-2039: Update linting & spec cleanup

* CCOL-2039: Add SchemaClasses context, move decoder logic to schema backend

* CCOL-2039: Use generated namespace folders in specs

* CCOL-2039: Add overridden schema

* CCOL-2039: Rename updated schema class
lionelpereira authored Jan 3, 2024
1 parent 767eeb3 commit eb628f3
Showing 21 changed files with 620 additions and 105 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -17,7 +17,7 @@ jobs:
BUNDLE_WITHOUT: development:test

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Ruby 2.7
uses: ruby/setup-ruby@v1
@@ -39,7 +39,7 @@ jobs:
ruby: [ '2.6', '2.7', '3.0', '3.1' ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_associations` value for all consumers
- Feature: Add individual `replace_associations` value for individual consumers
- Feature: `should_consume?` method accepts BatchRecord associations
- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion
- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications

# 1.22.5 - 2023-07-18
- Fix: Fixed buffer overflow crash with DB producer.
6 changes: 6 additions & 0 deletions README.md
@@ -189,6 +189,12 @@ produced by Phobos and RubyKafka):
* exception_object
* messages - the batch of messages (in the form of `Deimos::KafkaMessage`s)
that failed - this should have only a single message in the batch.
* `batch_consumption.valid_records` - sent when the consumer has successfully upserted records. Limited by `max_db_batch_size`.
* consumer: class of the consumer that upserted these records
* records: Records upserted into the DB (of type `ActiveRecord::Base`)
* `batch_consumption.invalid_records` - sent when the consumer has rejected records returned from `filter_records`. Limited by `max_db_batch_size`.
* consumer: class of the consumer that rejected these records
* records: Rejected records (of type `Deimos::ActiveRecordConsume::BatchRecord`)

Similarly:
```ruby
4 changes: 4 additions & 0 deletions docs/CONFIGURATION.md
@@ -100,6 +100,8 @@ offset_commit_threshold|0|Number of messages that can be processed before their
offset_retention_time|nil|The time period that committed offsets will be retained, in seconds. Defaults to the broker setting.
heartbeat_interval|10|Interval between heartbeats; must be less than the session window.
backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
replace_associations|nil|Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified, the provided/default value from the `consumers` configuration will be used.
bulk_import_id_generator|nil|Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified, the provided/default block from the `consumers` configuration will be used.

## Defining Database Pollers

@@ -172,6 +174,8 @@ consumers.backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
consumers.reraise_errors|false|Default behavior is to swallow uncaught exceptions and log to the metrics provider. Set this to true to instead raise all errors. Note that raising an error will ensure that the message cannot be processed - if there is a bad message which will always raise that error, your consumer will not be able to proceed past it and will be stuck forever until you fix your code. See also the `fatal_error` configuration. This is automatically set to true when using the `TestHelpers` module in RSpec.
consumers.report_lag|false|Whether to send the `consumer_lag` metric. This requires an extra thread per consumer.
consumers.fatal_error|`proc { false }`|Block taking an exception, payload and metadata and returning true if this should be considered a fatal error and false otherwise. E.g. you can use this to always fail if the database is available. Not needed if reraise_errors is set to true.
consumers.replace_associations|true|Whether to delete existing associations for records during bulk consumption prior to inserting new associated records
consumers.bulk_import_id_generator|`proc { SecureRandom.uuid }`| Block to determine the `bulk_import_id` generated during bulk consumption. Block will be used for all bulk consumers unless explicitly set for individual consumers
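
As an illustration of the global-vs-individual precedence described above, assuming Deimos's block-style configuration DSL (the consumer class and topic names here are made up):

```ruby
Deimos.configure do
  # Global defaults applied to all bulk consumers
  consumers.replace_associations true
  consumers.bulk_import_id_generator(proc { SecureRandom.uuid })

  consumer do
    class_name 'MyBatchConsumer'
    topic 'my-topic'
    # Per-consumer settings take precedence over the consumers.* defaults
    replace_associations false
    bulk_import_id_generator(proc { "my-topic-#{SecureRandom.uuid}" })
  end
end
```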

## Producer Configuration

15 changes: 14 additions & 1 deletion lib/deimos.rb
@@ -57,7 +57,20 @@ def schema_backend_class
# @param namespace [String]
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:)
schema_backend_class.new(schema: schema, namespace: namespace)
if Utils::SchemaClass.use?(config.to_h)
# Initialize an instance of the provided schema. If the schema class
# is an override, the inherited schema and namespace will be applied.
schema_class = Utils::SchemaClass.klass(schema, namespace)
if schema_class.nil?
schema_backend_class.new(schema: schema, namespace: namespace)
else
schema_instance = schema_class.new
schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace)
end
else
schema_backend_class.new(schema: schema, namespace: namespace)
end
end

# @param schema [String]
49 changes: 31 additions & 18 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -28,18 +28,14 @@ def consume_batch(payloads, metadata)
zip(metadata[:keys]).
map { |p, k| Deimos::Message.new(p, nil, key: k) }

tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
tag = metadata[:topic]
Deimos.config.tracer.active_span.set_tag('topic', tag)

Deimos.instrument('ar_consumer.consume_batch', tag) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
end
@@ -93,8 +89,9 @@ def deleted_query(records)
end

# @param _record [ActiveRecord::Base]
# @param _associations [Hash]
# @return [Boolean]
def should_consume?(_record)
def should_consume?(_record, _associations=nil)
true
end
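
For illustration, a consumer overriding the new two-argument form. The class name, the Hash-like record, and the `'details'` association key are all made up (a real record would be an `ActiveRecord::Base` and the consumer would inherit from Deimos's consumer class); `associations` stays `nil` for callers that only support the original one-argument form.

```ruby
class WidgetConsumer
  # Reject records with no name, and (when associations are provided)
  # records whose 'details' association is empty.
  def should_consume?(record, associations = nil)
    return false unless record[:name]

    associations.nil? || !associations.fetch('details', []).empty?
  end
end
```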

@@ -155,8 +152,13 @@ def update_database(messages)
# @return [void]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)

invalid = filter_records(record_list)
if invalid.any?
ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', {
records: invalid,
consumer: self.class
})
end
return if record_list.empty?

key_col_proc = self.method(:key_columns).to_proc
@@ -167,7 +169,16 @@ def upsert_records(messages)
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
records: updater.mass_update(record_list),
consumer: self.class
})
end

# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>]
def filter_records(record_list)
record_list.filter!(self.method(:should_consume?).to_proc)
end

# Process messages prior to saving to database
@@ -209,9 +220,11 @@ def build_records(messages)
# deleted records.
# @return [void]
def remove_records(messages)
clause = deleted_query(messages)
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
clause = deleted_query(messages)

clause.delete_all
clause.delete_all
end
end
end
end
13 changes: 11 additions & 2 deletions lib/deimos/active_record_consume/batch_record_list.rb
@@ -17,10 +17,19 @@ def initialize(records)
self.bulk_import_column = records.first&.bulk_import_column&.to_sym
end

# Filter out any invalid records.
# Filter out invalid batch records using the specified method and return the removed records.
# @param method [Proc]
# @return [Array<BatchRecord>]
def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
self.batch_records, invalid = self.batch_records.partition do |batch_record|
case method.parameters.size
when 2
method.call(batch_record.record, batch_record.associations)
else
method.call(batch_record.record)
end
end
invalid
end
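
The arity dispatch above can be sketched in plain Ruby (the `Struct`, the helper name, and the filters are illustrative). `Proc#parameters` reports two parameters for the new `(record, associations)` form, so older one-argument filters keep working unchanged:

```ruby
# Stand-in for Deimos::ActiveRecordConsume::BatchRecord
BatchRecord = Struct.new(:record, :associations)

# Mirrors filter!: partition keeps records the filter accepts and
# returns [valid, invalid].
def split_valid(batch_records, filter)
  batch_records.partition do |br|
    if filter.parameters.size == 2
      filter.call(br.record, br.associations)
    else
      filter.call(br.record)
    end
  end
end
```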

# Get the original ActiveRecord objects.
14 changes: 11 additions & 3 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -19,7 +19,8 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil)
def initialize(klass, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator
@@ -83,9 +84,16 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
record_list.records
end

end
1 change: 1 addition & 0 deletions lib/deimos/test_helpers.rb
@@ -28,6 +28,7 @@ def unit_test!
deimos_config.kafka.seed_brokers ||= ['test_broker']
deimos_config.schema.backend = Deimos.schema_backend_class.mock_backend
deimos_config.producers.backend = :test
deimos_config.tracer = Deimos::Tracing::Mock.new
end
end

18 changes: 12 additions & 6 deletions lib/deimos/tracing/datadog.rb
@@ -15,11 +15,7 @@ def initialize(config)

# :nodoc:
def start(span_name, options={})
span = if ::Datadog.respond_to?(:tracer)
::Datadog.tracer.trace(span_name)
else
::Datadog::Tracing.trace(span_name)
end
span = tracer.trace(span_name)
span.service = @service
span.resource = options[:resource]
span
@@ -30,9 +26,14 @@ def finish(span)
span.finish
end

# :nodoc:
def tracer
@tracer ||= ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
end

# :nodoc:
def active_span
::Datadog.tracer.active_span
tracer.active_span
end

# :nodoc:
@@ -45,6 +46,11 @@ def set_tag(tag, value, span=nil)
(span || active_span).set_tag(tag, value)
end

# :nodoc:
def get_tag(tag)
active_span.get_tag(tag)
end

end
end
end
33 changes: 31 additions & 2 deletions lib/deimos/tracing/mock.rb
@@ -10,6 +10,7 @@ class Mock < Tracing::Provider
def initialize(logger=nil)
@logger = logger || Logger.new(STDOUT)
@logger.info('MockTracingProvider initialized')
@active_span = MockSpan.new
end

# @param span_name [String]
@@ -32,12 +33,22 @@ def finish(span)

# :nodoc:
def active_span
nil
@active_span ||= MockSpan.new
end

# :nodoc:
def set_tag(tag, value, span=nil)
nil
if span
span.set_tag(tag, value)
else
active_span.set_tag(tag, value)
end
end

# Get the value of the specified tag from the active span.
# @param tag [String]
def get_tag(tag)
active_span.get_tag(tag)
end

# :nodoc:
@@ -47,5 +58,23 @@ def set_error(span, exception)
@logger.info("Mock span '#{name}' set an error: #{exception}")
end
end

# Mock Span class
class MockSpan
# :nodoc:
def initialize
@span = {}
end

# :nodoc:
def set_tag(tag, value)
@span[tag] = value
end

# :nodoc:
def get_tag(tag)
@span[tag]
end
end
end
end
6 changes: 6 additions & 0 deletions lib/deimos/tracing/provider.rb
@@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil)
raise NotImplementedError
end

# Get the value of the specified tag from the active span.
# @param tag [String]
def get_tag(tag)
raise NotImplementedError
end

end
end
end
12 changes: 10 additions & 2 deletions lib/deimos/utils/schema_class.rb
@@ -25,13 +25,21 @@ def modules_for(namespace)
def instance(payload, schema, namespace='')
return payload if payload.is_a?(Deimos::SchemaClass::Base)

constants = modules_for(namespace) + [schema.underscore.camelize.singularize]
klass = constants.join('::').safe_constantize
klass = klass(schema, namespace)
return payload if klass.nil? || payload.nil?

klass.new(**payload.symbolize_keys)
end

# Determine and return the schema class for the provided schema and namespace.
# @param schema [String]
# @param namespace [String]
# @return [Deimos::SchemaClass]
def klass(schema, namespace)
constants = modules_for(namespace) + [schema.underscore.camelize.singularize]
constants.join('::').safe_constantize
end
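
A plain-Ruby sketch of the lookup `klass` performs. The `camelize` helper here is a simplification of ActiveSupport's `underscore.camelize.singularize` chain, and the module and schema names are made up:

```ruby
def camelize(str)
  str.split('_').map(&:capitalize).join
end

# Illustrative stand-in for Utils::SchemaClass.klass: resolve the constant
# named by the namespace plus schema, or return nil when it is not defined
# (safe_constantize behaves the same way).
def schema_class_for(schema, namespace)
  name = (namespace.split('.').map { |p| camelize(p) } + [camelize(schema)]).join('::')
  Object.const_get(name)
rescue NameError
  nil
end

module Com
  module Acme
    class MySchema; end
  end
end
```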

# @param config [Hash] Producer or Consumer config
# @return [Boolean]
def use?(config)