From a6399814381ba75cd3351cbd9f6c734e5243080d Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 10:17:45 -0400
Subject: [PATCH 1/7] [topicmappr] bootstrap kafkaadmin client

---
 cmd/topicmappr/commands/reassignments.go |  3 ++-
 cmd/topicmappr/commands/rebalance.go     | 12 +++++++++++-
 cmd/topicmappr/commands/rebuild.go       | 11 ++++++++++-
 cmd/topicmappr/commands/rebuild_steps.go |  3 ++-
 cmd/topicmappr/commands/root.go          |  1 +
 cmd/topicmappr/commands/scale.go         | 12 +++++++++++-
 6 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go
index 57528f9..b9777da 100644
--- a/cmd/topicmappr/commands/reassignments.go
+++ b/cmd/topicmappr/commands/reassignments.go
@@ -10,6 +10,7 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 	"github.com/DataDog/kafka-kit/v4/mapper"
 
@@ -80,7 +81,7 @@ func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) {
 	return params
 }
 
-func reassign(params reassignParams, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
+func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
 	// Get broker and partition metadata.
 	if err := checkMetaAge(zk, params.maxMetadataAge); err != nil {
 		fmt.Println(err)
diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go
index 7d5fcad..4d98f4e 100644
--- a/cmd/topicmappr/commands/rebalance.go
+++ b/cmd/topicmappr/commands/rebalance.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
+
 	"github.com/spf13/cobra"
 )
 
@@ -54,7 +56,15 @@ func rebalance(cmd *cobra.Command, _ []string) {
 
 	defer zk.Close()
 
-	partitionMaps, errs := reassign(params, zk)
+	// Init kafkaadmin client.
+	bs := cmd.Parent().Flag("kafka-addr").Value.String()
+	ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+
+	partitionMaps, errs := reassign(params, ka, zk)
 
 	// Handle errors that are possible to be overridden by the user (aka 'WARN'
 	// in topicmappr console output).
diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go
index d8dc5bd..db77820 100644
--- a/cmd/topicmappr/commands/rebuild.go
+++ b/cmd/topicmappr/commands/rebuild.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"regexp"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 
 	"github.com/spf13/cobra"
@@ -150,6 +151,14 @@ func rebuild(cmd *cobra.Command, _ []string) {
 		fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity")
 	}
 
+	// Init kafkaadmin client.
+	bs := cmd.Parent().Flag("kafka-addr").Value.String()
+	ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+
 	// ZooKeeper init.
 	var zk kafkazk.Handler
 	if params.useMetadata || len(params.topics) > 0 || params.placement == "storage" {
@@ -164,7 +173,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
 		defer zk.Close()
 	}
 
-	maps, errs := runRebuild(params, zk)
+	maps, errs := runRebuild(params, ka, zk)
 
 	// Print error/warnings.
 	handleOverridableErrs(cmd, errs)
diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go
index 76e11e9..25952be 100644
--- a/cmd/topicmappr/commands/rebuild_steps.go
+++ b/cmd/topicmappr/commands/rebuild_steps.go
@@ -4,11 +4,12 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 	"github.com/DataDog/kafka-kit/v4/mapper"
 )
 
-func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
+func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
 	// General flow:
 	// 1) A PartitionMap is formed (either unmarshaled from the literal
 	//   map input via --rebuild-map or generated from ZooKeeper Metadata
diff --git a/cmd/topicmappr/commands/root.go b/cmd/topicmappr/commands/root.go
index ed17c02..6bb6a5c 100644
--- a/cmd/topicmappr/commands/root.go
+++ b/cmd/topicmappr/commands/root.go
@@ -23,6 +23,7 @@ func Execute() {
 }
 
 func init() {
+	rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address")
 	rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string")
 	rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
 	rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go
index 9ea1bb8..7e97975 100644
--- a/cmd/topicmappr/commands/scale.go
+++ b/cmd/topicmappr/commands/scale.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"os"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
+
 	"github.com/spf13/cobra"
 )
 
@@ -52,7 +54,15 @@ func scale(cmd *cobra.Command, _ []string) {
 
 	defer zk.Close()
 
-	partitionMaps, _ := reassign(params, zk)
+	// Init kafkaadmin client.
+	bs := cmd.Parent().Flag("kafka-addr").Value.String()
+	ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+
+	partitionMaps, _ := reassign(params, ka, zk)
 
 	// TODO intentionally not handling the one error that can be returned here
 	// right now, but would be better to distinguish errors
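Patch 1 wires the same client bootstrap into rebalance, rebuild, and scale: read the new --kafka-addr persistent flag from the parent command, build a kafkaadmin.Config, and call NewClient, exiting on failure. A minimal sketch of that repeated pattern as a shared helper; the helper name is hypothetical, and the patch itself keeps the block inline in each command:

    package commands

    import (
    	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
    	"github.com/spf13/cobra"
    )

    // newKafkaAdminClient is a hypothetical wrapper around the bootstrap block
    // repeated above: it resolves --kafka-addr from the root command and
    // returns a configured kafkaadmin client.
    func newKafkaAdminClient(cmd *cobra.Command) (kafkaadmin.KafkaAdmin, error) {
    	bs := cmd.Parent().Flag("kafka-addr").Value.String()
    	return kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
    }

Each command could then call this once before handing the client to reassign or runRebuild.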
From 394715024859f06206b0e62c2644c95d835083e8 Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 11:35:23 -0400
Subject: [PATCH 2/7] updated healthchecks

---
 docker-compose.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 0683848..8664700 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -28,7 +28,7 @@ services:
     ports:
       - "2181:2181"
     healthcheck:
-      test: ["CMD", "nc", "-z", "localhost:2181"]
+      test: nc -z localhost 2181 || exit -1
       interval: 5s
       timeout: 5s
       retries: 5
@@ -41,11 +41,11 @@ services:
       - "9092"
      - "9093"
     healthcheck:
-      test: ["CMD", "nc", "-z", "localhost:9092"]
+      test: nc -z localhost 9092 || exit -1
       interval: 5s
       timeout: 5s
       retries: 5
-      start_period: 5s
+      start_period: 15s
     depends_on:
       - ssl_setup
       - zookeeper
@@ -93,7 +93,7 @@ services:
       - "8080"
       - "8090"
     healthcheck:
-      test: ["CMD", "curl", "-f", "localhost:8080/v1/brokers/list"]
+      test: curl -f localhost:8080/v1/brokers/list
       interval: 5s
       timeout: 5s
       retries: 5

From faca532a0433c41b459f56b440041e578a15fac4 Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 11:39:50 -0400
Subject: [PATCH 3/7] [kafkazk] LoadMetrics, GetBrokerMetrics methods

---
 kafkazk/mapper.go    | 30 ++++++++++++++++++++++++++++++
 kafkazk/zookeeper.go |  5 +++--
 2 files changed, 33 insertions(+), 2 deletions(-)
 create mode 100644 kafkazk/mapper.go

diff --git a/kafkazk/mapper.go b/kafkazk/mapper.go
new file mode 100644
index 0000000..1ac6d01
--- /dev/null
+++ b/kafkazk/mapper.go
@@ -0,0 +1,30 @@
+package kafkazk
+
+import (
+	"fmt"
+
+	"github.com/DataDog/kafka-kit/v4/mapper"
+)
+
+// LoadMetrics takes a Handler and fetches stored broker metrics, populating the
+// BrokerMetaMap.
+func LoadMetrics(zk Handler, bm mapper.BrokerMetaMap) []error {
+	metrics, err := zk.GetBrokerMetrics()
+	if err != nil {
+		return []error{err}
+	}
+
+	// Populate each broker with metric data.
+	var errs []error
+	for id := range bm {
+		m, exists := metrics[id]
+		if !exists {
+			errs = append(errs, fmt.Errorf("Metrics not found for broker %d", id))
+			bm[id].MetricsIncomplete = true
+		} else {
+			bm[id].StorageFree = m.StorageFree
+		}
+	}
+
+	return errs
+}
diff --git a/kafkazk/zookeeper.go b/kafkazk/zookeeper.go
index d990ba6..4ee2b5a 100644
--- a/kafkazk/zookeeper.go
+++ b/kafkazk/zookeeper.go
@@ -23,6 +23,7 @@ import (
 // configuration methods.
 type Handler interface {
 	SimpleZooKeeperClient
+	GetBrokerMetrics() (mapper.BrokerMetricsMap, error)
 	GetTopicState(string) (*mapper.TopicState, error)
 	GetTopicStateISR(string) (TopicStateISR, error)
 	UpdateKafkaConfig(KafkaConfig) ([]bool, error)
@@ -409,7 +410,7 @@ func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []
 
 	// Fetch and populate in metrics.
 	if withMetrics {
-		bmetrics, err := z.getBrokerMetrics()
+		bmetrics, err := z.GetBrokerMetrics()
 		if err != nil {
 			return nil, []error{err}
 		}
@@ -432,7 +433,7 @@ func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (mapper.BrokerMetaMap, []
 
 // GetBrokerMetrics fetches broker metrics stored in ZooKeeper and returns a
 // BrokerMetricsMap and an error if encountered.
-func (z *ZKHandler) getBrokerMetrics() (mapper.BrokerMetricsMap, error) {
+func (z *ZKHandler) GetBrokerMetrics() (mapper.BrokerMetricsMap, error) {
 	path := z.getMetricsPath("/brokermetrics")
 
 	// Fetch the metrics object.
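The new kafkazk.LoadMetrics is partial-failure tolerant: brokers with no stored metrics entry are flagged MetricsIncomplete and reported as individual errors instead of aborting the fetch. A minimal sketch of how a caller might consume that, assuming a zk handler and an already-populated BrokerMetaMap are in hand (the helper name and output format are illustrative only):

    package example

    import (
    	"fmt"

    	"github.com/DataDog/kafka-kit/v4/kafkazk"
    	"github.com/DataDog/kafka-kit/v4/mapper"
    )

    // reportStorage merges ZooKeeper-stored metrics into bm, then reports which
    // brokers ended up with complete storage data.
    func reportStorage(zk kafkazk.Handler, bm mapper.BrokerMetaMap) {
    	// One error is returned per broker that had no metrics entry; those
    	// brokers remain in the map, flagged as incomplete.
    	for _, err := range kafkazk.LoadMetrics(zk, bm) {
    		fmt.Println(err)
    	}

    	for id, meta := range bm {
    		if meta.MetricsIncomplete {
    			fmt.Printf("broker %d: metrics incomplete\n", id)
    			continue
    		}
    		fmt.Printf("broker %d: storage free %v\n", id, meta.StorageFree)
    	}
    }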
From a02baf3eca48c3966da7beac07b10d54218b52f9 Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 11:40:14 -0400
Subject: [PATCH 4/7] [topicmappr] fetch broker meta with kafkaadmin

---
 cmd/topicmappr/commands/metadata.go      | 21 +++++++++++++++++++--
 cmd/topicmappr/commands/reassignments.go |  2 +-
 cmd/topicmappr/commands/rebuild_steps.go |  2 +-
 3 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go
index a8feec9..0bd1761 100644
--- a/cmd/topicmappr/commands/metadata.go
+++ b/cmd/topicmappr/commands/metadata.go
@@ -1,10 +1,12 @@
 package commands
 
 import (
+	"context"
 	"fmt"
 	"regexp"
 	"time"
 
+	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 	"github.com/DataDog/kafka-kit/v4/mapper"
 )
@@ -26,8 +28,23 @@ func checkMetaAge(zk kafkazk.Handler, maxAge int) error {
 // getBrokerMeta returns a map of brokers and broker metadata for those
 // registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper
 // (via an external mechanism*) can be merged into the metadata.
-func getBrokerMeta(zk kafkazk.Handler, m bool) (mapper.BrokerMetaMap, []error) {
-	return zk.GetAllBrokerMeta(m)
+func getBrokerMeta(ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler, m bool) (mapper.BrokerMetaMap, []error) {
+	// Get broker states.
+	brokerStates, err := ka.DescribeBrokers(context.Background(), false)
+	if err != nil {
+		return nil, []error{err}
+	}
+
+	brokers, _ := mapper.BrokerMetaMapFromStates(brokerStates)
+
+	// Populate metrics.
+	if m {
+		if errs := kafkazk.LoadMetrics(zk, brokers); errs != nil {
+			return nil, errs
+		}
+	}
+
+	return brokers, nil
 }
 
 // ensureBrokerMetrics takes a map of reference brokers and a map of discovered
diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go
index b9777da..499aae1 100644
--- a/cmd/topicmappr/commands/reassignments.go
+++ b/cmd/topicmappr/commands/reassignments.go
@@ -87,7 +87,7 @@ func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handle
 		fmt.Println(err)
 		os.Exit(1)
 	}
-	brokerMeta, errs := getBrokerMeta(zk, true)
+	brokerMeta, errs := getBrokerMeta(ka, zk, true)
 	if errs != nil && brokerMeta == nil {
 		for _, e := range errs {
 			fmt.Println(e)
diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go
index 25952be..d2f951f 100644
--- a/cmd/topicmappr/commands/rebuild_steps.go
+++ b/cmd/topicmappr/commands/rebuild_steps.go
@@ -48,7 +48,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 	var brokerMeta mapper.BrokerMetaMap
 	var errs []error
 	if params.useMetadata {
-		if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil {
+		if brokerMeta, errs = getBrokerMeta(ka, zk, withMetrics); errs != nil && brokerMeta == nil {
 			for _, e := range errs {
 				fmt.Println(e)
 			}
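With this patch the broker metadata path is split across two sources: broker registration and state now come from the Kafka admin API (DescribeBrokers), while free-storage metrics are still read from ZooKeeper via the LoadMetrics helper added in the previous patch. A condensed sketch of the combined flow, using only the calls shown above, with error handling simplified and the function name hypothetical:

    package example

    import (
    	"context"
    	"fmt"

    	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
    	"github.com/DataDog/kafka-kit/v4/kafkazk"
    	"github.com/DataDog/kafka-kit/v4/mapper"
    )

    // brokerMetaFromCluster mirrors the shape of getBrokerMeta: Kafka provides
    // the broker states, ZooKeeper optionally provides the storage metrics.
    func brokerMetaFromCluster(ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler, withMetrics bool) (mapper.BrokerMetaMap, error) {
    	states, err := ka.DescribeBrokers(context.Background(), false)
    	if err != nil {
    		return nil, fmt.Errorf("describing brokers: %w", err)
    	}

    	brokers, _ := mapper.BrokerMetaMapFromStates(states)

    	if withMetrics {
    		// Metrics remain an externally populated ZooKeeper input.
    		for _, e := range kafkazk.LoadMetrics(zk, brokers) {
    			fmt.Println(e)
    		}
    	}

    	return brokers, nil
    }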
From 782c30a3a61563813dd09ef090a9bfeab9a2f4d6 Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 11:48:08 -0400
Subject: [PATCH 5/7] [topicmappr] drops automatic pending delete checks

---
 cmd/topicmappr/commands/metadata.go      | 26 ++----------------------
 cmd/topicmappr/commands/reassignments.go |  8 +-------
 cmd/topicmappr/commands/rebuild_steps.go | 12 +++--------
 3 files changed, 6 insertions(+), 40 deletions(-)

diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go
index 0bd1761..697f4a4 100644
--- a/cmd/topicmappr/commands/metadata.go
+++ b/cmd/topicmappr/commands/metadata.go
@@ -26,8 +26,8 @@ func checkMetaAge(zk kafkazk.Handler, maxAge int) error {
 }
 
 // getBrokerMeta returns a map of brokers and broker metadata for those
-// registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper
-// (via an external mechanism*) can be merged into the metadata.
+// registered in the cluster state. Optionally, broker metrics can be populated
+// via ZooKeeper.
 func getBrokerMeta(ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler, m bool) (mapper.BrokerMetaMap, []error) {
 	// Get broker states.
 	brokerStates, err := ka.DescribeBrokers(context.Background(), false)
@@ -68,28 +68,6 @@ func getPartitionMeta(zk kafkazk.Handler) (mapper.PartitionMetaMap, error) {
 	return zk.GetAllPartitionMeta()
 }
 
-// stripPendingDeletes takes a partition map and zk handler. It looks up any
-// topics in a pending delete state and removes them from the provided partition
-// map, returning a list of topics removed.
-func stripPendingDeletes(pm *mapper.PartitionMap, zk kafkazk.Handler) ([]string, error) {
-	// Get pending deletions.
-	pd, err := zk.GetPendingDeletion()
-
-	if len(pd) == 0 {
-		return []string{}, err
-	}
-
-	// Convert to a series of literal regex.
-	var re []*regexp.Regexp
-	for _, topic := range pd {
-		r := regexp.MustCompile(fmt.Sprintf(`^%s$`, topic))
-		re = append(re, r)
-	}
-
-	// Update the PartitionMap and return a list of removed topic names.
-	return removeTopics(pm, re), err
-}
-
 // removeTopics takes a PartitionMap and []*regexp.Regexp of topic name patters.
 // Any topic names that match any provided pattern will be removed from the
 // PartitionMap and a []string of topics that were found and removed is returned.
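With the pending-delete lookup gone, the only topic filtering left on this path is the explicit --topics-exclude handling via removeTopics. The removed stripPendingDeletes shows the matching style in use: each topic name becomes an anchored, literal regex. A short sketch of applying that same treatment to an arbitrary name list; the helper name is hypothetical and it would have to sit alongside removeTopics in the commands package:

    package commands

    import (
    	"fmt"
    	"regexp"

    	"github.com/DataDog/kafka-kit/v4/mapper"
    )

    // excludeTopicsByName strips the named topics from pm, reusing removeTopics
    // with one anchored regex per name, exactly as the removed helper did.
    func excludeTopicsByName(pm *mapper.PartitionMap, names []string) []string {
    	var re []*regexp.Regexp
    	for _, topic := range names {
    		re = append(re, regexp.MustCompile(fmt.Sprintf(`^%s$`, topic)))
    	}

    	// removeTopics mutates pm and returns the topics it actually removed.
    	return removeTopics(pm, re)
    }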
diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go
index 499aae1..4429f4f 100644
--- a/cmd/topicmappr/commands/reassignments.go
+++ b/cmd/topicmappr/commands/reassignments.go
@@ -107,12 +107,6 @@ func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handle
 		os.Exit(1)
 	}
 
-	// Exclude any topics that are pending deletion.
-	pending, err := stripPendingDeletes(partitionMapIn, zk)
-	if err != nil {
-		fmt.Println("Error fetching topics pending deletion")
-	}
-
 	// Exclude any explicit exclusions.
 	excluded := removeTopics(partitionMapIn, params.topicsExclude)
 
@@ -120,7 +114,7 @@ func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handle
 	printTopics(partitionMapIn)
 
 	// Print if any topics were excluded due to pending deletion.
-	printExcludedTopics(pending, excluded)
+	printExcludedTopics(nil, excluded)
 
 	// Get a broker map.
 	brokersIn := mapper.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false)
diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go
index d2f951f..b8fc71c 100644
--- a/cmd/topicmappr/commands/rebuild_steps.go
+++ b/cmd/topicmappr/commands/rebuild_steps.go
@@ -67,7 +67,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 
 	// Build a partition map either from literal map text input or by fetching the
 	// map data from ZooKeeper. Store a copy of the original.
-	partitionMapIn, pending, excluded := getPartitionMap(params, zk)
+	partitionMapIn, _, excluded := getPartitionMap(params, zk)
 	originalMap := partitionMapIn.Copy()
 
 	// Get a list of affected topics.
@@ -75,7 +75,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 
 	// Print if any topics were excluded due to pending deletion or explicit
 	// exclusion.
-	printExcludedTopics(pending, excluded)
+	printExcludedTopics(nil, excluded)
 
 	brokers, bs := getBrokers(params, partitionMapIn, brokerMeta)
 	brokersOrig := brokers.Copy()
@@ -198,16 +198,10 @@ func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*mapper.Partitio
 		os.Exit(1)
 	}
 
-	// Exclude any topics that are pending deletion.
-	pd, err := stripPendingDeletes(pm, zk)
-	if err != nil {
-		fmt.Println("Error fetching topics pending deletion")
-	}
-
 	// Exclude topics explicitly listed.
 	et := removeTopics(pm, params.topicsExclude)
 
-		return pm, pd, et
+		return pm, nil, et
 	}
 
 	return nil, nil, nil
From c731f6b437aab82b148a6f0d8f1bdd4454057a1c Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 12:19:58 -0400
Subject: [PATCH 6/7] [topicmappr] fetch topic states via kafkaadmin

---
 cmd/topicmappr/commands/metadata.go      | 15 +++++++++++++++
 cmd/topicmappr/commands/reassignments.go |  6 +++---
 cmd/topicmappr/commands/rebuild.go       |  5 +++--
 cmd/topicmappr/commands/rebuild_steps.go |  6 +++---
 4 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go
index 697f4a4..5b62205 100644
--- a/cmd/topicmappr/commands/metadata.go
+++ b/cmd/topicmappr/commands/metadata.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"regexp"
+	"sort"
 	"time"
 
 	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 	"github.com/DataDog/kafka-kit/v4/mapper"
 )
@@ -47,6 +48,20 @@ func getBrokerMeta(ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler, m bool) (mapper
 	return brokers, nil
 }
 
+func getPartitionMaps(ka kafkaadmin.KafkaAdmin, topics []string) (*mapper.PartitionMap, error) {
+	// Get the topic states.
+	tState, err := ka.DescribeTopics(context.Background(), topics)
+	if err != nil {
+		return nil, err
+	}
+
+	// Translate it to a mapper object.
+	pm, _ := mapper.PartitionMapFromTopicStates(tState)
+	sort.Sort(pm.Partitions)
+
+	return pm, nil
+}
+
 // ensureBrokerMetrics takes a map of reference brokers and a map of discovered
 // broker metadata. Any non-missing brokers in the broker map must be present
 // in the broker metadata map and have a non-true MetricsIncomplete value.
diff --git a/cmd/topicmappr/commands/reassignments.go b/cmd/topicmappr/commands/reassignments.go
index 4429f4f..c498420 100644
--- a/cmd/topicmappr/commands/reassignments.go
+++ b/cmd/topicmappr/commands/reassignments.go
@@ -45,7 +45,7 @@ type reassignParams struct {
 	storageThreshold   float64
 	storageThresholdGB float64
 	tolerance          float64
-	topics             []*regexp.Regexp
+	topics             []string
 	topicsExclude      []*regexp.Regexp
 	requireNewBrokers  bool
 	verbose            bool
@@ -73,7 +73,7 @@ func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) {
 	tolerance, _ := cmd.Flags().GetFloat64("tolerance")
 	params.tolerance = tolerance
 	topics, _ := cmd.Flags().GetString("topics")
-	params.topics = topicRegex(topics)
+	params.topics = strings.Split(topics, ",")
 	topicsExclude, _ := cmd.Flags().GetString("topics-exclude")
 	params.topicsExclude = topicRegex(topicsExclude)
 	verbose, _ := cmd.Flags().GetBool("verbose")
@@ -101,7 +101,7 @@ func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handle
 	}
 
 	// Get the current partition map.
-	partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk)
+	partitionMapIn, err := getPartitionMaps(ka, params.topics)
 	if err != nil {
 		fmt.Println(err)
 		os.Exit(1)
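Two things change together here: --topics is now parsed as a comma-delimited list of literal topic names (strings.Split) instead of being compiled into regexes, and the current partition map is built from the admin API's topic states rather than ZooKeeper. A standalone sketch of that read path, using only calls introduced in this patch; the helper name is hypothetical:

    package example

    import (
    	"context"
    	"fmt"
    	"sort"
    	"strings"

    	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
    	"github.com/DataDog/kafka-kit/v4/mapper"
    )

    // currentMapForTopics fetches topic states for a comma-delimited topic list
    // and translates them into a sorted mapper.PartitionMap, as getPartitionMaps
    // does above.
    func currentMapForTopics(ka kafkaadmin.KafkaAdmin, topicsFlag string) (*mapper.PartitionMap, error) {
    	topics := strings.Split(topicsFlag, ",")

    	tState, err := ka.DescribeTopics(context.Background(), topics)
    	if err != nil {
    		return nil, fmt.Errorf("describing topics: %w", err)
    	}

    	pm, _ := mapper.PartitionMapFromTopicStates(tState)
    	sort.Sort(pm.Partitions)

    	return pm, nil
    }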
diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go
index db77820..04b2484 100644
--- a/cmd/topicmappr/commands/rebuild.go
+++ b/cmd/topicmappr/commands/rebuild.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"regexp"
+	"strings"
 
 	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
 	"github.com/DataDog/kafka-kit/v4/kafkazk"
 
 	"github.com/spf13/cobra"
@@ -66,7 +67,7 @@ type rebuildParams struct {
 	replication      int
 	skipNoOps        bool
 	subAffinity      bool
-	topics           []*regexp.Regexp
+	topics           []string
 	topicsExclude    []*regexp.Regexp
 	useMetadata      bool
 	leaderEvacTopics []*regexp.Regexp
@@ -102,7 +103,7 @@ func rebuildParamsFromCmd(cmd *cobra.Command) (params rebuildParams) {
 	subAffinity, _ := cmd.Flags().GetBool("sub-affinity")
 	params.subAffinity = subAffinity
 	topics, _ := cmd.Flags().GetString("topics")
-	params.topics = topicRegex(topics)
+	params.topics = strings.Split(topics, ",")
 	topicsExclude, _ := cmd.Flags().GetString("topics-exclude")
 	params.topicsExclude = topicRegex(topicsExclude)
 	useMetadata, _ := cmd.Flags().GetBool("use-meta")
diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go
index b8fc71c..b805002 100644
--- a/cmd/topicmappr/commands/rebuild_steps.go
+++ b/cmd/topicmappr/commands/rebuild_steps.go
@@ -67,7 +67,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 
 	// Build a partition map either from literal map text input or by fetching the
 	// map data from ZooKeeper. Store a copy of the original.
-	partitionMapIn, _, excluded := getPartitionMap(params, zk)
+	partitionMapIn, _, excluded := getPartitionMap(params, ka, zk)
 	originalMap := partitionMapIn.Copy()
 
 	// Get a list of affected topics.
@@ -176,7 +176,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 // via the --topics flag. Two []string are returned; topics excluded due to
 // pending deletion and topics explicitly excluded (via the --topics-exclude
 // flag), respectively.
-func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*mapper.PartitionMap, []string, []string) {
+func getPartitionMap(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) (*mapper.PartitionMap, []string, []string) {
 	switch {
 
 	// The map was provided as text.
@@ -192,7 +192,7 @@ func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*mapper.Partitio
 		}
 		return pm, []string{}, et
 
 	// The map needs to be fetched via ZooKeeper metadata for all specified topics.
 	case len(params.topics) > 0:
-		pm, err := kafkazk.PartitionMapFromZK(params.topics, zk)
+		pm, err := getPartitionMaps(ka, params.topics)
 		if err != nil {
 			fmt.Println(err)
 			os.Exit(1)
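One property of the comma-split parsing used for --topics above (and applied to --leader-evac-topics in the next patch, which also drops its empty-string guard) is that splitting an empty flag value does not produce an empty slice. A small illustration of the strings.Split behavior in play, for reference while reading the length checks in these commands:

    package main

    import (
    	"fmt"
    	"strings"
    )

    func main() {
    	fmt.Println(len(strings.Split("", ",")))       // 1: a single empty-string element
    	fmt.Println(strings.Split("t1,t2", ","))       // [t1 t2]
    	fmt.Println(len(strings.Split("t1,t2,", ","))) // 3: a trailing comma adds an empty element
    }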
From b0f99f3d18488d1ae10411d569b8bfaa9aa15e75 Mon Sep 17 00:00:00 2001
From: Jamie Alquiza
Date: Wed, 3 Aug 2022 12:31:56 -0400
Subject: [PATCH 7/7] [topicmappr] leader evac topics uses kafkaadmin

---
 cmd/topicmappr/commands/rebuild.go       | 6 ++----
 cmd/topicmappr/commands/rebuild_steps.go | 9 ++++++---
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go
index 04b2484..ccb1eca 100644
--- a/cmd/topicmappr/commands/rebuild.go
+++ b/cmd/topicmappr/commands/rebuild.go
@@ -70,7 +70,7 @@ type rebuildParams struct {
 	topics            []string
 	topicsExclude     []*regexp.Regexp
 	useMetadata       bool
-	leaderEvacTopics  []*regexp.Regexp
+	leaderEvacTopics  []string
 	leaderEvacBrokers []int
 	chunkStepSize     int
 }
@@ -111,9 +111,7 @@ func rebuildParamsFromCmd(cmd *cobra.Command) (params rebuildParams) {
 	chunkStepSize, _ := cmd.Flags().GetInt("chunk-step-size")
 	params.chunkStepSize = chunkStepSize
 	let, _ := cmd.Flags().GetString("leader-evac-topics")
-	if let != "" {
-		params.leaderEvacTopics = topicRegex(let)
-	}
+	params.leaderEvacTopics = strings.Split(let, ",")
 	leb, _ := cmd.Flags().GetString("leader-evac-brokers")
 	if leb != "" {
 		params.leaderEvacBrokers = brokerStringToSlice(leb)
diff --git a/cmd/topicmappr/commands/rebuild_steps.go b/cmd/topicmappr/commands/rebuild_steps.go
index b805002..a01ade5 100644
--- a/cmd/topicmappr/commands/rebuild_steps.go
+++ b/cmd/topicmappr/commands/rebuild_steps.go
@@ -1,6 +1,7 @@
 package commands
 
 import (
+	"context"
 	"fmt"
 	"os"
 
@@ -28,11 +29,13 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 	var evacTopics []string
 	var err error
 	if len(params.leaderEvacTopics) != 0 {
-		evacTopics, err = zk.GetTopics(params.leaderEvacTopics)
+		tState, err := ka.DescribeTopics(context.Background(), params.leaderEvacTopics)
 		if err != nil {
 			fmt.Println(err)
 			os.Exit(1)
 		}
+		evacTopics = tState.List()
+
 	}
 
 	// Fetch broker metadata.
@@ -67,7 +70,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 
 	// Build a partition map either from literal map text input or by fetching the
 	// map data from ZooKeeper. Store a copy of the original.
-	partitionMapIn, _, excluded := getPartitionMap(params, ka, zk)
+	partitionMapIn, _, excluded := getPartitionMap(params, ka)
 	originalMap := partitionMapIn.Copy()
 
 	// Get a list of affected topics.
@@ -176,7 +179,7 @@ func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handl
 // via the --topics flag. Two []string are returned; topics excluded due to
 // pending deletion and topics explicitly excluded (via the --topics-exclude
 // flag), respectively.
-func getPartitionMap(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) (*mapper.PartitionMap, []string, []string) {
+func getPartitionMap(params rebuildParams, ka kafkaadmin.KafkaAdmin) (*mapper.PartitionMap, []string, []string) {
 	switch {
 
 	// The map was provided as text.
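For leader evacuation, the requested topic names are now resolved against the live cluster: DescribeTopics is called with the parsed --leader-evac-topics list and the names present in the returned state (via its List method) become the evacuation set. A brief sketch of that lookup in isolation; the helper name is hypothetical:

    package example

    import (
    	"context"

    	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
    )

    // evacTopicNames resolves the requested topics via the admin API and returns
    // the topic names found in the described state.
    func evacTopicNames(ka kafkaadmin.KafkaAdmin, requested []string) ([]string, error) {
    	tState, err := ka.DescribeTopics(context.Background(), requested)
    	if err != nil {
    		return nil, err
    	}

    	return tState.List(), nil
    }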