From ee60f459b202de16073c612f6dceebd06fbbd603 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Tue, 24 Sep 2024 18:49:57 +0100 Subject: [PATCH] Rename `kafka-migrator` to `redpanda-migrator` Signed-off-by: Mihai Todor --- modules/ROOT/nav.adoc | 2 +- ...a_migrator.adoc => redpanda_migrator.adoc} | 22 +++--- ...dle.adoc => redpanda_migrator_bundle.adoc} | 12 +-- ...a_migrator.adoc => redpanda_migrator.adoc} | 52 ++++++++++--- ...dle.adoc => redpanda_migrator_bundle.adoc} | 12 +-- ...ts.adoc => redpanda_migrator_offsets.adoc} | 12 +-- ...a_migrator.adoc => redpanda_migrator.adoc} | 77 +++++++++---------- 7 files changed, 108 insertions(+), 81 deletions(-) rename modules/components/pages/inputs/{kafka_migrator.adoc => redpanda_migrator.adoc} (94%) rename modules/components/pages/inputs/{kafka_migrator_bundle.adoc => redpanda_migrator_bundle.adoc} (57%) rename modules/components/pages/outputs/{kafka_migrator.adoc => redpanda_migrator.adoc} (91%) rename modules/components/pages/outputs/{kafka_migrator_bundle.adoc => redpanda_migrator_bundle.adoc} (65%) rename modules/components/pages/outputs/{kafka_migrator_offsets.adoc => redpanda_migrator_offsets.adoc} (96%) rename modules/cookbooks/pages/{kafka_migrator.adoc => redpanda_migrator.adoc} (82%) diff --git a/modules/ROOT/nav.adoc b/modules/ROOT/nav.adoc index dcbb9ca..42f201f 100644 --- a/modules/ROOT/nav.adoc +++ b/modules/ROOT/nav.adoc @@ -314,7 +314,7 @@ ** xref:cookbooks:filtering.adoc[] ** xref:cookbooks:joining_streams.adoc[] ** xref:cookbooks:rag.adoc[] -** xref:cookbooks:kafka_migrator.adoc[] +** xref:cookbooks:redpanda_migrator.adoc[] diff --git a/modules/components/pages/inputs/kafka_migrator.adoc b/modules/components/pages/inputs/redpanda_migrator.adoc similarity index 94% rename from modules/components/pages/inputs/kafka_migrator.adoc rename to modules/components/pages/inputs/redpanda_migrator.adoc index 959ff26..0d73560 100644 --- a/modules/components/pages/inputs/kafka_migrator.adoc +++ b/modules/components/pages/inputs/redpanda_migrator.adoc @@ -1,4 +1,4 @@ -= kafka_migrator += redpanda_migrator // tag::single-source[] :type: input :status: beta @@ -9,11 +9,11 @@ component_type_dropdown::[] -Use this connector in conjunction with the xref:components:outputs/kafka_migrator.adoc[`kafka_migrator` output] to migrate topics between Apache Kafka brokers. The `kafka_migrator` input uses the https://github.com/twmb/franz-go[Franz Kafka client library^]. +Use this connector in conjunction with the xref:components:outputs/redpanda_migrator.adoc[`redpanda_migrator` output] to migrate topics between Apache Kafka brokers. The `redpanda_migrator` input uses the https://github.com/twmb/franz-go[Franz Kafka client library^]. ifndef::env-cloud[] -Introduced in version 4.35.0. +Introduced in version 4.37.0. 
endif::[] [tabs] @@ -26,7 +26,7 @@ Common:: # Common configuration fields, showing default values input: label: "" - kafka_migrator: + redpanda_migrator: seed_brokers: [] # No default (required) topics: [] # No default (required) regexp_topics: false @@ -43,7 +43,7 @@ Advanced:: # All configuration fields, showing default values input: label: "" - kafka_migrator: + redpanda_migrator: seed_brokers: [] # No default (required) topics: [] # No default (required) regexp_topics: false @@ -70,23 +70,23 @@ input: check: "" processors: [] # No default (optional) topic_lag_refresh_period: 5s - output_resource: kafka_migrator_output + output_resource: redpanda_migrator_output ``` -- ====== -The `kafka_migrator` input: +The `redpanda_migrator` input: * Reads a batch of messages from a Kafka broker. * Attempts to create all selected topics along with their associated ACLs in the broker that the output points to, identified by the label specified in `output_resource`. -* Waits for the `kafka_migrator` output to acknowledge the writes before updating the Kafka consumer group offset. +* Waits for the `redpanda_migrator` output to acknowledge the writes before updating the Kafka consumer group offset. Specify a consumer group for this input to consume one or more topics and automatically balance the topic partitions across any other connected clients with the same consumer group. Otherwise, topics are consumed in their entirety or with explicit partitions. == Metrics -This input emits a `input_kafka_migrator_lag` metric with `topic` and `partition` labels for each consumed topic. +This input emits a `input_redpanda_migrator_lag` metric with `topic` and `partition` labels for each consumed topic. == Metadata @@ -698,11 +698,11 @@ The period of time between each topic lag refresh cycle. === `output_resource` -The label of the `kafka_migrator` output in which the currently selected topics need to be created before attempting to read messages. +The label of the `redpanda_migrator` output in which the currently selected topics need to be created before attempting to read messages. *Type*: `string` -*Default*: `"kafka_migrator_output"` +*Default*: `"redpanda_migrator_output"` // end::single-source[] diff --git a/modules/components/pages/inputs/kafka_migrator_bundle.adoc b/modules/components/pages/inputs/redpanda_migrator_bundle.adoc similarity index 57% rename from modules/components/pages/inputs/kafka_migrator_bundle.adoc rename to modules/components/pages/inputs/redpanda_migrator_bundle.adoc index d4b70c1..496c414 100644 --- a/modules/components/pages/inputs/kafka_migrator_bundle.adoc +++ b/modules/components/pages/inputs/redpanda_migrator_bundle.adoc @@ -1,4 +1,4 @@ -= kafka_migrator_bundle += redpanda_migrator_bundle // tag::single-source[] :type: input :status: experimental @@ -10,14 +10,14 @@ component_type_dropdown::[] -The `kafka_migrator_bundle` input reads messages and schemas from an Apache Kafka or Redpanda cluster. Use this input in conjunction with the xref:components:outputs/kafka_migrator_bundle.adoc[`kafka_migrator_bundle` output]. +The `redpanda_migrator_bundle` input reads messages and schemas from an Apache Kafka or Redpanda cluster. Use this input in conjunction with the xref:components:outputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` output]. 
```yml # Config fields, showing default values input: label: "" - kafka_migrator_bundle: - kafka_migrator: {} # No default (required) + redpanda_migrator_bundle: + redpanda_migrator: {} # No default (required) schema_registry: {} # No default (required) migrate_schemas_before_data: true ``` @@ -25,9 +25,9 @@ input: == Fields -=== `kafka_migrator` +=== `redpanda_migrator` -The xref:components:inputs/kafka_migrator.adoc[`kafka_migrator` input] configuration. +The xref:components:inputs/redpanda_migrator.adoc[`redpanda_migrator` input] configuration. *Type*: `object` diff --git a/modules/components/pages/outputs/kafka_migrator.adoc b/modules/components/pages/outputs/redpanda_migrator.adoc similarity index 91% rename from modules/components/pages/outputs/kafka_migrator.adoc rename to modules/components/pages/outputs/redpanda_migrator.adoc index ef6eccf..b8fa618 100644 --- a/modules/components/pages/outputs/kafka_migrator.adoc +++ b/modules/components/pages/outputs/redpanda_migrator.adoc @@ -1,4 +1,4 @@ -= kafka_migrator += redpanda_migrator // tag::single-source[] :type: output :status: beta @@ -11,10 +11,10 @@ component_type_dropdown::[] Writes a batch of messages to an Apache Kafka broker and waits for acknowledgement before propagating them back to the input. -Use this connector in conjunction with the xref:components:inputs/kakfa_migrator[`kafka_migrator` input] to migrate topics between Kafka brokers. The `kafka_migrator` output uses the the https://github.com/twmb/franz-go[Franz Kafka client library^]. +Use this connector in conjunction with the xref:components:inputs/kakfa_migrator[`redpanda_migrator` input] to migrate topics between Kafka brokers. The `redpanda_migrator` output uses the the https://github.com/twmb/franz-go[Franz Kafka client library^]. ifndef::env-cloud[] -Introduced in version 4.35.0. +Introduced in version 4.37.0. endif::[] [tabs] @@ -27,7 +27,7 @@ Common:: # Common config fields, showing default values output: label: "" - kafka_migrator: + redpanda_migrator: seed_brokers: [] # No default (required) topic: "" # No default (required) key: "" # No default (optional) @@ -52,7 +52,7 @@ Advanced:: # All config fields, showing default values output: label: "" - kafka_migrator: + redpanda_migrator: seed_brokers: [] # No default (required) topic: "" # No default (required) key: "" # No default (optional) @@ -84,18 +84,26 @@ output: client_certs: [] sasl: [] # No default (optional) timestamp: ${! timestamp_unix() } # No default (optional) - input_resource: kafka_migrator_input + input_resource: redpanda_migrator_input + replication_factor_override: true + replication_factor: 3 ``` -- ====== -This output can query the `kafka_migrator` input for topic and ACL configurations. +This output can query the `redpanda_migrator` input for topic and ACL configurations. If the configured broker does not contain the current message topic, this output attempts to create it along with the topic -ACLs, which are automatically read from the `kafka_migrator` input, identified by the label specified in +ACLs, which are automatically read from the `redpanda_migrator` input, identified by the label specified in `input_resource`. 
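As a rough sketch, and assuming both components run in the same pipeline with the default resource labels, the pairing might look like the following (the broker addresses and the `foo` topic are placeholders):

```yaml
input:
  label: redpanda_migrator_input
  redpanda_migrator:
    seed_brokers: [ "localhost:9092" ]
    topics: [ "foo" ]
    output_resource: redpanda_migrator_output

output:
  label: redpanda_migrator_output
  redpanda_migrator:
    seed_brokers: [ "localhost:9093" ]
    topic: ${! metadata("kafka_topic") }
    input_resource: redpanda_migrator_input
    max_in_flight: 1
```

Here the input registers itself under the `redpanda_migrator_input` label, so the output can look up the topic and ACL configurations it needs before writing.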
+ACL migration adheres to the following principles: + +- `ALLOW WRITE` ACLs for topics are not migrated +- `ALLOW ALL` ACLs for topics are downgraded to `ALLOW READ` +- Only topic ACLs are migrated, group ACLs are not migrated + == Examples @@ -109,14 +117,14 @@ Writes messages to the configured broker and creates topics and topic ACLs if th ```yaml output: - kafka_migrator: + redpanda_migrator: seed_brokers: [ "127.0.0.1:9093" ] topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } key: ${! metadata("kafka_key") } partitioner: manual partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) } - input_resource: kafka_migrator_input + input_resource: redpanda_migrator_input max_in_flight: 1 ``` @@ -808,11 +816,31 @@ timestamp: ${! metadata("kafka_timestamp_unix") } === `input_resource` -The label of the `kafka_migrator` input from which to read the configurations of topics and ACLs for creation. +The label of the `redpanda_migrator` input from which to read the configurations of topics and ACLs for creation. *Type*: `string` -*Default*: `"kafka_migrator_input"` +*Default*: `"redpanda_migrator_input"` + + +=== `replication_factor_override` + +Use the specified replication factor when creating topics. + + +*Type*: `bool` + +*Default*: `true` + + +=== `replication_factor` + +Replication factor for created topics. This is only used when `replication_factor_override` is set to `true`. + + +*Type*: `int` + +*Default*: `3` // end::single-source[] diff --git a/modules/components/pages/outputs/kafka_migrator_bundle.adoc b/modules/components/pages/outputs/redpanda_migrator_bundle.adoc similarity index 65% rename from modules/components/pages/outputs/kafka_migrator_bundle.adoc rename to modules/components/pages/outputs/redpanda_migrator_bundle.adoc index 654c801..6e3ee2a 100644 --- a/modules/components/pages/outputs/kafka_migrator_bundle.adoc +++ b/modules/components/pages/outputs/redpanda_migrator_bundle.adoc @@ -1,4 +1,4 @@ -= kafka_migrator_bundle += redpanda_migrator_bundle // tag::single-source[] :type: output :status: experimental @@ -10,23 +10,23 @@ component_type_dropdown::[] -Writes messages and schemas to an Apache Kafka or Redpanda cluster. Use this output in conjunction with the `kafka_migrator_bundle` input. +Writes messages and schemas to an Apache Kafka or Redpanda cluster. Use this output in conjunction with the `redpanda_migrator_bundle` input. ```yml # Config fields, showing default values output: label: "" - kafka_migrator_bundle: - kafka_migrator: {} # No default (required) + redpanda_migrator_bundle: + redpanda_migrator: {} # No default (required) schema_registry: {} # No default (required) ``` == Fields -=== `kafka_migrator` +=== `redpanda_migrator` -The xref:components:outputs/kafka_migrator.adoc[`kafka_migrator` output] configuration. +The xref:components:outputs/redpanda_migrator.adoc[`redpanda_migrator` output] configuration. 
*Type*: `object` diff --git a/modules/components/pages/outputs/kafka_migrator_offsets.adoc b/modules/components/pages/outputs/redpanda_migrator_offsets.adoc similarity index 96% rename from modules/components/pages/outputs/kafka_migrator_offsets.adoc rename to modules/components/pages/outputs/redpanda_migrator_offsets.adoc index 5d9c6f2..dd9d15a 100644 --- a/modules/components/pages/outputs/kafka_migrator_offsets.adoc +++ b/modules/components/pages/outputs/redpanda_migrator_offsets.adoc @@ -1,4 +1,4 @@ -= kafka_migrator_offsets += redpanda_migrator_offsets // tag::single-source[] :type: output :status: beta @@ -11,12 +11,12 @@ component_type_dropdown::[] -Use the `kafka_migrator_offsets` output in conjunction with the `kafka_franz` input that is configured to read the `__consumer_offsets` topic. +Use the `redpanda_migrator_offsets` output in conjunction with the `kafka_franz` input that is configured to read the `__consumer_offsets` topic. This output uses the https://github.com/twmb/franz-go[Franz Kafka client library^]. ifndef::env-cloud[] -Introduced in version 4.35.0. +Introduced in version 4.37.0. endif::[] [tabs] @@ -29,7 +29,7 @@ Common:: # Common config fields, showing default values output: label: "" - kafka_migrator_offsets: + redpanda_migrator_offsets: seed_brokers: [] # No default (required) kafka_key: ${! @kafka_key } max_in_flight: 1 @@ -44,7 +44,7 @@ Advanced:: # All config fields, showing default values output: label: "" - kafka_migrator_offsets: + redpanda_migrator_offsets: seed_brokers: [] # No default (required) kafka_key: ${! @kafka_key } client_id: benthos @@ -549,4 +549,4 @@ The maximum period to wait before retry attempts are abandoned. If set to `0`, t *Default*: `"30s"` -// end::single-source[] \ No newline at end of file +// end::single-source[] diff --git a/modules/cookbooks/pages/kafka_migrator.adoc b/modules/cookbooks/pages/redpanda_migrator.adoc similarity index 82% rename from modules/cookbooks/pages/kafka_migrator.adoc rename to modules/cookbooks/pages/redpanda_migrator.adoc index 985bf31..6d0df5a 100644 --- a/modules/cookbooks/pages/kafka_migrator.adoc +++ b/modules/cookbooks/pages/redpanda_migrator.adoc @@ -1,24 +1,23 @@ -= Kafka Migrator - -:description: Move your workloads from any Kafka system to Redpanda using a single command. Kafka Migrator lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently. += Redpanda Migrator +:description: Move your workloads from any Kafka system to Redpanda using a single command. Redpanda Migrator lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently. // tag::single-source[] -With Kafka Migrator, you can move your workloads from any Apache Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently. +With Redpanda Migrator, you can move your workloads from any Apache Kafka system to Redpanda using a single command. It lets you migrate Kafka messages, schemas, and ACLs quickly and efficiently. 
-Redpanda Connect's Kafka Migrator uses functionality from the following components: +Redpanda Connect's Redpanda Migrator uses functionality from the following components: -- xref:components:inputs/kafka_migrator.adoc[`kafka_migrator` input] -- xref:components:outputs/kafka_migrator.adoc[`kafka_migrator` output] +- xref:components:inputs/redpanda_migrator.adoc[`redpanda_migrator` input] +- xref:components:outputs/redpanda_migrator.adoc[`redpanda_migrator` output] - xref:components:inputs/kafka_franz.adoc[`kafka_franz` input] -- xref:components:outputs/kafka_migrator_offsets.adoc[`kafka_migrator_offsets` output] +- xref:components:outputs/redpanda_migrator_offsets.adoc[`redpanda_migrator_offsets` output] - xref:components:inputs/schema_registry.adoc[`schema_registry` input] - xref:components:outputs/schema_registry.adoc[`schema_registry` output] For convenience, these components are bundled together into the following: -- xref:components:inputs/kafka_migrator_bundle.adoc[`kafka_migrator_bundle` input] -- xref:components:outputs/kafka_migrator_bundle.adoc[`kafka_migrator_bundle` output] +- xref:components:inputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` input] +- xref:components:outputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` output] This cookbook shows how to use the bundled components. @@ -363,13 +362,13 @@ redpanda-connect run read_data_source.yaml At this point, the `source` cluster should have some data in both `foo` and `bar` topics, and the consumer should print the messages it reads from these topics to `stdout`. -== Configure and start Kafka Migrator +== Configure and start Redpanda Migrator -You're ready to start the new Kafka Migrator Bundle, which will do the following: +You're ready to start the new Redpanda Migrator Bundle, which will do the following: - On startup, it reads all the schemas from the `source` cluster Schema Registry through the REST API and pushes them to the destination cluster Schema Registry using the same API. It needs to preserve the schema IDs, so the `destination` cluster *must not have any schemas in it*. -- Once the schemas have been imported, Kafka Migrator begins the migration of all the selected topics from the `source` cluster, and any associated ACLs. After it finishes creating all the topics and ACLs that don't exist in the `destination` cluster, it begins the migration of messages and performs consumer group offsets remapping. -- If any new topics are created in the `source` cluster while Kafka Migrator is running, they are only migrated to the `destination` cluster if messages are written to them. +- Once the schemas have been imported, Redpanda Migrator begins the migration of all the selected topics from the `source` cluster, and any associated ACLs. After it finishes creating all the topics and ACLs that don't exist in the `destination` cluster, it begins the migration of messages and performs consumer group offsets remapping. +- If any new topics are created in the `source` cluster while Redpanda Migrator is running, they are only migrated to the `destination` cluster if messages are written to them. ACL migration for topics adheres to the following principles: @@ -379,19 +378,19 @@ ACL migration for topics adheres to the following principles: NOTE: Changing topic configurations, such as partition count, isn't currently supported. -Now, use the following Kafka Migrator Bundle configuration. 
See the xref:components:inputs/kafka_migrator_bundle.adoc[`kafka_migrator_bundle` input] and xref:components:outputs/kafka_migrator_bundle.adoc[`kafka_migrator_bundle` output] docs for details. +Now, use the following Redpanda Migrator Bundle configuration. See the xref:components:inputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` input] and xref:components:outputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` output] docs for details. -NOTE: The `max_in_flight: 1` setting is required to preserve message ordering at the partition level. See the xref:components:outputs/kafka_migrator.adoc#max_in_flight[`kafka_migrator` output documentation] for more details. +NOTE: The `max_in_flight: 1` setting is required to preserve message ordering at the partition level. See the xref:components:outputs/redpanda_migrator.adoc#max_in_flight[`redpanda_migrator` output documentation] for more details. -.`kafka_migrator_bundle.yaml` +.`redpanda_migrator_bundle.yaml` ifndef::env-cloud[] [source,yaml] ---- input: - kafka_migrator_bundle: - kafka_migrator: + redpanda_migrator_bundle: + redpanda_migrator: seed_brokers: [ "localhost:9092" ] topics: - '^[^_]' # Skip internal topics which start with `_` @@ -405,8 +404,8 @@ input: subject_filter: "" output: - kafka_migrator_bundle: - kafka_migrator: + redpanda_migrator_bundle: + redpanda_migrator: seed_brokers: [ "localhost:9093" ] max_in_flight: 1 @@ -416,7 +415,7 @@ output: metrics: prometheus: {} mapping: | - meta label = if this == "input_kafka_migrator_lag" { "source" } + meta label = if this == "input_redpanda_migrator_lag" { "source" } ---- endif::[] @@ -426,8 +425,8 @@ ifdef::env-cloud[] [source,yaml] ---- input: - kafka_migrator_bundle: - kafka_migrator: + redpanda_migrator_bundle: + redpanda_migrator: seed_brokers: [ "source.cloud.kafka.com:9092" ] topics: - '^[^_]' # Skip internal topics which start with `_` @@ -449,8 +448,8 @@ input: password: testpass output: - kafka_migrator_bundle: - kafka_migrator: + redpanda_migrator_bundle: + redpanda_migrator: seed_brokers: [ "destination.cloud.redpanda.com:9092" ] max_in_flight: 1 sasl: @@ -468,16 +467,16 @@ output: metrics: prometheus: {} mapping: | - meta label = if this == "input_kafka_migrator_lag" { "source" } + meta label = if this == "input_redpanda_migrator_lag" { "source" } ---- endif::[] -Launch the Kafka Migrator Bundle pipeline, and leave it running: +Launch the Redpanda Migrator Bundle pipeline, and leave it running: [source,console] ---- -redpanda-connect run kafka_migrator_bundle.yaml +redpanda-connect run redpanda_migrator_bundle.yaml ---- == Check the status of migrated topics @@ -524,20 +523,20 @@ endif::[] == Check metrics to monitor progress -Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the xref:components:metrics/about.adoc#metric-names[standard Redpanda Connect metrics], the `kafka_migrator` input also emits an `input_kafka_migrator_lag` metric for monitoring the migration progress of each topic and partition. +Redpanda Connect provides a comprehensive suite of metrics in various formats, such as Prometheus, which you can use to monitor its performance in your observability stack. Besides the xref:components:metrics/about.adoc#metric-names[standard Redpanda Connect metrics], the `redpanda_migrator` input also emits an `input_redpanda_migrator_lag` metric for monitoring the migration progress of each topic and partition. 
[source,console] ---- curl http://localhost:4195/metrics ... -# HELP input_kafka_migrator_lag Benthos Gauge metric -# TYPE input_kafka_migrator_lag gauge -input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0 -input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0 -input_kafka_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0 -input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0 -input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1 -input_kafka_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0 +# HELP input_redpanda_migrator_lag Benthos Gauge metric +# TYPE input_redpanda_migrator_lag gauge +input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0 +input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="bar"} 0 +input_redpanda_migrator_lag{label="source",partition="0",path="root.input.sequence.broker.inputs.0",topic="foo"} 0 +input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="__consumer_offsets"} 0 +input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="bar"} 1 +input_redpanda_migrator_lag{label="source",partition="1",path="root.input.sequence.broker.inputs.0",topic="foo"} 0 ... ---- @@ -626,7 +625,7 @@ It's worth clarifying that the `source` cluster consumer uses the same `foobar` And you're all done! -Due to the mechanics of the Kafka protocol, Kafka Migrator needs to perform offset remapping when migrating consumer group offsets to the `destination` cluster. While more sophisticated approaches are possible, Redpanda chose to use a simple timestamp-based approach. So, for each migrated offset, the `destination` cluster is queried to find the latest offset before the received offset timestamp. Kafka Migrator then writes this offset as the `destination` consumer group offset for the corresponding topic and partition pair. +Due to the mechanics of the Kafka protocol, Redpanda Migrator needs to perform offset remapping when migrating consumer group offsets to the `destination` cluster. While more sophisticated approaches are possible, Redpanda chose to use a simple timestamp-based approach. So, for each migrated offset, the `destination` cluster is queried to find the latest offset before the received offset timestamp. Redpanda Migrator then writes this offset as the `destination` consumer group offset for the corresponding topic and partition pair. Although the timestamp-based approach doesn't guarantee exactly-once delivery, it minimises the likelihood of message duplication and avoids the need for complex and error-prone offset remapping logic.
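The offset remapping step itself is carried out by the `redpanda_migrator_offsets` output, which the bundle already wires up to a `kafka_franz` input reading the `__consumer_offsets` topic. As a rough sketch, reusing the local broker addresses from the self-hosted examples above, running that leg on its own might look like:

[source,yaml]
----
input:
  kafka_franz:
    seed_brokers: [ "localhost:9092" ]
    topics: [ "__consumer_offsets" ]

output:
  redpanda_migrator_offsets:
    seed_brokers: [ "localhost:9093" ]
    kafka_key: ${! @kafka_key }
    max_in_flight: 1
----

In practice you rarely need to run this pipeline separately, because the `redpanda_migrator_bundle` input and output already include it.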