Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2039: Post process valid and invalid records from batch consumption #207

Merged
merged 29 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6553167
CCOL-2039: Post process valid and invalid records from batch consumption
Nov 30, 2023
ca6a1e3
CCOL-2039: Lint fixes
Dec 1, 2023
3b7461e
CCOL-2039: Add optional associations for should_consume?
Dec 1, 2023
8d347e4
CCOL-2039: Linting changes
Dec 1, 2023
4a688a0
CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record c…
Dec 1, 2023
385e58b
CCOL-2039: Add consume_filter when filtering records
Dec 1, 2023
4661b4f
CCOL-2039: YARD fixes
Dec 1, 2023
8813fa4
CCOL-2039: Variable rename
Dec 1, 2023
d6181fd
CCOL-2039: Process invalid records via ActiveSupport notifications
Dec 4, 2023
2d9e8dd
CCOL-2039: Process valid and invalid records via notification
Dec 4, 2023
2cb9d4d
CCOL-2039: Spec update
Dec 4, 2023
6a750c8
CCOL-2039: Update association spec
Dec 4, 2023
eae600e
CCOL-2039: Update CHANGELOG entry
Dec 5, 2023
2b8aa2d
CCOL-2039: Move deadlocks to mass updater
Dec 5, 2023
ac27466
CCOL-2039: Maintain backwards compatibility for should_consume?
Dec 5, 2023
1a5b7db
CCOL-2039: Set and get topic tag
Dec 5, 2023
ea099a3
CCOL-2039: Add mock span to mock tracer
Dec 5, 2023
1ea3069
CCOL-2039: Update mock set_tag
Dec 5, 2023
1e5d3a5
CCOL-2039: Add retries spec for mass update and deletes
Dec 6, 2023
74364f6
CCOL-2039: Update datadog span tagging
Dec 6, 2023
90b5cf7
CCOL-2039: Update README
Dec 6, 2023
ceff055
CCOL-2039: Update CONFIGURATION docs
Dec 18, 2023
b61a403
CCOL-2039: Evaluate schemaclass schema and namespace when setting dec…
Dec 18, 2023
97bd093
CCOL-2039: Update specs with Schema classes
Dec 19, 2023
1ecee75
CCOL-2039: Update linting & spec cleanup
Dec 19, 2023
2b4dfc2
CCOL-2039: Add SchemaClasses context, move decoder logic to schema ba…
Jan 3, 2024
1c4e6b4
CCOL-2039: Use generate namespace folders in specs
Jan 3, 2024
4e4f774
CCOL-2039: Add overriden schema
Jan 3, 2024
e21bded
CCOL-2039: Rename updated schema class
Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion lib/deimos.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,20 @@ def schema_backend_class
# @param namespace [String]
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:)
schema_backend_class.new(schema: schema, namespace: namespace)
if Utils::SchemaClass.use?(config.to_h)
# Initialize an instance of the provided schema
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = Utils::SchemaClass.klass(schema, namespace)
if schema_class.nil?
schema_backend_class.new(schema: schema, namespace: namespace)
else
schema_instance = schema_class.new
schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace)
end
else
schema_backend_class.new(schema: schema, namespace: namespace)
end
end

# @param schema [String]
Expand Down
13 changes: 2 additions & 11 deletions lib/deimos/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,8 @@ class Consumer
class << self
# @return [Deimos::SchemaBackends::Base]
def decoder
return @decoder if @decoder

@decoder = if Utils::SchemaClass.use?(config.to_h)
# Initialize an instance of the provided schema
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = "Schemas::#{config[:schema]}".constantize.new
Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace)
else
Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace])
end
@decoder ||= Deimos.schema_backend(schema: config[:schema],
namespace: config[:namespace])
end

# @return [Deimos::SchemaBackends::Base]
Expand Down
10 changes: 7 additions & 3 deletions lib/deimos/tracing/datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ def initialize(config)
raise 'Tracing config must specify service_name' if config[:service_name].nil?

@service = config[:service_name]
@tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
end

# :nodoc:
def start(span_name, options={})
span = @tracer.trace(span_name)
span = tracer.trace(span_name)
span.service = @service
span.resource = options[:resource]
span
Expand All @@ -27,9 +26,14 @@ def finish(span)
span.finish
end

# :nodoc:
def tracer
@tracer ||= ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing
end

# :nodoc:
def active_span
@tracer.active_span
tracer.active_span
end

# :nodoc:
Expand Down
12 changes: 10 additions & 2 deletions lib/deimos/utils/schema_class.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@ def modules_for(namespace)
def instance(payload, schema, namespace='')
return payload if payload.is_a?(Deimos::SchemaClass::Base)

constants = modules_for(namespace) + [schema.underscore.camelize.singularize]
klass = constants.join('::').safe_constantize
klass = klass(schema, namespace)
return payload if klass.nil? || payload.nil?

klass.new(**payload.symbolize_keys)
end

# Determine and return the SchemaClass with the provided schema and namespace
# @param schema [String]
# @param namespace [String]
# @return [Deimos::SchemaClass]
def klass(schema, namespace)
constants = modules_for(namespace) + [schema.underscore.camelize.singularize]
constants.join('::').safe_constantize
end

# @param config [Hash] Producer or Consumer config
# @return [Boolean]
def use?(config)
Expand Down
33 changes: 5 additions & 28 deletions spec/active_record_batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,6 @@ class Widget < ActiveRecord::Base
stub_const('MyBatchConsumer', consumer_class)
stub_const('ConsumerTest::MyBatchConsumer', consumer_class)
consumer_class.config[:bulk_import_id_column] = :bulk_import_id # default
schema_class = Class.new(Deimos::SchemaClass::Record) do
def schema
'MySchema'
end

