CCOL-2039: Post process valid and invalid records from batch consumption #207

Merged
merged 29 commits into from
Jan 3, 2024
Changes from 4 commits
6553167
CCOL-2039: Post process valid and invalid records from batch consumption
Nov 30, 2023
ca6a1e3
CCOL-2039: Lint fixes
Dec 1, 2023
3b7461e
CCOL-2039: Add optional associations for should_consume?
Dec 1, 2023
8d347e4
CCOL-2039: Linting changes
Dec 1, 2023
4a688a0
CCOL-2039: Add CHANGELOG entry && refactor valid and invalid record c…
Dec 1, 2023
385e58b
CCOL-2039: Add consume_filter when filtering records
Dec 1, 2023
4661b4f
CCOL-2039: YARD fixes
Dec 1, 2023
8813fa4
CCOL-2039: Variable rename
Dec 1, 2023
d6181fd
CCOL-2039: Process invalid records via ActiveSupport notifications
Dec 4, 2023
2d9e8dd
CCOL-2039: Process valid and invalid records via notification
Dec 4, 2023
2cb9d4d
CCOL-2039: Spec update
Dec 4, 2023
6a750c8
CCOL-2039: Update association spec
Dec 4, 2023
eae600e
CCOL-2039: Update CHANGELOG entry
Dec 5, 2023
2b8aa2d
CCOL-2039: Move deadlocks to mass updater
Dec 5, 2023
ac27466
CCOL-2039: Maintain backwards compatibility for should_consume?
Dec 5, 2023
1a5b7db
CCOL-2039: Set and get topic tag
Dec 5, 2023
ea099a3
CCOL-2039: Add mock span to mock tracer
Dec 5, 2023
1ea3069
CCOL-2039: Update mock set_tag
Dec 5, 2023
1e5d3a5
CCOL-2039: Add retries spec for mass update and deletes
Dec 6, 2023
74364f6
CCOL-2039: Update datadog span tagging
Dec 6, 2023
90b5cf7
CCOL-2039: Update README
Dec 6, 2023
ceff055
CCOL-2039: Update CONFIGURATION docs
Dec 18, 2023
b61a403
CCOL-2039: Evaluate schemaclass schema and namespace when setting dec…
Dec 18, 2023
97bd093
CCOL-2039: Update specs with Schema classes
Dec 19, 2023
1ecee75
CCOL-2039: Update linting & spec cleanup
Dec 19, 2023
2b4dfc2
CCOL-2039: Add SchemaClasses context, move decoder logic to schema ba…
Jan 3, 2024
1c4e6b4
CCOL-2039: Use generate namespace folders in specs
Jan 3, 2024
4e4f774
CCOL-2039: Add overriden schema
Jan 3, 2024
e21bded
CCOL-2039: Rename updated schema class
Jan 3, 2024
17 changes: 9 additions & 8 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -28,15 +28,16 @@ def consume_batch(payloads, metadata)
zip(metadata[:keys]).
map { |p, k| Deimos::Message.new(p, nil, key: k) }

- @tags = %W(topic:#{metadata[:topic]})
+ Deimos.config.tracer.active_span.set_tag('topic', metadata[:topic])

- Deimos.instrument('ar_consumer.consume_batch', @tags) do
+ Deimos.instrument('ar_consumer.consume_batch',
+                   Deimos.config.tracer.active_span.get_tag('topic')) do
Member

lol you don't need to do get_tag here, you already have the topic.

if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
end
end

protected
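A minimal sketch of the reviewer's point in this hunk: the topic is already available as `metadata[:topic]`, so it can be handed straight to the instrumentation call instead of being read back from the span. `FakeSpan`, `instrument`, and `consume_batch_sketch` are hypothetical stand-ins written for this illustration; they are not Deimos APIs.

```ruby
# Hypothetical stand-in for a tracing span that stores tags in a hash.
class FakeSpan
  def initialize
    @tags = {}
  end

  def set_tag(tag, value)
    @tags[tag] = value
  end

  def get_tag(tag)
    @tags[tag]
  end
end

# Hypothetical stand-in for an instrumentation helper: runs the block and
# returns the event name together with the payload it was given.
def instrument(event, payload)
  yield
  [event, payload]
end

def consume_batch_sketch(metadata, span)
  topic = metadata[:topic]
  # Tag the span once so code further down the call stack can read it.
  span.set_tag('topic', topic)
  # Reuse the local variable; no round trip through get_tag is needed.
  instrument('ar_consumer.consume_batch', topic) { :processed }
end
```

The span is still tagged, so callers that only have access to the tracer (such as a mass updater) can look the topic up later.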
@@ -152,20 +153,18 @@ def update_database(messages)
def upsert_records(messages)
record_list = build_records(messages)
invalid = filter_records(record_list)
- unless invalid.blank?
+ if invalid.any?
ActiveSupport::Notifications.instrument('batch_consumption.invalid_records', {
records: invalid,
consumer: self.class
})
-
end
return if record_list.empty?

key_col_proc = self.method(:key_columns).to_proc
col_proc = self.method(:columns).to_proc

updater = MassUpdater.new(@klass,
- @tags,
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.replace_associations,
@@ -221,9 +220,11 @@ def build_records(messages)
# deleted records.
# @return [void]
def remove_records(messages)
- clause = deleted_query(messages)
+ Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
+   clause = deleted_query(messages)

- clause.delete_all
+   clause.delete_all
+ end
end
end
end
5 changes: 2 additions & 3 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -19,10 +19,9 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
- def initialize(klass, tags, key_col_proc: nil, col_proc: nil,
+ def initialize(klass, key_col_proc: nil, col_proc: nil,
replace_associations: true, bulk_import_id_generator: nil)
@klass = klass
- @tags = tags
@replace_associations = replace_associations
@bulk_import_id_generator = bulk_import_id_generator

@@ -90,7 +89,7 @@ def mass_update(record_list)
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
- Deimos::Utils::DeadlockRetry.wrap(@tags) do
+ Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
end
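The retry behaviour that the specs in this PR exercise (two deadlocks followed by success, or three deadlocks followed by a raised error) can be sketched in plain Ruby. This is not the `Deimos::Utils::DeadlockRetry` implementation: `FakeDeadlock`, `with_deadlock_retry`, the retry limit of 2, and the `sleeper` hook are all assumptions made for illustration.

```ruby
# Hypothetical error class standing in for ActiveRecord::Deadlocked.
class FakeDeadlock < StandardError; end

# Runs the block, retrying when it raises FakeDeadlock. After max_retries
# retries the last error is re-raised to the caller. The default limit and
# the sleeper hook are assumptions, not values taken from Deimos.
def with_deadlock_retry(max_retries: 2, sleeper: ->(_seconds) {})
  attempts = 0
  begin
    attempts += 1
    yield
  rescue FakeDeadlock
    raise if attempts > max_retries

    sleeper.call(attempts) # back off before the next attempt (no-op here)
    retry
  end
end

# Usage: the block fails twice, then succeeds on the third attempt.
attempts_made = 0
outcome = with_deadlock_retry do
  attempts_made += 1
  raise FakeDeadlock, 'Lock wait timeout exceeded' if attempts_made < 3

  :saved
end
```

Wrapping the whole batch in one retried block matches the comment in `mass_update`: either every record in the batch is written, or the batch is retried or rolled back as a unit.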
1 change: 1 addition & 0 deletions lib/deimos/test_helpers.rb
@@ -28,6 +28,7 @@ def unit_test!
deimos_config.kafka.seed_brokers ||= ['test_broker']
deimos_config.schema.backend = Deimos.schema_backend_class.mock_backend
deimos_config.producers.backend = :test
+ deimos_config.tracer = Deimos::Tracing::Mock.new
end
end

33 changes: 31 additions & 2 deletions lib/deimos/tracing/mock.rb
@@ -10,6 +10,7 @@ class Mock < Tracing::Provider
def initialize(logger=nil)
@logger = logger || Logger.new(STDOUT)
@logger.info('MockTracingProvider initialized')
+ @active_span = MockSpan.new
end

# @param span_name [String]
@@ -32,12 +33,22 @@ def finish(span)

# :nodoc:
def active_span
- nil
+ @active_span ||= MockSpan.new
end

# :nodoc:
def set_tag(tag, value, span=nil)
- nil
+ if span
+   span.set_tag(tag, value)
+ else
+   active_span.set_tag(tag, value)
+ end
end

+ # Get the value of the specified tag from the active span.
+ # @param tag [String]
+ def get_tag(tag)
+   active_span.get_tag(tag)
+ end

# :nodoc:
@@ -47,5 +58,23 @@ def set_error(span, exception)
@logger.info("Mock span '#{name}' set an error: #{exception}")
end
end

+ # Mock Span class
+ class MockSpan
+   # :nodoc:
+   def initialize
+     @span = {}
+   end
+
+   # :nodoc:
+   def set_tag(tag, value)
+     @span[tag] = value
+   end
+
+   # :nodoc:
+   def get_tag(tag)
+     @span[tag]
+   end
+ end
end
end
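To show how the mock tracer and its span are meant to interact, here is a simplified, standalone sketch. `SketchSpan` and `SketchTracer` are hypothetical names that mirror the shape of `MockSpan` and `Mock` in this diff; logging and the `Tracing::Provider` base class are left out.

```ruby
# Simplified stand-in for the mock span: tags live in a plain hash.
class SketchSpan
  def initialize
    @tags = {}
  end

  def set_tag(tag, value)
    @tags[tag] = value
  end

  def get_tag(tag)
    @tags[tag]
  end
end

# Simplified stand-in for the mock tracer.
class SketchTracer
  # Lazily create one span and reuse it for the tracer's lifetime.
  def active_span
    @active_span ||= SketchSpan.new
  end

  # Tag the given span, or the active span when none is passed.
  def set_tag(tag, value, span = nil)
    (span || active_span).set_tag(tag, value)
  end

  # Read a tag back from the active span.
  def get_tag(tag)
    active_span.get_tag(tag)
  end
end

tracer = SketchTracer.new
tracer.set_tag('topic', 'my-topic')
topic = tracer.get_tag('topic') # reads the value stored on the active span
```

Because the span is memoized, a tag set in `consume_batch` stays readable from any later call that goes through the same tracer instance.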
6 changes: 6 additions & 0 deletions lib/deimos/tracing/provider.rb
@@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil)
raise NotImplementedError
end

+ # Get a tag from a span with the specified tag.
+ # @param tag [String]
+ def get_tag(tag)
+   raise NotImplementedError
+ end
Member

We need to implement this for datadog.


end
end
end
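One possible shape for a concrete `get_tag`, following the reviewer's note that a real provider still needs one, is sketched below. `SketchProvider`, `SketchBackedProvider`, and the `client` object are hypothetical; in particular, `client.active_span` is a placeholder and not the real Datadog API, which would need to be checked against that library's documentation.

```ruby
# Base class mirroring the provider interface in this diff: subclasses are
# expected to override get_tag.
class SketchProvider
  def get_tag(_tag)
    raise NotImplementedError
  end
end

# Hypothetical concrete provider. `client` is any object that responds to
# `active_span` and returns either nil or an object with `get_tag`. It is a
# placeholder for whatever the underlying tracing library exposes.
class SketchBackedProvider < SketchProvider
  def initialize(client)
    @client = client
  end

  def get_tag(tag)
    span = @client.active_span
    span&.get_tag(tag) # nil when no span is active
  end
end
```

Returning `nil` when no span is active keeps callers such as a deadlock-retry wrapper from failing outside a traced request; whether that is the right behaviour for Deimos is a design decision this sketch does not settle.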
42 changes: 42 additions & 0 deletions spec/active_record_batch_consumer_spec.rb
@@ -206,6 +206,48 @@ def publish_batch(messages)
]
)
end

+ it 'should handle deletes with deadlock retries' do
+   allow(Deimos::Utils::DeadlockRetry).to receive(:sleep)
+   allow(instance_double(ActiveRecord::Relation)).to receive(:delete_all).and_raise(
+     ActiveRecord::Deadlocked.new('Lock wait timeout exceeded')
+   ).twice.ordered
+
+   Widget.create!(id: 1, test_id: 'abc', some_int: 2)
+
+   publish_batch(
+     [
+       { key: 1,
+         payload: nil },
+       { key: 1,
+         payload: nil }
+     ]
+   )
+
+   expect(all_widgets).to be_empty
+ end
+
+ it 'should not delete after multiple deadlock retries' do
+   allow(Deimos::Utils::DeadlockRetry).to receive(:sleep)
+   allow(instance_double(ActiveRecord::Relation)).to receive(:delete_all).and_raise(
+     ActiveRecord::Deadlocked.new('Lock wait timeout exceeded')
+   ).exactly(3).times
+
+   Widget.create!(id: 1, test_id: 'abc', some_int: 2)
+
+   publish_batch(
+     [
+       { key: 1,
+         payload: nil },
+       { key: 1,
+         payload: nil }
+     ]
+   )
+
+   expect(Widget.count).to eq(0)
+
+ end

end
end
end
38 changes: 36 additions & 2 deletions spec/active_record_consume/mass_updater_spec.rb
@@ -69,8 +69,7 @@

it 'should mass update the batch' do
allow(SecureRandom).to receive(:uuid).and_return('1', '2')
- results = described_class.new(Widget, %w(topic:my-topic),
-                               bulk_import_id_generator: bulk_id_generator).mass_update(batch)
+ results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch)
expect(results.count).to eq(2)
expect(results.map(&:test_id)).to match(%w(id1 id2))
expect(Widget.count).to eq(2)
@@ -80,5 +79,40 @@
expect(Widget.last.detail).not_to be_nil
end

+ context 'with deadlock retries' do
+   before(:each) do
+     allow(Deimos::Utils::DeadlockRetry).to receive(:sleep)
+   end
+
+   it 'should upsert rows after deadlocks' do
+     allow(Widget).to receive(:import!).and_raise(
+       ActiveRecord::Deadlocked.new('Lock wait timeout exceeded')
+     ).twice.ordered
+     allow(Widget).to receive(:import!).and_raise(
+       ActiveRecord::Deadlocked.new('Lock wait timeout exceeded')
+     ).once.and_call_original
+
+     results = described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch)
+     expect(results.count).to eq(2)
+     expect(results.map(&:test_id)).to match(%w(id1 id2))
+     expect(Widget.count).to eq(2)
+     expect(Detail.count).to eq(2)
+     expect(Widget.first.detail).not_to be_nil
+     expect(Widget.last.detail).not_to be_nil
+   end
+
+   it 'should not upsert after encountering multiple deadlocks' do
+     allow(Widget).to receive(:import!).and_raise(
+       ActiveRecord::Deadlocked.new('Lock wait timeout exceeded')
+     ).exactly(3).times
+     expect {
+       described_class.new(Widget, bulk_import_id_generator: bulk_id_generator).mass_update(batch)
+     }.to raise_error(ActiveRecord::Deadlocked)
+     expect(Widget.count).to eq(0)
+     expect(Detail.count).to eq(0)
+   end
+
+ end

end
end