Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2039: Post process valid and invalid records from batch consumption #207

Merged
merged 29 commits into from
Jan 3, 2024

Conversation

lionelpereira
Copy link
Collaborator

Pull Request Template

Description

  1. should_consume? method accepts validation of associations
  2. Return valid and invalid records saved during consumption for further processing in post_process method

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration

  • Test A
  • Test B

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have added a line in the CHANGELOG describing this change, under the UNRELEASED heading
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules

slice(messages).
each(&method(:update_database))
valid_records = []
invalid_records = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename one of these variables? They represent two different things (BatchRecord and ActiveRecord).

upserted.each_slice(max_db_batch_size) do |group|
valid, invalid = upsert_records(group)
valid_upserts.push(*valid)
invalid_upserts.push(*invalid)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simpler to do invalid_upserts.concat(invalid) instead of .push(*invalid). Same above/below.

invalid_upserts.push(*invalid)
end
valid_upserts.compact!
invalid_upserts.compact!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you're compacting inside each iteration instead of after all iterations are done?

# @param block [Proc]
# @return [Array<BatchRecord>]
def partition!(method=nil, &block)
valid, invalid = if method.nil?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never be nil since it's defined on the base class, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a case where we had to override filter_records like so

ex:

def filter_records(records)
      some_filter_query = filter_query(records)
      records.partition! do |r|
        some_filter_query.exclude?(r.record.id) && valid_batch_record?(r.record,
                                                                            r.associations)
      end
    end

Could this be done in a proc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure... I think I'd need to see this. It feels like you're diving too deep into Deimos guts to get this working - it feels wrong for calling code to have to call so many internal methods.

end
end

context 'with post processing' do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused - what's the difference between describe 'post processing' and context 'with post processing'?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be with compacted messages

Copy link
Member

@dorner dorner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't fully understand the use case here. We seem to be adding a lot of functionality for a very laser-specific use case.

lib/deimos/active_record_consume/batch_consumption.rb Outdated Show resolved Hide resolved
lib/deimos/active_record_consume/batch_consumption.rb Outdated Show resolved Hide resolved
@@ -41,9 +42,17 @@ def consume_batch(payloads, metadata)
uncompacted_update(messages)
end
end
process_valid_records(@valid_active_records)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this also can't be a notification?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be a notification

# @return [Boolean]
def should_consume?(_record)
def should_consume?(_batch_record)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this need to change? Did it actually change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did change types from ActiveRecord::Base to BatchRecord. This is in order to validate associations that are not yet saved


ActiveSupport::Notifications.subscribe('batch_consumption.invalid_records') do |*args|
payload = ActiveSupport::Notifications::Event.new(*args).payload
payload[:consumer].process_invalid_records(payload[:records])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling code should be able to do this themselves rather than us having a method that we call explicitly.

@@ -167,7 +178,13 @@ def upsert_records(messages)
col_proc: col_proc,
replace_associations: self.class.replace_associations,
bulk_import_id_generator: self.class.bulk_import_id_generator)
updater.mass_update(record_list)
@valid_active_records.concat(updater.mass_update(record_list))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get this working without the instance variable? I.e. return the updated records from mass_update to the calling code and have the concatenation happen there?

spec/active_record_batch_consumer_spec.rb Show resolved Hide resolved
else
uncompacted_update(messages)
end
@tags = %W(topic:#{metadata[:topic]})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's weird to pass metric tags so far down into the workflow. Can we do something like

Deimos.config.tracer.active_span.set_tag("topic", metadata[:topic])
...
Deimos.config.tracer.active_span.get_tag("topic")

We'd have to add a get_tag method to the provider class but it feels a lot better because we can separate out the tracing/metric logic from the actual business logic.

@@ -155,19 +151,35 @@ def update_database(messages)
# @return [void]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)
invalid = filter_records(record_list)
unless invalid.blank?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use positive conditions: if invalid.any?


Deimos.instrument('ar_consumer.consume_batch', @tags) do
Deimos.instrument('ar_consumer.consume_batch',
Deimos.config.tracer.active_span.get_tag('topic')) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol you don't need to do get_tag here, you already have the topic.

@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Feature: Add individual configuration for custom `bulk_import_id_generator` proc per consumer
- Feature: Add global `replace_assocations` value for for all consumers
- Feature: Add individual `replace_assocations` value for for individual consumers
- Feature: `should_consume?` method accepts BatchRecord associations
- Feature: Reintroduce `filter_records` for bulk filtering of records prior to insertion
- Feature: Return valid and invalid records saved during consumption for further processing in `batch_consumption.valid_records` and `batch_consumption.invalid_records` ActiveSupport Notifications
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be added to the README.

# @param tag [String]
def get_tag(tag)
raise NotImplementedError
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to implement this for datadog.

dorner
dorner previously approved these changes Dec 6, 2023
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = "Schemas::#{config[:schema]}".constantize.new
Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be moved into the schema_backend method rather than just used here?

@@ -11,15 +11,12 @@ def initialize(config)
raise 'Tracing config must specify service_name' if config[:service_name].nil?

@service = config[:service_name]
@tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels more like a method than an instance variable. It isn't actually state that needs to be stored.

}
end
end
stub_const('Schemas::MySchema', schema_class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we have to add this in all four specs? If we're just trying to test overriding, can we have a dedicated spec for that? Also, why does the overriding class have to reimplement every method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schemas::MySchema wasn't defined anywhere. So in tests where SCHEMAS_CLASS_SETTING = on test were failing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be in spec/schemas/my_namespace. If it's not showing up, maybe the settings are wrong for the tests? Or do we need to add it to spec/schemas and require it similarly to the other generated schema files?

Copy link
Member

@dorner dorner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much better! One tiny comment left :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filename is backwards :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof. Updated lol

@dorner
Copy link
Member

dorner commented Jan 3, 2024

Time to hit the big green button! Thanks @lionelpereira !

@dorner dorner merged commit eb628f3 into master Jan 3, 2024
5 checks passed
@dorner dorner deleted the CCOL-2039-post-process branch January 3, 2024 20:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants