CCOL-2441: Allow custom publishing batch size per producer (#216)
* Allow producer max_batch_size to be configurable

* Allow on per producer level

* small fix

* wording fix

* docs change

* docs change

* whitespace

* CR comment (rspecs broken)

* Revert "CR comment (rspecs broken)"

This reverts commit 0999eea.

* Update documentation
ariana-flipp authored May 13, 2024
1 parent 27e541a commit 51b5093
Showing 5 changed files with 52 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -6,9 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## UNRELEASED
- Feature: Added `max_batch_size` config to producers to allow a custom publishing batch size.

# 1.24.3 - 2024-05-13

- Feature: Enable `producers.persistent_connections` phobos setting
- Feature: Add consumer configuration, `save_associations_first` to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class

2 changes: 2 additions & 0 deletions docs/CONFIGURATION.md
@@ -46,6 +46,7 @@ schema|nil|Name of the schema to use to encode data before producing.
namespace|nil|Namespace of the schema to use when finding it locally.
key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation)
use_schema_classes|nil|Set to true or false to enable or disable using the producer's schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes)
max_batch_size|500|Maximum publishing batch size. Defaults to the top-level `producers.max_batch_size` configuration of 500.
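For illustration, a producer opting into a smaller batch might look like the following (a hedged sketch mirroring the DSL used in this PR's specs; the schema, namespace, topic, and key field names are placeholders):

```ruby
class MySmallBatchProducer < Deimos::Producer
  schema 'MySchema'              # placeholder schema name
  namespace 'com.my-namespace'   # placeholder namespace
  topic 'my-topic'               # placeholder topic
  key_config field: 'test_id'    # placeholder key field
  # Publish at most 100 messages per produce_batch call, overriding
  # the top-level producers.max_batch_size default of 500.
  max_batch_size 100
end
```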

## Defining Consumers

@@ -198,6 +199,7 @@ producers.schema_namespace|nil|Default namespace for all producers. Can remain n
producers.topic_prefix|nil|Add a prefix to all topic names. This can be useful if you're using the same Kafka broker for different environments that are producing the same topics.
producers.disabled|false|Disable all actual message producing. Generally more useful to use the `disable_producers` method instead.
producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:kafka_async`. If using Kafka directly, a good pattern is to set to async in your user-facing app, and sync in your consumers or delayed workers.
producers.max_batch_size|500|Maximum batch size for publishing. Individual producers can override.
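As a sketch of the top-level setting (assuming the standard `Deimos.configure` block; the value 250 is arbitrary):

```ruby
Deimos.configure do
  producers.backend :kafka_async
  # Default publishing batch size for all producers; individual
  # producers can override it via the max_batch_size class method.
  producers.max_batch_size 250
end
```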

## Schema Configuration

7 changes: 7 additions & 0 deletions lib/deimos/config/configuration.rb
@@ -344,6 +344,10 @@ def self.configure_producer_or_consumer(kafka_config)
# sync in your consumers or delayed workers.
# @return [Symbol]
setting :backend, :kafka_async

# Maximum publishing batch size. Individual producers can override.
# @return [Integer]
setting :max_batch_size, 500
end

setting :schema do
@@ -436,6 +440,9 @@ def self.configure_producer_or_consumer(kafka_config)
# instead of appending to them.
# @return [Boolean]
setting :replace_associations
# Maximum publishing batch size for this producer.
# @return [Integer]
setting :max_batch_size
end

setting_object :consumer do
14 changes: 9 additions & 5 deletions lib/deimos/producer.rb
@@ -59,16 +59,14 @@ def producers_disabled?(producer_class=nil)
class Producer
include SharedConfig

# @return [Integer]
MAX_BATCH_SIZE = 500

class << self

# @return [Hash]
def config
@config ||= {
encode_key: true,
namespace: Deimos.config.producers.schema_namespace
namespace: Deimos.config.producers.schema_namespace,
max_batch_size: Deimos.config.producers.max_batch_size
}
end

@@ -92,6 +90,12 @@ def partition_key(_payload)
nil
end

# @param size [Integer] Override the default batch size for publishing.
# @return [void]
def max_batch_size(size)
config[:max_batch_size] = size
end

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
@@ -126,7 +130,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head
) do
messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) }
messages.each { |m| _process_message(m, topic) }
messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
messages.in_groups_of(self.config[:max_batch_size], false) do |batch|
self.produce_batch(backend_class, batch)
end
end
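`in_groups_of(n, false)` comes from ActiveSupport; passing `false` means the final group keeps its smaller size instead of being padded with `nil`s, so the split behaves like the stdlib's `each_slice`:

```ruby
# Five messages with a batch size of 2 yield three produce_batch calls:
messages = %w[m1 m2 m3 m4 m5]
batches  = messages.each_slice(2).to_a
# => [["m1", "m2"], ["m3", "m4"], ["m5"]]
```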
33 changes: 33 additions & 0 deletions spec/producer_spec.rb
@@ -72,6 +72,14 @@ def self.partition_key(payload)
end
stub_const('MyNoTopicProducer', producer_class)

producer_class = Class.new(Deimos::Producer) do
schema 'MySchema'
namespace 'com.my-namespace'
topic 'my-topic'
key_config field: 'test_id'
max_batch_size 1
end
stub_const('MySmallBatchProducer', producer_class)
end

it 'should fail on invalid message with error handler' do
@@ -607,5 +615,30 @@ def self.partition_key(payload)
end
end

describe "max_batch_size" do
it 'should use top-level default value if max_batch_size is not defined by the producer' do
expect(MyProducer.config[:max_batch_size]).to eq(500)
end

it 'should call produce_batch multiple times when max_batch_size < records size' do
Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'foo',
key: 'foo')
Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'bar',
key: 'bar')
expect(described_class).to receive(:produce_batch).twice

MySmallBatchProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123 },
{ 'test_id' => 'bar', 'some_int' => 124 }]
)
end
end

end
end
