From ac779b12eff63dad0d51cde730412f06c3e07ca4 Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 14:00:09 -0400 Subject: [PATCH 01/10] Allow producer max_batch_size to be configurable --- CHANGELOG.md | 2 +- docs/CONFIGURATION.md | 1 + lib/deimos/config/configuration.rb | 4 +++ lib/deimos/producer.rb | 5 +--- spec/producer_spec.rb | 40 ++++++++++++++++++++---------- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ceb31c..774140e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## UNRELEASED +- Feature: Added `max_batch_size` config to producer to allow custom batch size for publishing. # 1.24.3 - 2024-05-13 - - Feature: Enable `producers.persistent_connections` phobos setting - Feature: Add consumer configuration, `save_associations_first` to save associated records of primary class prior to upserting primary records. Foreign key of associated records are assigned to the record class prior to saving the record class diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index fb7f6e63..fb3ea87b 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -198,6 +198,7 @@ producers.schema_namespace|nil|Default namespace for all producers. Can remain n producers.topic_prefix|nil|Add a prefix to all topic names. This can be useful if you're using the same Kafka broker for different environments that are producing the same topics. producers.disabled|false|Disable all actual message producing. Generally more useful to use the `disable_producers` method instead. producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:kafka_async`. If using Kafka directly, a good pattern is to set to async in your user-facing app, and sync in your consumers or delayed workers. +producers.max_batch_size|500|Maximum batch size for publishing. ## Schema Configuration diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 9a0abaf2..e4a44b7d 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -344,6 +344,10 @@ def self.configure_producer_or_consumer(kafka_config) # sync in your consumers or delayed workers. # @return [Symbol] setting :backend, :kafka_async + + # Maximum batch size for publishing. + # @return [Integer] + setting :max_batch_size, 500 end setting :schema do diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index 6b021525..80c70620 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -59,9 +59,6 @@ def producers_disabled?(producer_class=nil) class Producer include SharedConfig - # @return [Integer] - MAX_BATCH_SIZE = 500 - class << self # @return [Hash] @@ -126,7 +123,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head ) do messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } messages.each { |m| _process_message(m, topic) } - messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch| + messages.in_groups_of(Deimos.config.producers.max_batch_size, false) do |batch| self.produce_batch(backend_class, batch) end end diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index c55fdc23..b4f33490 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -74,6 +74,21 @@ def self.partition_key(payload) end + let(:my_topic_records) do + [ + Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, + MyProducer, + topic: 'my-topic', + partition_key: 'foo', + key: 'foo'), + Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, + MyProducer, + topic: 'my-topic', + partition_key: 'bar', + key: 'bar') + ] + end + it 'should fail on invalid message with error handler' do subscriber = Deimos.subscribe('produce') do |event| expect(event.payload[:payloads]).to eq([{ 'invalid' => 'key' }]) @@ -86,19 +101,7 @@ def self.partition_key(payload) it 'should produce a message' do expect(described_class).to receive(:produce_batch).once.with( - Deimos::Backends::Test, - [ - Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, - MyProducer, - topic: 'my-topic', - partition_key: 'foo', - key: 'foo'), - Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, - MyProducer, - topic: 'my-topic', - partition_key: 'bar', - key: 'bar') - ] + Deimos::Backends::Test, my_topic_records ).and_call_original MyProducer.publish_list( @@ -110,6 +113,17 @@ def self.partition_key(payload) expect('my-topic').not_to have_sent('test_id' => 'foo2', 'some_int' => 123) end + it 'should call produce_batch multiple times when max_batch_size < records size' do + max_batch_size = my_topic_records.size - 1 + Deimos.configure { producers.max_batch_size = max_batch_size } + expect(described_class).to receive(:produce_batch).twice + + MyProducer.publish_list( + [{ 'test_id' => 'foo', 'some_int' => 123 }, + { 'test_id' => 'bar', 'some_int' => 124 }] + ) + end + it 'should allow setting the topic and headers from publish_list' do expect(described_class).to receive(:produce_batch).once.with( Deimos::Backends::Test, From dd2b1fa62fd59c1d49a49f1db26af36122066eb3 Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 15:51:34 -0400 Subject: [PATCH 02/10] Allow on per producer level --- CHANGELOG.md | 8 ++++ docs/CONFIGURATION.md | 4 +- lib/deimos/config/configuration.rb | 5 +- lib/deimos/producer.rb | 11 ++++- spec/producer_spec.rb | 73 +++++++++++++++++++----------- 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 774140e1..fc2f8dbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## UNRELEASED +<<<<<<< HEAD +======= +<<<<<<< HEAD +- Feature: Enable `producers.persistent_connections` phobos setting +>>>>>>> 3ea5999 (Allow on per producer level) - Feature: Added `max_batch_size` config to producer to allow custom batch size for publishing. +======= +- Feature: Added `max_batch_size` config to allow custom batch size for publishing per producer. +>>>>>>> 4806c5b (Allow on per producer level) # 1.24.3 - 2024-05-13 - Feature: Enable `producers.persistent_connections` phobos setting diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index fb3ea87b..3f3ce7c1 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,6 +46,8 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) +max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. + ## Defining Consumers @@ -198,7 +200,7 @@ producers.schema_namespace|nil|Default namespace for all producers. Can remain n producers.topic_prefix|nil|Add a prefix to all topic names. This can be useful if you're using the same Kafka broker for different environments that are producing the same topics. producers.disabled|false|Disable all actual message producing. Generally more useful to use the `disable_producers` method instead. producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:kafka_async`. If using Kafka directly, a good pattern is to set to async in your user-facing app, and sync in your consumers or delayed workers. -producers.max_batch_size|500|Maximum batch size for publishing. +producers.max_batch_size|500|Maximum batch size for publishing. Individual producers can override. ## Schema Configuration diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index e4a44b7d..18b2a9c4 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -345,7 +345,7 @@ def self.configure_producer_or_consumer(kafka_config) # @return [Symbol] setting :backend, :kafka_async - # Maximum batch size for publishing. + # Maximum publishing batch size. Individual producers can override. # @return [Integer] setting :max_batch_size, 500 end @@ -440,6 +440,9 @@ def self.configure_producer_or_consumer(kafka_config) # instead of appending to them. # @return [Boolean] setting :replace_associations + # Maximum publishing batch size for this producer. + # @return [Integer] + setting :max_batch_size, 500 end setting_object :consumer do diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index 80c70620..646fb333 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -65,7 +65,8 @@ class << self def config @config ||= { encode_key: true, - namespace: Deimos.config.producers.schema_namespace + namespace: Deimos.config.producers.schema_namespace, + max_batch_size: Deimos.config.producers.max_batch_size } end @@ -89,6 +90,12 @@ def partition_key(_payload) nil end + # @param size [Integer] Override the default batch size for publishing. + # @return [void] + def max_batch_size(size) + config[:max_batch_size] = size + end + # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic @@ -123,7 +130,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head ) do messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } messages.each { |m| _process_message(m, topic) } - messages.in_groups_of(Deimos.config.producers.max_batch_size, false) do |batch| + messages.in_groups_of(self.config[:max_batch_size], false) do |batch| self.produce_batch(backend_class, batch) end end diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index b4f33490..97de7d37 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -72,21 +72,14 @@ def self.partition_key(payload) end stub_const('MyNoTopicProducer', producer_class) - end - - let(:my_topic_records) do - [ - Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, - MyProducer, - topic: 'my-topic', - partition_key: 'foo', - key: 'foo'), - Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, - MyProducer, - topic: 'my-topic', - partition_key: 'bar', - key: 'bar') - ] + producer_class = Class.new(Deimos::Producer) do + schema 'MySchema' + namespace 'com.my-namespace' + topic 'my-topic' + key_config field: 'test_id' + max_batch_size 1 + end + stub_const('MySmallBatchProducer', producer_class) end it 'should fail on invalid message with error handler' do @@ -101,7 +94,19 @@ def self.partition_key(payload) it 'should produce a message' do expect(described_class).to receive(:produce_batch).once.with( - Deimos::Backends::Test, my_topic_records + Deimos::Backends::Test, + [ + Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, + MyProducer, + topic: 'my-topic', + partition_key: 'foo', + key: 'foo'), + Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, + MyProducer, + topic: 'my-topic', + partition_key: 'bar', + key: 'bar') + ] ).and_call_original MyProducer.publish_list( @@ -113,17 +118,6 @@ def self.partition_key(payload) expect('my-topic').not_to have_sent('test_id' => 'foo2', 'some_int' => 123) end - it 'should call produce_batch multiple times when max_batch_size < records size' do - max_batch_size = my_topic_records.size - 1 - Deimos.configure { producers.max_batch_size = max_batch_size } - expect(described_class).to receive(:produce_batch).twice - - MyProducer.publish_list( - [{ 'test_id' => 'foo', 'some_int' => 123 }, - { 'test_id' => 'bar', 'some_int' => 124 }] - ) - end - it 'should allow setting the topic and headers from publish_list' do expect(described_class).to receive(:produce_batch).once.with( Deimos::Backends::Test, @@ -621,5 +615,30 @@ def self.partition_key(payload) end end + describe "max_batch_size" do + it 'should default to publishing batch size of 500' do + expect(MyProducer.config[:max_batch_size]).to eq(500) + end + + it 'should call produce_batch multiple times when max_batch_size < records size' do + Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, + MySmallBatchProducer, + topic: 'my-topic', + partition_key: 'foo', + key: 'foo') + Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, + MySmallBatchProducer, + topic: 'my-topic', + partition_key: 'bar', + key: 'bar') + expect(described_class).to receive(:produce_batch).twice + + MySmallBatchProducer.publish_list( + [{ 'test_id' => 'foo', 'some_int' => 123 }, + { 'test_id' => 'bar', 'some_int' => 124 }] + ) + end + end + end end From 3bc9228cce6c3a88772807ed285739966a2bfd5c Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 15:54:18 -0400 Subject: [PATCH 03/10] small fix --- lib/deimos/config/configuration.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 18b2a9c4..ac73e6b0 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -442,7 +442,7 @@ def self.configure_producer_or_consumer(kafka_config) setting :replace_associations # Maximum publishing batch size for this producer. # @return [Integer] - setting :max_batch_size, 500 + setting :max_batch_size end setting_object :consumer do From c6dc831dab25a52c03f7d2b55731ea2117ec8e0c Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 15:56:19 -0400 Subject: [PATCH 04/10] wording fix --- spec/producer_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index 97de7d37..a841d3be 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -616,7 +616,7 @@ def self.partition_key(payload) end describe "max_batch_size" do - it 'should default to publishing batch size of 500' do + it 'should use top-level default value if max_batch_size is not defined by the producer' do expect(MyProducer.config[:max_batch_size]).to eq(500) end From d0cb06f4f83dc90fca6850a7ddac0481f7ad5131 Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 16:13:33 -0400 Subject: [PATCH 05/10] docs change --- docs/CONFIGURATION.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 3f3ce7c1..8415be64 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,7 +46,6 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) -max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. ## Defining Consumers From 79baa2b87f7d55163f805ae50865c64b24c154fe Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 16:15:27 -0400 Subject: [PATCH 06/10] docs change --- CHANGELOG.md | 8 -------- 1 file changed, 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc2f8dbe..774140e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,15 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## UNRELEASED -<<<<<<< HEAD -======= -<<<<<<< HEAD -- Feature: Enable `producers.persistent_connections` phobos setting ->>>>>>> 3ea5999 (Allow on per producer level) - Feature: Added `max_batch_size` config to producer to allow custom batch size for publishing. -======= -- Feature: Added `max_batch_size` config to allow custom batch size for publishing per producer. ->>>>>>> 4806c5b (Allow on per producer level) # 1.24.3 - 2024-05-13 - Feature: Enable `producers.persistent_connections` phobos setting From 50e67b0082203eba4dce82a1629d1a3e2e01213b Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Tue, 7 May 2024 16:20:51 -0400 Subject: [PATCH 07/10] whitespace --- docs/CONFIGURATION.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 8415be64..8057ec5e 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -47,7 +47,6 @@ namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) - ## Defining Consumers Consumers are defined almost identically to producers: From 3d495d3c86aa0dc6ec6d44434bea72e2bc68301a Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Wed, 8 May 2024 15:19:02 -0400 Subject: [PATCH 08/10] CR comment (rspecs broken) --- docs/CONFIGURATION.md | 1 + lib/deimos/config/configuration.rb | 2 ++ lib/deimos/producer.rb | 9 +-------- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 8057ec5e..85098edb 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,6 +46,7 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) +max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. ## Defining Consumers diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index ac73e6b0..533c392f 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -100,6 +100,8 @@ def self.configure_producer_or_consumer(kafka_config) Deimos.config.consumers.bulk_import_id_generator, save_associations_first: kafka_config.save_associations_first ) + else # producer + klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size end end end diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index 646fb333..ebdca27d 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -65,8 +65,7 @@ class << self def config @config ||= { encode_key: true, - namespace: Deimos.config.producers.schema_namespace, - max_batch_size: Deimos.config.producers.max_batch_size + namespace: Deimos.config.producers.schema_namespace } end @@ -90,12 +89,6 @@ def partition_key(_payload) nil end - # @param size [Integer] Override the default batch size for publishing. - # @return [void] - def max_batch_size(size) - config[:max_batch_size] = size - end - # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic From 115cbbbc1baec37ab2e9e1fea7f45532dbc0013a Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Mon, 13 May 2024 13:34:29 -0400 Subject: [PATCH 09/10] Revert "CR comment (rspecs broken)" This reverts commit 0999eea24bcc9def2848aa7340303db0315747aa. --- docs/CONFIGURATION.md | 1 - lib/deimos/config/configuration.rb | 2 -- lib/deimos/producer.rb | 9 ++++++++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 85098edb..8057ec5e 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,7 +46,6 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) -max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. ## Defining Consumers diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 533c392f..ac73e6b0 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -100,8 +100,6 @@ def self.configure_producer_or_consumer(kafka_config) Deimos.config.consumers.bulk_import_id_generator, save_associations_first: kafka_config.save_associations_first ) - else # producer - klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size end end end diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index ebdca27d..646fb333 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -65,7 +65,8 @@ class << self def config @config ||= { encode_key: true, - namespace: Deimos.config.producers.schema_namespace + namespace: Deimos.config.producers.schema_namespace, + max_batch_size: Deimos.config.producers.max_batch_size } end @@ -89,6 +90,12 @@ def partition_key(_payload) nil end + # @param size [Integer] Override the default batch size for publishing. + # @return [void] + def max_batch_size(size) + config[:max_batch_size] = size + end + # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic From e6982200dd74fe1c800d8e76d3458d4758c6be96 Mon Sep 17 00:00:00 2001 From: Ariana Jung Date: Mon, 13 May 2024 13:36:20 -0400 Subject: [PATCH 10/10] Update documentation --- docs/CONFIGURATION.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 8057ec5e..85098edb 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -46,6 +46,7 @@ schema|nil|Name of the schema to use to encode data before producing. namespace|nil|Namespace of the schema to use when finding it locally. key_config|nil|Configuration hash for message keys. See [Kafka Message Keys](../README.md#installation) use_schema_classes|nil|Set to true or false to enable or disable using the producers schema classes. See [Generated Schema Classes](../README.md#generated-schema-classes) +max_batch_size|500|Maximum publishing batch size. Defaults to top-level configuration of 500. ## Defining Consumers