diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df29bd52..b05e7f59 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: BUNDLE_WITHOUT: development:test steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Ruby 2.7 uses: ruby/setup-ruby@v1 @@ -39,7 +39,7 @@ jobs: ruby: [ '2.6', '2.7', '3.0', '3.1' ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 94b8f184..a68598ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer - Feature: Add global `replace_associations` value for all consumers - Feature: Add individual `replace_associations` value for individual consumers +- Feature: `should_consume?` method accepts BatchRecord associations +- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion +- Feature: Return valid and invalid records saved during consumption for further processing via the `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. diff --git a/README.md b/README.md index be9628ad..f44c7c2a 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,12 @@ produced by Phobos and RubyKafka): * exception_object * messages - the batch of messages (in the form of `Deimos::KafkaMessage`s) that failed - this should have only a single message in the batch. +* `batch_consumption.valid_records` - sent when the consumer has successfully upserted records. Limited by `max_db_batch_size`. + * consumer: class of the consumer that upserted these records + * records: Records upserted into the DB (of type `ActiveRecord::Base`) +* `batch_consumption.invalid_records` - sent when the consumer has rejected records returned from `filter_records`. Limited by `max_db_batch_size`. + * consumer: class of the consumer that rejected these records + * records: Rejected records (of type `Deimos::ActiveRecordConsume::BatchRecord`) Similarly: ```ruby diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 6a3ed559..728d947d 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -100,6 +100,8 @@ offset_commit_threshold|0|Number of messages that can be processed before their offset_retention_time|nil|The time period that committed offsets will be retained, in seconds. Defaults to the broker setting. heartbeat_interval|10|Interval between heartbeats; must be less than the session window. backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error. +replace_associations|nil|Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified the provided/default value from the `consumers` configuration will be used. +bulk_import_id_generator|nil|Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used.
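The per-consumer rows above fall back to the `consumers.*` defaults documented in the next hunk. A minimal sketch of setting both levels, using the setting names from these tables (the topic and class names are illustrative, and the DSL shape follows Deimos's existing `configure`/`consumer do` blocks):

```ruby
Deimos.configure do
  # Global defaults applied to all bulk consumers
  consumers.replace_associations true
  consumers.bulk_import_id_generator(proc { SecureRandom.uuid })

  # Per-consumer overrides take precedence over the consumers.* defaults
  consumer do
    class_name 'MyBatchConsumer'
    topic 'my-topic'
    replace_associations false
    bulk_import_id_generator(proc { "my-topic-#{SecureRandom.uuid}" })
  end
end
```

The two notifications documented in the README hunk above can be subscribed to like the existing error-notification example; a sketch (the logger calls are illustrative, the payload keys come from the README entries):

```ruby
ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  # payload[:records] holds persisted ActiveRecord::Base instances
  Rails.logger.info("#{event.payload[:consumer]} upserted #{event.payload[:records].size} records")
end

ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  # payload[:records] holds Deimos::ActiveRecordConsume::BatchRecord objects
  event.payload[:records].each do |batch_record|
    Rails.logger.warn("#{event.payload[:consumer]} rejected #{batch_record.record.attributes}")
  end
end
```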
## Defining Database Pollers @@ -172,6 +174,8 @@ consumers.backoff|`(1000..60_000)`|Range representing the minimum and maximum nu consumers.reraise_errors|false|Default behavior is to swallow uncaught exceptions and log to the metrics provider. Set this to true to instead raise all errors. Note that raising an error will ensure that the message cannot be processed - if there is a bad message which will always raise that error, your consumer will not be able to proceed past it and will be stuck forever until you fix your code. See also the `fatal_error` configuration. This is automatically set to true when using the `TestHelpers` module in RSpec. consumers.report_lag|false|Whether to send the `consumer_lag` metric. This requires an extra thread per consumer. consumers.fatal_error|`proc { false }`|Block taking an exception, payload and metadata and returning true if this should be considered a fatal error and false otherwise. E.g. you can use this to always fail if the database is unavailable. Not needed if reraise_errors is set to true. +consumers.replace_associations|true|Whether to delete existing associations for records during bulk consumption prior to inserting new associated records. +consumers.bulk_import_id_generator|`proc { SecureRandom.uuid }`|Block to determine the `bulk_import_id` generated during bulk consumption. The block will be used for all bulk consumers unless explicitly set for individual consumers. ## Producer Configuration diff --git a/lib/deimos.rb b/lib/deimos.rb index 1ac0e24b..e278b622 100644 --- a/lib/deimos.rb +++ b/lib/deimos.rb @@ -57,7 +57,20 @@ def schema_backend_class # @param namespace [String] # @return [Deimos::SchemaBackends::Base] def schema_backend(schema:, namespace:) - schema_backend_class.new(schema: schema, namespace: namespace) + if Utils::SchemaClass.use?(config.to_h) + # Initialize an instance of the provided schema; + # if the schema class is an override, the inherited + # schema and namespace will be applied + schema_class = Utils::SchemaClass.klass(schema, namespace) + if schema_class.nil? + schema_backend_class.new(schema: schema, namespace: namespace) + else + schema_instance = schema_class.new + schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace) + end + else + schema_backend_class.new(schema: schema, namespace: namespace) + end end # @param schema [String] diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 01c3273e..007e2a3a 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -28,18 +28,14 @@ def consume_batch(payloads, metadata) zip(metadata[:keys]).
map { |p, k| Deimos::Message.new(p, nil, key: k) } - tags = %W(topic:#{metadata[:topic]}) - - Deimos.instrument('ar_consumer.consume_batch', tags) do - # The entire batch should be treated as one transaction so that if - # any message fails, the whole thing is rolled back or retried - # if there is deadlock - Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + tag = metadata[:topic] + Deimos.config.tracer.active_span.set_tag('topic', tag) + + Deimos.instrument('ar_consumer.consume_batch', tag) do + if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) end end end @@ -93,8 +89,9 @@ def deleted_query(records) end # @param _record [ActiveRecord::Base] + # @param _associations [Hash] # @return [Boolean] - def should_consume?(_record) + def should_consume?(_record, _associations=nil) true end @@ -155,8 +152,13 @@ def update_database(messages) # @return [void] def upsert_records(messages) record_list = build_records(messages) - record_list.filter!(self.method(:should_consume?).to_proc) - + invalid = filter_records(record_list) + if invalid.any? + ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', { + records: invalid, + consumer: self.class + }) + end return if record_list.empty? key_col_proc = self.method(:key_columns).to_proc @@ -167,7 +169,16 @@ def upsert_records(messages) col_proc: col_proc, replace_associations: self.class.replace_associations, bulk_import_id_generator: self.class.bulk_import_id_generator) - updater.mass_update(record_list) + ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { + records: updater.mass_update(record_list), + consumer: self.class + }) + end + + # @param record_list [BatchRecordList] + # @return [Array] + def filter_records(record_list) + record_list.filter!(self.method(:should_consume?).to_proc) end # Process messages prior to saving to database @@ -209,9 +220,11 @@ def build_records(messages) # deleted records. # @return [void] def remove_records(messages) - clause = deleted_query(messages) + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + clause = deleted_query(messages) - clause.delete_all + clause.delete_all + end end end end diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 30f99d23..f8e85de9 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -17,10 +17,19 @@ def initialize(records) self.bulk_import_column = records.first&.bulk_import_column&.to_sym end - # Filter out any invalid records. + # Filter out invalid batch records using the given method and return the removed records # @param method [Proc] + # @return [Array] def filter!(method) - self.batch_records.delete_if { |record| !method.call(record.record) } + self.batch_records, invalid = self.batch_records.partition do |batch_record| + case method.parameters.size + when 2 + method.call(batch_record.record, batch_record.associations) + else + method.call(batch_record.record) + end + end + invalid end # Get the original ActiveRecord objects.
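Taken together, these two changes let a consumer opt in to association-aware filtering: `filter!` inspects the arity of the `should_consume?` proc, and `filter_records` feeds whatever is rejected into the `batch_consumption.invalid_records` notification. A minimal consumer sketch, modeled on the `Widget`/`detail` fixtures used in the specs further down (the attribute names come from those fixtures):

```ruby
class MyConsumer < Deimos::ActiveRecordConsumer
  schema 'MySchema'
  namespace 'com.my-namespace'
  key_config plain: true
  record_class Widget

  # Two-argument form: receives the unsaved record plus the BatchRecord's
  # associations hash, e.g. { 'detail' => { 'title' => '...' } }.
  def should_consume?(record, associations)
    record.some_int <= 10 && associations['detail']['title'] != 'invalid'
  end
end
```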
diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 267fc002..854eaf7f 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,7 +19,8 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, bulk_import_id_generator: nil) + def initialize(klass, key_col_proc: nil, col_proc: nil, + replace_associations: true, bulk_import_id_generator: nil) @klass = klass @replace_associations = replace_associations @bulk_import_id_generator = bulk_import_id_generator @@ -83,9 +84,16 @@ def import_associations(record_list) end # @param record_list [BatchRecordList] + # @return [Array] def mass_update(record_list) - save_records_to_database(record_list) - import_associations(record_list) if record_list.associations.any? + # The entire batch should be treated as one transaction so that if + # any message fails, the whole thing is rolled back or retried + # if there is a deadlock + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + save_records_to_database(record_list) + import_associations(record_list) if record_list.associations.any? + end + record_list.records end end diff --git a/lib/deimos/test_helpers.rb b/lib/deimos/test_helpers.rb index c1afcd7e..4e41a8c4 100644 --- a/lib/deimos/test_helpers.rb +++ b/lib/deimos/test_helpers.rb @@ -28,6 +28,7 @@ def unit_test! deimos_config.kafka.seed_brokers ||= ['test_broker'] deimos_config.schema.backend = Deimos.schema_backend_class.mock_backend deimos_config.producers.backend = :test + deimos_config.tracer = Deimos::Tracing::Mock.new end end diff --git a/lib/deimos/tracing/datadog.rb b/lib/deimos/tracing/datadog.rb index 932404d4..597a6104 100644 --- a/lib/deimos/tracing/datadog.rb +++ b/lib/deimos/tracing/datadog.rb @@ -15,11 +15,7 @@ def initialize(config) # :nodoc: def start(span_name, options={}) - span = if ::Datadog.respond_to?(:tracer) - ::Datadog.tracer.trace(span_name) - else - ::Datadog::Tracing.trace(span_name) - end + span = tracer.trace(span_name) span.service = @service span.resource = options[:resource] span @@ -30,9 +26,14 @@ def finish(span) span.finish end + # :nodoc: + def tracer + @tracer ||= ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing + end + # :nodoc: def active_span - ::Datadog.tracer.active_span + tracer.active_span end # :nodoc: @@ -45,6 +46,11 @@ def set_tag(tag, value, span=nil) (span || active_span).set_tag(tag, value) end + # :nodoc: + def get_tag(tag) + active_span.get_tag(tag) + end + end end end diff --git a/lib/deimos/tracing/mock.rb b/lib/deimos/tracing/mock.rb index df1a6593..7be50666 100644 --- a/lib/deimos/tracing/mock.rb +++ b/lib/deimos/tracing/mock.rb @@ -10,6 +10,7 @@ class Mock < Tracing::Provider def initialize(logger=nil) @logger = logger || Logger.new(STDOUT) @logger.info('MockTracingProvider initialized') + @active_span = MockSpan.new end # @param span_name [String] @@ -32,12 +33,22 @@ def finish(span) # :nodoc: def active_span - nil + @active_span ||= MockSpan.new end # :nodoc: def set_tag(tag, value, span=nil) - nil + if span + span.set_tag(tag, value) + else + active_span.set_tag(tag, value) + end + end + + # Get the value of the specified tag from the active span.
+ # @param tag [String] + def get_tag(tag) + active_span.get_tag(tag) + end # :nodoc: @@ -47,5 +58,23 @@ def set_error(span, exception) @logger.info("Mock span '#{name}' set an error: #{exception}") end end + + # Mock Span class + class MockSpan + # :nodoc: + def initialize + @span = {} + end + + # :nodoc: + def set_tag(tag, value) + @span[tag] = value + end + + # :nodoc: + def get_tag(tag) + @span[tag] + end + end end end diff --git a/lib/deimos/tracing/provider.rb b/lib/deimos/tracing/provider.rb index f46643d8..f1791a47 100644 --- a/lib/deimos/tracing/provider.rb +++ b/lib/deimos/tracing/provider.rb @@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil) raise NotImplementedError end + # Get the value of the specified tag from the active span. + # @param tag [String] + def get_tag(tag) + raise NotImplementedError + end + end end end diff --git a/lib/deimos/utils/schema_class.rb b/lib/deimos/utils/schema_class.rb index 26c08909..220a507a 100644 --- a/lib/deimos/utils/schema_class.rb +++ b/lib/deimos/utils/schema_class.rb @@ -25,13 +25,21 @@ def modules_for(namespace) def instance(payload, schema, namespace='') return payload if payload.is_a?(Deimos::SchemaClass::Base) - constants = modules_for(namespace) + [schema.underscore.camelize.singularize] - klass = constants.join('::').safe_constantize + klass = klass(schema, namespace) return payload if klass.nil? || payload.nil? klass.new(**payload.symbolize_keys) end + # Determine and return the SchemaClass with the provided schema and namespace + # @param schema [String] + # @param namespace [String] + # @return [Deimos::SchemaClass] + def klass(schema, namespace) + constants = modules_for(namespace) + [schema.underscore.camelize.singularize] + constants.join('::').safe_constantize + end + # @param config [Hash] Producer or Consumer config # @return [Boolean] def use?(config) diff --git a/spec/active_record_batch_consumer_association_spec.rb b/spec/active_record_batch_consumer_association_spec.rb index c6af0f52..99b08aa1 100644 --- a/spec/active_record_batch_consumer_association_spec.rb +++ b/spec/active_record_batch_consumer_association_spec.rb @@ -96,12 +96,17 @@ def publish_batch(messages) key_config plain: true record_class Widget - def should_consume?(record) + def should_consume?(record, associations) if self.should_consume_proc - return self.should_consume_proc.call(record) + case self.should_consume_proc.parameters.size + when 2 + self.should_consume_proc.call(record, associations) + else + self.should_consume_proc.call(record) + end + else + true end - - true end def record_attributes(payload, _key) @@ -269,7 +274,7 @@ def columns(record_class) context 'with invalid models' do before(:each) do - consumer_class.should_consume_proc = proc { |val| val.some_int <= 10 } + consumer_class.should_consume_proc = proc { |record| record.some_int <= 10 } end it 'should only save valid models' do @@ -280,5 +285,27 @@ def columns(record_class) expect(Widget.count).to eq(2) end end + + context 'with invalid associations' do + + before(:each) do + consumer_class.should_consume_proc = proc { |record, associations| + record.some_int <= 10 && associations['detail']['title'] != 'invalid' + } + end + + it 'should only save valid associations' do + publish_batch([ + { key: 2, + payload: { test_id: 'xyz', some_int: 5, title: 'valid' } }, + { key: 3, + payload: { test_id: 'abc', some_int: 15, title: 'valid' } }, + { key: 4, + payload: { test_id: 'abc', some_int: 9, title: 'invalid' } } + ]) + expect(Widget.count).to eq(2) + expect(Widget.second.some_int).to eq(5) + end + end end
end diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 54e7d4db..3777e8a4 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -74,8 +74,12 @@ def publish_batch(messages) describe 'consume_batch' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do + before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should handle an empty batch' do @@ -206,6 +210,48 @@ def publish_batch(messages) ] ) end + + it 'should handle deletes with deadlock retries' do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + attempts = 0 + allow_any_instance_of(ActiveRecord::Relation).to receive(:delete_all) do |relation| + attempts += 1 + # Deadlock on the first two attempts, then fall through to a real delete + # (destroy_all avoids re-entering this stub). + raise ActiveRecord::Deadlocked, 'Lock wait timeout exceeded' if attempts <= 2 + relation.destroy_all + end + + Widget.create!(id: 1, test_id: 'abc', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: nil }, + { key: 1, + payload: nil } + ] + ) + + expect(all_widgets).to be_empty + end + + it 'should not delete after multiple deadlock retries' do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + allow_any_instance_of(ActiveRecord::Relation).to receive(:delete_all).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ) + + Widget.create!(id: 1, test_id: 'abc', some_int: 2) + + expect { + publish_batch( + [ + { key: 1, + payload: nil }, + { key: 1, + payload: nil } + ] + ) + }.to raise_error(ActiveRecord::Deadlocked) + + expect(Widget.count).to eq(1) + end + end end end @@ -255,64 +301,6 @@ def deleted_query(_records) end end - describe 'batch atomicity' do - it 'should roll back if there was an exception while deleting' do - Widget.create!(id: 1, test_id: 'abc', some_int: 2) - - travel 1.day - - expect(Widget.connection).to receive(:delete).and_raise('Some error') - - expect { - publish_batch( - [ - { key: 1, - payload: { test_id: 'def', some_int: 3 } }, - { key: 1, - payload: nil } - ] - ) - }.to raise_error('Some error') - - expect(all_widgets). - to match_array( - [ - have_attributes(id: 1, test_id: 'abc', some_int: 2, updated_at: start, created_at: start) - ] - ) - end - - it 'should roll back if there was an invalid instance while upserting' do - Widget.create!(id: 1, test_id: 'abc', some_int: 2) # Updated but rolled back - Widget.create!(id: 3, test_id: 'ghi', some_int: 3) # Removed but rolled back - - travel 1.day - - expect { - publish_batch( - [ - { key: 1, - payload: { test_id: 'def', some_int: 3 } }, - { key: 2, - payload: nil }, - { key: 2, - payload: { test_id: '', some_int: 4 } }, # Empty string is not valid for test_id - { key: 3, - payload: nil } - ] - ) - }.to raise_error(ActiveRecord::RecordInvalid) - - expect(all_widgets). - to match_array( - [ - have_attributes(id: 1, test_id: 'abc', some_int: 2, updated_at: start, created_at: start), - have_attributes(id: 3, test_id: 'ghi', some_int: 3, updated_at: start, created_at: start) - ] - ) - end - end - describe 'compound keys' do let(:consumer_class) do Class.new(described_class) do @@ -603,5 +591,224 @@ def pre_process(messages) end + describe 'should_consume?'
do + + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def should_consume?(record) + record.test_id != 'def' + end + + def self.process_invalid_records(_) + nil + end + + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + end + end + + it "should skip records that shouldn't be consumed" do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + expect(Widget.count).to eq(2) + expect(Widget.all.to_a).to match_array([ + have_attributes(id: 1, + test_id: 'abc', + some_int: 11, + updated_at: start, + created_at: start), + have_attributes(id: 2, + test_id: 'def', + some_int: 2, + updated_at: start, + created_at: start) + ]) + end + + end + + describe 'post processing' do + + context 'with uncompacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def should_consume?(record) + record.some_int.even? + end + + def self.process_valid_records(valid) + # Success + attrs = valid.first.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: 2000) + end + + def self.process_invalid_records(invalid) + # Invalid + attrs = invalid.first.record.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: attrs['some_int']) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(2000) + end + end + + context 'with compacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted true + + def should_consume?(record) + record.some_int.even? 
+ end + + def self.process_valid_records(valid) + # Success + attrs = valid.first.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: 2000) + end + + def self.process_invalid_records(invalid) + # Invalid + attrs = invalid.first.record.attributes + Widget.find_by(id: attrs['id'], test_id: attrs['test_id']).update!(some_int: attrs['some_int']) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_invalid_records(payload[:records]) + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(2000) + end + end + + context 'with post processing errors' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def self.process_valid_records(_) + raise StandardError, 'Something went wrong' + end + + ActiveSupport::Notifications.subscribe('batch_consumption.valid_records') do |*args| + payload = ActiveSupport::Notifications::Event.new(*args).payload + payload[:consumer].process_valid_records(payload[:records]) + end + + end + end + + it 'should save records if an exception occurs in post processing' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + expect { + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } ] + ) + }.to raise_error(StandardError, 'Something went wrong') + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(11) + expect(widget_two.some_int).to eq(20) + + end + end + + end + end end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index edae353c..7fcb961c 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -69,7 +69,9 @@ it 'should mass update the batch' do allow(SecureRandom).to receive(:uuid).and_return('1', '2') - described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + expect(results.count).to eq(2) + expect(results.map(&:test_id)).to match(%w(id1 id2)) expect(Widget.count).to eq(2) expect(Widget.all.to_a.map(&:bulk_import_id)).to match(%w(1 2)) expect(Detail.count).to eq(2) @@ -77,5 +79,40 @@ expect(Widget.last.detail).not_to be_nil end + context 'with deadlock retries' do + before(:each) do + allow(Deimos::Utils::DeadlockRetry).to receive(:sleep) + end + + it 'should upsert rows after deadlocks' do + attempts = 0 + # Deadlock on the first two attempts, then delegate to the real import!. + allow(Widget).to receive(:import!).and_wrap_original do |original, *args| + attempts += 1 + raise ActiveRecord::Deadlocked, 'Lock wait timeout exceeded' if attempts <= 2 + original.call(*args) + end + + results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + expect(results.count).to eq(2) + expect(results.map(&:test_id)).to match(%w(id1 id2)) + expect(Widget.count).to eq(2) + expect(Detail.count).to eq(2) + expect(Widget.first.detail).not_to be_nil + expect(Widget.last.detail).not_to be_nil + end + + it 'should not upsert after encountering multiple deadlocks' do + allow(Widget).to receive(:import!).and_raise( + ActiveRecord::Deadlocked.new('Lock wait timeout exceeded') + ) + expect { + described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch) + }.to raise_error(ActiveRecord::Deadlocked) + expect(Widget.count).to eq(0) + expect(Detail.count).to eq(0) + end + + end + end end diff --git a/spec/active_record_consumer_spec.rb b/spec/active_record_consumer_spec.rb index 3d9f2a05..fae7807c 100644 --- a/spec/active_record_consumer_spec.rb +++ b/spec/active_record_consumer_spec.rb @@ -3,6 +3,7 @@ require 'date' # Wrapped in a module to prevent class leakage +# rubocop:disable Metrics/ModuleLength module ActiveRecordConsumerTest describe Deimos::ActiveRecordConsumer, 'Message Consumer' do before(:all) do @@ -66,13 +67,84 @@ def fetch_record(klass, payload, _key) stub_const('MyCustomFetchConsumer', consumer_class) Time.zone = 'Eastern Time (US & Canada)' + + schema_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchema' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + + def initialize(test_id: nil, + some_int: nil) + self.test_id = test_id + self.some_int = some_int + end + + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'payload_key' => @payload_key&.as_json + } + end + end + stub_const('Schemas::MySchema', schema_class) + + schema_datetime_class = Class.new(Deimos::SchemaClass::Record) do + def schema + 'MySchemaWithDateTimes' + end + + def namespace + 'com.my-namespace' + end + + attr_accessor :test_id + attr_accessor :some_int + attr_accessor :updated_at + attr_accessor :some_datetime_int + attr_accessor :timestamp + + def initialize(test_id: nil, + some_int: nil, + updated_at: nil, + some_datetime_int: nil, + timestamp: nil) + self.test_id = test_id + self.some_int = some_int + self.updated_at = updated_at + self.some_datetime_int = some_datetime_int + self.timestamp = timestamp + end + + def as_json(_opts={}) + { + 'test_id' => @test_id, + 'some_int' => @some_int, + 'updated_at' => @updated_at, + 'some_datetime_int' => @some_datetime_int, + 'timestamp' => @timestamp, + 'payload_key' => @payload_key&.as_json + } + end + end + stub_const('Schemas::MySchemaWithDateTimes', schema_datetime_class) end describe 'consume' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should receive events correctly' do @@ -182,3 +254,4 @@ def fetch_record(klass, payload, _key) end end end +# rubocop:enable Metrics/ModuleLength diff --git a/spec/active_record_producer_spec.rb b/spec/active_record_producer_spec.rb index 07776009..ec576630 100644 ---
a/spec/active_record_producer_spec.rb +++ b/spec/active_record_producer_spec.rb @@ -69,7 +69,10 @@ def self.post_process(batch) SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should send events correctly' do diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index 5fb4af84..1b4de1a8 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -40,7 +40,10 @@ def consume_batch(_payloads, _metadata) end before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should provide backwards compatibility for BatchConsumer class' do diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 07f47306..d5cd3962 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true # :nodoc: +# rubocop:disable Metrics/ModuleLength module ConsumerTest describe Deimos::Consumer, 'Message Consumer' do prepend_before(:each) do @@ -26,8 +27,12 @@ def consume(_payload, _metadata) describe 'consume' do SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes| context "with Schema Class consumption #{setting}" do + before(:each) do - Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes } + Deimos.configure do |config| + config.schema.use_schema_classes = use_schema_classes + config.schema.generate_namespace_folders = true + end end it 'should consume a message' do @@ -127,6 +132,41 @@ def consume(_payload, _metadata) end end end + + context 'with overridden schema classes' do + + before(:each) do + Deimos.configure do |config| + config.schema.use_schema_classes = true + config.schema.generate_namespace_folders = true + end + end + + prepend_before(:each) do + consumer_class = Class.new(described_class) do + schema 'MyUpdatedSchema' + namespace 'com.my-namespace' + key_config field: 'test_id' + + # :nodoc: + def consume(_payload, _metadata) + raise 'This should not be called unless call_original is set' + end + end + stub_const('ConsumerTest::MyConsumer', consumer_class) + end + + it 'should consume messages' do + test_consume_message('my_consume_topic', + { 'test_id' => 'foo', + 'some_int' => 1 }) do |payload, _metadata| + expect(payload['test_id']).to eq('foo') + expect(payload['some_int']).to eq(1) + expect(payload['super_int']).to eq(9000) + end + end + + end end describe 'decode_key' do @@ -218,3 +258,4 @@ def consume(_payload, _metadata) end end end +# rubocop:enable Metrics/ModuleLength diff --git a/spec/schemas/my_namespace/my_updated_schema.rb b/spec/schemas/my_namespace/my_updated_schema.rb new file mode 100644 index 00000000..2ece1dd9 --- /dev/null +++ b/spec/schemas/my_namespace/my_updated_schema.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +# This file is autogenerated by Deimos, Do NOT modify +module Schemas; module MyNamespace + ### Primary Schema Class ### + # Autogenerated Schema for Record at com.my-namespace.MySchema + class MyUpdatedSchema < Schemas::MyNamespace::MySchema + + attr_accessor :super_int + + def initialize(test_id: nil, + some_int: nil) + super +
self.super_int = some_int.nil? ? 10 : some_int * 9000 + end + end +end +end
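For reference, a brief sketch of how the `schema_backend` change in `lib/deimos.rb` interacts with an override class like `MyUpdatedSchema` (the consumer below is illustrative, mirroring the `consumer_spec.rb` context above):

```ruby
# With use_schema_classes enabled, Deimos.schema_backend resolves
# 'MyUpdatedSchema' via Utils::SchemaClass.klass and builds the backend
# from the schema/namespace the override inherits from MySchema.
backend = Deimos.schema_backend(schema: 'MyUpdatedSchema', namespace: 'com.my-namespace')

class MyConsumer < Deimos::Consumer
  schema 'MyUpdatedSchema'
  namespace 'com.my-namespace'
  key_config field: 'test_id'

  def consume(payload, _metadata)
    # super_int is derived by the override: some_int * 9000
    # (e.g. 9000 when some_int == 1, as asserted in the spec above)
    payload['super_int']
  end
end
```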