CCOL-2441: Allow custom publishing batch size per producer #216

Merged · 10 commits · May 13, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -6,9 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## UNRELEASED
- Feature: Added `max_batch_size` config to producer to allow custom batch size for publishing.

# 1.24.3 - 2024-05-13

- Feature: Enable `producers.persistent_connections` phobos setting
- Feature: Add consumer configuration, `save_associations_first` to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class

2 changes: 2 additions & 0 deletions docs/CONFIGURATION.md
@@ -46,6 +46,7 @@ schema|nil|Name of the schema to use to encode data before producing.
namespace|nil|Namespace of the schema to use when finding it locally.
key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation)
use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes)
max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500.

## Defining Consumers

@@ -198,6 +199,7 @@ producers.schema_namespace|nil|Default namespace for all producers. Can remain n
producers.topic_prefix|nil|Add a prefix to all topic names. This can be useful if you're using the same Kafka broker for different environments that are producing the same topics.
producers.disabled|false|Disable all actual message producing. Generally more useful to use the `disable_producers` method instead.
producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:kafka_async`. If using Kafka directly, a good pattern is to set to async in your user-facing app, and sync in your consumers or delayed workers.
producers.max_batch_size|500|Maximum batch size for publishing. Individual producers can override.
Member: Missing the doc for the new per-producer setting?

## Schema Configuration

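Taken together, the two new rows describe a global default (`producers.max_batch_size`) that individual producers can override (`max_batch_size`). A minimal sketch of how this could look in an initializer, using the configuration DSL this PR extends (the class name, topic, and sizes below are illustrative, not taken from the PR):

```ruby
# Hypothetical initializer, e.g. config/initializers/deimos.rb.
Deimos.configure do
  # Global default: every producer publishes in batches of at most 1000 messages.
  producers.max_batch_size 1000

  producer do
    class_name 'MyProducer'
    topic 'my-topic'
    schema 'MySchema'
    namespace 'com.my-namespace'
    key_config field: 'test_id'
    # Per-producer override: this producer publishes at most 100 at a time.
    max_batch_size 100
  end
end
```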
7 changes: 7 additions & 0 deletions lib/deimos/config/configuration.rb
@@ -344,6 +344,10 @@ def self.configure_producer_or_consumer(kafka_config)
# sync in your consumers or delayed workers.
# @return [Symbol]
setting :backend, :kafka_async

# Maximum publishing batch size. Individual producers can override.
# @return [Integer]
setting :max_batch_size, 500
end

setting :schema do
@@ -436,6 +440,9 @@ def self.configure_producer_or_consumer(kafka_config)
# instead of appending to them.
# @return [Boolean]
setting :replace_associations
# Maximum publishing batch size for this producer.
# @return [Integer]
setting :max_batch_size
end

setting_object :consumer do
14 changes: 9 additions & 5 deletions lib/deimos/producer.rb
@@ -59,16 +59,14 @@ def producers_disabled?(producer_class=nil)
class Producer
include SharedConfig

- # @return [Integer]
- MAX_BATCH_SIZE = 500

class << self

# @return [Hash]
def config
@config ||= {
encode_key: true,
- namespace: Deimos.config.producers.schema_namespace
+ namespace: Deimos.config.producers.schema_namespace,
+ max_batch_size: Deimos.config.producers.max_batch_size
}
end

@@ -92,6 +90,12 @@ def partition_key(_payload)
nil
end

# @param size [Integer] Override the default batch size for publishing.
# @return [void]
def max_batch_size(size)
config[:max_batch_size] = size
end

Member: We probably don't need an actual method here; we can just assign it directly to the config inside configuration.rb.

Collaborator Author (@ariana-flipp, May 8, 2024): I'm not 100% sure what's going on, but it seems like producer specs are not working as intended in general. As you suggested, I added a line in configuration.rb to allow a per-producer batch size setting, like so:

if kafka_config.respond_to?(:bulk_import_id_column) # consumer
    ...
else # new code here for producer
    klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size
end

I tried setting max_batch_size for a producer in item-feeds and it is working from there. However, the self.configure_producer_or_consumer method is never reached for stubbed producers in producer_spec.rb, so this config is not being set.

Also, now that I removed the max_batch_size method from producer.rb, I'm getting this error when running producer specs:

Deimos::Producer should produce a message
     Failure/Error: max_batch_size 1

NoMethodError:
     undefined method `max_batch_size'

Member: Can you push your branch? I can take a look.

Collaborator Author: Pushed!

Member: Yeah... we're going to change this entirely in V2 🙃 I forgot that the stubbed producers don't go through the normal process. OK - you can put it back the way it was!

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
@@ -126,7 +130,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head
) do
messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) }
messages.each { |m| _process_message(m, topic) }
- messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
+ messages.in_groups_of(self.config[:max_batch_size], false) do |batch|
self.produce_batch(backend_class, batch)
end
end
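A detail worth calling out in `publish_list`: the `false` passed to `in_groups_of` keeps ActiveSupport from padding the final, smaller batch with `nil`s, so `produce_batch` never receives `nil` messages. A standalone sketch of the chunking behavior (assumes only the `activesupport` gem):

```ruby
require 'active_support/core_ext/array/grouping'

messages = %w(m1 m2 m3 m4 m5)

# With a batch size of 2, five messages yield three batches.
messages.in_groups_of(2, false) { |batch| p batch }
# ["m1", "m2"]
# ["m3", "m4"]
# ["m5"]   (no nil padding because the second argument is false)
```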
33 changes: 33 additions & 0 deletions spec/producer_spec.rb
@@ -72,6 +72,14 @@ def self.partition_key(payload)
end
stub_const('MyNoTopicProducer', producer_class)

producer_class = Class.new(Deimos::Producer) do
schema 'MySchema'
namespace 'com.my-namespace'
topic 'my-topic'
key_config field: 'test_id'
max_batch_size 1
end
stub_const('MySmallBatchProducer', producer_class)
end

it 'should fail on invalid message with error handler' do
@@ -607,5 +615,30 @@ def self.partition_key(payload)
end
end

describe "max_batch_size" do
it 'should use top-level default value if max_batch_size is not defined by the producer' do
expect(MyProducer.config[:max_batch_size]).to eq(500)
end

it 'should call produce_batch multiple times when max_batch_size < records size' do
Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'foo',
key: 'foo')
Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'bar',
key: 'bar')
expect(described_class).to receive(:produce_batch).twice

MySmallBatchProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123 },
{ 'test_id' => 'bar', 'some_int' => 124 }]
)
end
end

end
end