From 51b509366e5782ecfe293dba7ddd53b0fe673897 Mon Sep 17 00:00:00 2001 From: ariana-flipp <83661699+ariana-flipp@users.noreply.github.com> Date: Mon, 13 May 2024 15:28:31 -0400 Subject: [PATCH] CCOL-2441: Allow custom publishing batch size per producer (#216) * Allow producer max_batch_size to be configurable * Allow on per producer level * small fix * wording fix * docs change * docs change * whitespace * CR comment (rspecs broken) * Revert "CR comment (rspecs broken)" This reverts commit 0999eea24bcc9def2848aa7340303db0315747aa. * Update documentation --- CHANGELOG.md | 2 +- docs/CONFIGURATION.md | 2 ++ lib/deimos/config/configuration.rb | 7 +++++++ lib/deimos/producer.rb | 14 ++++++++----- spec/producer_spec.rb | 33 ++++++++++++++++++++++++++++++ 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ceb31c..774140e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## UNRELEASED +- Feature: Added `max_batch_size` config to producer to allow custom batch size for publishing. # 1.24.3 - 2024-05-13 - - Feature: Enable `producers.persistent_connections` phobos setting - Feature: Add consumer configuration, `save_associations_first` to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index fb7f6e63..85098edb 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,6 +46,7 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) +max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. ## Defining Consumers @@ -198,6 +199,7 @@ producers.schema_namespace|nil|Default namespace for all producers. Can remain n producers.topic_prefix|nil|Add a prefix to all topic names. This can be useful if you're using the same Kafka broker for different environments that are producing the same topics. producers.disabled|false|Disable all actual message producing. Generally more useful to use the `disable_producers` method instead. producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:kafka_async`. If using Kafka directly, a good pattern is to set to async in your user-facing app, and sync in your consumers or delayed workers. +producers.max_batch_size|500|Maximum batch size for publishing. Individual producers can override. ## Schema Configuration diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 9a0abaf2..ac73e6b0 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -344,6 +344,10 @@ def self.configure_producer_or_consumer(kafka_config) # sync in your consumers or delayed workers. # @return [Symbol] setting :backend, :kafka_async + + # Maximum publishing batch size. Individual producers can override. + # @return [Integer] + setting :max_batch_size, 500 end setting :schema do @@ -436,6 +440,9 @@ def self.configure_producer_or_consumer(kafka_config) # instead of appending to them. # @return [Boolean] setting :replace_associations + # Maximum publishing batch size for this producer. + # @return [Integer] + setting :max_batch_size end setting_object :consumer do diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index 6b021525..646fb333 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -59,16 +59,14 @@ def producers_disabled?(producer_class=nil) class Producer include SharedConfig - # @return [Integer] - MAX_BATCH_SIZE = 500 - class << self # @return [Hash] def config @config ||= { encode_key: true, - namespace: Deimos.config.producers.schema_namespace + namespace: Deimos.config.producers.schema_namespace, + max_batch_size: Deimos.config.producers.max_batch_size } end @@ -92,6 +90,12 @@ def partition_key(_payload) nil end + # @param size [Integer] Override the default batch size for publishing. + # @return [void] + def max_batch_size(size) + config[:max_batch_size] = size + end + # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic @@ -126,7 +130,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head ) do messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } messages.each { |m| _process_message(m, topic) } - messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch| + messages.in_groups_of(self.config[:max_batch_size], false) do |batch| self.produce_batch(backend_class, batch) end end diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index c55fdc23..a841d3be 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -72,6 +72,14 @@ def self.partition_key(payload) end stub_const('MyNoTopicProducer', producer_class) + producer_class = Class.new(Deimos::Producer) do + schema 'MySchema' + namespace 'com.my-namespace' + topic 'my-topic' + key_config field: 'test_id' + max_batch_size 1 + end + stub_const('MySmallBatchProducer', producer_class) end it 'should fail on invalid message with error handler' do @@ -607,5 +615,30 @@ def self.partition_key(payload) end end + describe "max_batch_size" do + it 'should use top-level default value if max_batch_size is not defined by the producer' do + expect(MyProducer.config[:max_batch_size]).to eq(500) + end + + it 'should call produce_batch multiple times when max_batch_size < records size' do + Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, + MySmallBatchProducer, + topic: 'my-topic', + partition_key: 'foo', + key: 'foo') + Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, + MySmallBatchProducer, + topic: 'my-topic', + partition_key: 'bar', + key: 'bar') + expect(described_class).to receive(:produce_batch).twice + + MySmallBatchProducer.publish_list( + [{ 'test_id' => 'foo', 'some_int' => 123 }, + { 'test_id' => 'bar', 'some_int' => 124 }] + ) + end + end + end end