def namespace
'com.my-namespace'
end

attr_accessor :test_id
attr_accessor :some_int

def initialize(test_id: nil,
some_int: nil)
self.test_id = test_id
self.some_int = some_int
end

def as_json(_opts={})
{
'test_id' => @test_id,
'some_int' => @some_int,
'payload_key' => @payload_key&.as_json
}
end
end
stub_const('Schemas::MySchema', schema_class)
end

around(:each) do |ex|
Expand Down Expand Up @@ -101,8 +74,12 @@ def publish_batch(messages)
describe 'consume_batch' do
SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes|
context "with Schema Class consumption #{setting}" do

before(:each) do
Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes }
Deimos.configure do |config|
config.schema.use_schema_classes = use_schema_classes
config.schema.generate_namespace_folders = true
end
end

it 'should handle an empty batch' do
Expand Down
5 changes: 4 additions & 1 deletion spec/active_record_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ def as_json(_opts={})
SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes|
context "with Schema Class consumption #{setting}" do
before(:each) do
Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes }
Deimos.configure do |config|
config.schema.use_schema_classes = use_schema_classes
config.schema.generate_namespace_folders = true
end
end

it 'should receive events correctly' do
Expand Down
5 changes: 4 additions & 1 deletion spec/active_record_producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def self.post_process(batch)
SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes|
context "with Schema Class consumption #{setting}" do
before(:each) do
Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes }
Deimos.configure do |config|
config.schema.use_schema_classes = use_schema_classes
config.schema.generate_namespace_folders = true
end
end

it 'should send events correctly' do
Expand Down
32 changes: 4 additions & 28 deletions spec/batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,6 @@ def consume_batch(_payloads, _metadata)
end
end
stub_const('ConsumerTest::MyBatchConsumer', consumer_class)
schema_class = Class.new(Deimos::SchemaClass::Record) do
def schema
'MySchema'
end

def namespace
'com.my-namespace'
end

attr_accessor :test_id
attr_accessor :some_int

def initialize(test_id: nil,
some_int: nil)
self.test_id = test_id
self.some_int = some_int
end

def as_json(_opts={})
{
'test_id' => @test_id,
'some_int' => @some_int,
'payload_key' => @payload_key&.as_json
}
end
end
stub_const('Schemas::MySchema', schema_class)
end

let(:batch) do
Expand All @@ -67,7 +40,10 @@ def as_json(_opts={})
end

before(:each) do
Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes }
Deimos.configure do |config|
config.schema.use_schema_classes = use_schema_classes
config.schema.generate_namespace_folders = true
end
end

it 'should provide backwards compatibility for BatchConsumer class' do
Expand Down
68 changes: 40 additions & 28 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,17 @@ def consume(_payload, _metadata)
end
end
stub_const('ConsumerTest::MyConsumer', consumer_class)
schema_class = Class.new(Deimos::SchemaClass::Record) do
def schema
'MySchema'
end

def namespace
'com.my-namespace'
end

attr_accessor :test_id
attr_accessor :some_int

def initialize(test_id: nil,
some_int: nil)
self.test_id = test_id
self.some_int = some_int
end

def as_json(_opts={})
{
'test_id' => @test_id,
'some_int' => @some_int,
'payload_key' => @payload_key&.as_json
}
end
end
stub_const('Schemas::MySchema', schema_class)
end

describe 'consume' do
SCHEMA_CLASS_SETTINGS.each do |setting, use_schema_classes|
context "with Schema Class consumption #{setting}" do

before(:each) do
Deimos.configure { |config| config.schema.use_schema_classes = use_schema_classes }
Deimos.configure do |config|
config.schema.use_schema_classes = use_schema_classes
config.schema.generate_namespace_folders = true
end
end

it 'should consume a message' do
Expand Down Expand Up @@ -155,6 +132,41 @@ def as_json(_opts={})
end
end
end

context 'with overriden schema classes' do

before(:each) do
Deimos.configure do |config|
config.schema.use_schema_classes = true
config.schema.generate_namespace_folders = true
end
end

prepend_before(:each) do
consumer_class = Class.new(described_class) do
schema 'MyUpdatedSchema'
namespace 'com.my-namespace'
key_config field: 'test_id'

# :nodoc:
def consume(_payload, _metadata)
raise 'This should not be called unless call_original is set'
end
end
stub_const('ConsumerTest::MyConsumer', consumer_class)
end

it 'should consume messages' do
test_consume_message('my_consume_topic',
{ 'test_id' => 'foo',
'some_int' => 1 }) do |payload, _metadata|
expect(payload['test_id']).to eq('foo')
expect(payload['some_int']).to eq(1)
expect(payload['super_int']).to eq(9000)
end
end

end
end

describe 'decode_key' do
Expand Down
18 changes: 18 additions & 0 deletions spec/schemas/my_namespace/my_schema_updated.rb
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filename is backwards :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof. Updated lol

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

# This file is autogenerated by Deimos, Do NOT modify
module Schemas; module MyNamespace
### Primary Schema Class ###
# Autogenerated Schema for Record at com.my-namespace.MySchema
class MyUpdatedSchema < Schemas::MyNamespace::MySchema

attr_accessor :super_int

def initialize(test_id: nil,
some_int: nil)
super
self.super_int = some_int.nil? ? 10 : some_int * 9000
end
end
end
end