Skip to content

Commit

Permalink
Merge pull request #412 from DataDog/jamie/topicmappr-zkless
Browse files Browse the repository at this point in the history
Jamie/topicmappr zkless
  • Loading branch information
jamiealquiza committed Aug 12, 2022
2 parents fcf7f57 + b0f99f3 commit 63c3061
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 67 deletions.
62 changes: 36 additions & 26 deletions cmd/topicmappr/commands/metadata.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package commands

import (
"context"
"fmt"
"regexp"
"sort"
"time"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"
"github.com/DataDog/kafka-kit/v4/mapper"
)
Expand All @@ -24,10 +27,39 @@ func checkMetaAge(zk kafkazk.Handler, maxAge int) error {
}

// getBrokerMeta returns a map of brokers and broker metadata for those
// registered in ZooKeeper. Optionally, metrics metadata persisted in ZooKeeper
// (via an external mechanism*) can be merged into the metadata.
func getBrokerMeta(zk kafkazk.Handler, m bool) (mapper.BrokerMetaMap, []error) {
return zk.GetAllBrokerMeta(m)
// registered in the cluster state. Optionally, broker metrics can be populated
// via ZooKeeper.
func getBrokerMeta(ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler, m bool) (mapper.BrokerMetaMap, []error) {
	// Fetch the live broker states from the cluster.
	states, err := ka.DescribeBrokers(context.Background(), false)
	if err != nil {
		return nil, []error{err}
	}

	// Translate the states into a mapper object.
	// NOTE(review): the second return value is discarded here — confirm it is
	// safe to ignore.
	bm, _ := mapper.BrokerMetaMapFromStates(states)

	// Without the metrics flag, the translated map is returned as-is.
	if !m {
		return bm, nil
	}

	// Merge in broker metrics persisted in ZooKeeper.
	if errs := kafkazk.LoadMetrics(zk, bm); errs != nil {
		return nil, errs
	}

	return bm, nil
}

// getPartitionMaps returns a *mapper.PartitionMap describing the current
// state of the requested topics, with partitions sorted for deterministic
// output. Topic state is fetched from the cluster via the Kafka admin client.
func getPartitionMaps(ka kafkaadmin.KafkaAdmin, topics []string) (*mapper.PartitionMap, error) {
	// Get the topic states.
	tState, err := ka.DescribeTopics(context.Background(), topics)
	if err != nil {
		return nil, err
	}

	// Translate it to a mapper object. Propagate translation errors instead
	// of silently discarding them.
	pm, err := mapper.PartitionMapFromTopicStates(tState)
	if err != nil {
		return nil, err
	}

	// Sort so that output ordering is stable across runs.
	sort.Sort(pm.Partitions)

	return pm, nil
}

// ensureBrokerMetrics takes a map of reference brokers and a map of discovered
Expand All @@ -51,28 +83,6 @@ func getPartitionMeta(zk kafkazk.Handler) (mapper.PartitionMetaMap, error) {
return zk.GetAllPartitionMeta()
}

// stripPendingDeletes takes a partition map and zk handler. It looks up any
// topics in a pending delete state and removes them from the provided partition
// map, returning a list of topics removed. Any error from the pending-deletion
// lookup is returned alongside the (possibly empty) results.
func stripPendingDeletes(pm *mapper.PartitionMap, zk kafkazk.Handler) ([]string, error) {
	// Get pending deletions.
	pd, err := zk.GetPendingDeletion()

	if len(pd) == 0 {
		return []string{}, err
	}

	// Convert each topic name to an anchored, literal regex. QuoteMeta escapes
	// regex metacharacters in the topic name (e.g. '.') so that only exact
	// name matches are removed, rather than treating the name as a pattern.
	var re []*regexp.Regexp
	for _, topic := range pd {
		r := regexp.MustCompile(fmt.Sprintf(`^%s$`, regexp.QuoteMeta(topic)))
		re = append(re, r)
	}

	// Update the PartitionMap and return a list of removed topic names.
	return removeTopics(pm, re), err
}

// removeTopics takes a PartitionMap and []*regexp.Regexp of topic name patterns.
// Any topic names that match any provided pattern will be removed from the
// PartitionMap and a []string of topics that were found and removed is returned.
Expand Down
19 changes: 7 additions & 12 deletions cmd/topicmappr/commands/reassignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"
"github.com/DataDog/kafka-kit/v4/mapper"

Expand Down Expand Up @@ -44,7 +45,7 @@ type reassignParams struct {
storageThreshold float64
storageThresholdGB float64
tolerance float64
topics []*regexp.Regexp
topics []string
topicsExclude []*regexp.Regexp
requireNewBrokers bool
verbose bool
Expand Down Expand Up @@ -72,21 +73,21 @@ func reassignParamsFromCmd(cmd *cobra.Command) (params reassignParams) {
tolerance, _ := cmd.Flags().GetFloat64("tolerance")
params.tolerance = tolerance
topics, _ := cmd.Flags().GetString("topics")
params.topics = topicRegex(topics)
params.topics = strings.Split(topics, ",")
topicsExclude, _ := cmd.Flags().GetString("topics-exclude")
params.topicsExclude = topicRegex(topicsExclude)
verbose, _ := cmd.Flags().GetBool("verbose")
params.verbose = verbose
return params
}

func reassign(params reassignParams, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
func reassign(params reassignParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
// Get broker and partition metadata.
if err := checkMetaAge(zk, params.maxMetadataAge); err != nil {
fmt.Println(err)
os.Exit(1)
}
brokerMeta, errs := getBrokerMeta(zk, true)
brokerMeta, errs := getBrokerMeta(ka, zk, true)
if errs != nil && brokerMeta == nil {
for _, e := range errs {
fmt.Println(e)
Expand All @@ -100,26 +101,20 @@ func reassign(params reassignParams, zk kafkazk.Handler) ([]*mapper.PartitionMap
}

// Get the current partition map.
partitionMapIn, err := kafkazk.PartitionMapFromZK(params.topics, zk)
partitionMapIn, err := getPartitionMaps(ka, params.topics)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// Exclude any topics that are pending deletion.
pending, err := stripPendingDeletes(partitionMapIn, zk)
if err != nil {
fmt.Println("Error fetching topics pending deletion")
}

// Exclude any explicit exclusions.
excluded := removeTopics(partitionMapIn, params.topicsExclude)

// Print topics matched to input params.
printTopics(partitionMapIn)

// Print if any topics were excluded due to pending deletion.
printExcludedTopics(pending, excluded)
printExcludedTopics(nil, excluded)

// Get a broker map.
brokersIn := mapper.BrokerMapFromPartitionMap(partitionMapIn, brokerMeta, false)
Expand Down
12 changes: 11 additions & 1 deletion cmd/topicmappr/commands/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -54,7 +56,15 @@ func rebalance(cmd *cobra.Command, _ []string) {

defer zk.Close()

partitionMaps, errs := reassign(params, zk)
// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
if err != nil {
fmt.Println(err)
os.Exit(1)
}

partitionMaps, errs := reassign(params, ka, zk)

// Handle errors that are possible to be overridden by the user (aka 'WARN'
// in topicmappr console output).
Expand Down
22 changes: 15 additions & 7 deletions cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"fmt"
"os"
"regexp"
"strings"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -65,10 +67,10 @@ type rebuildParams struct {
replication int
skipNoOps bool
subAffinity bool
topics []*regexp.Regexp
topics []string
topicsExclude []*regexp.Regexp
useMetadata bool
leaderEvacTopics []*regexp.Regexp
leaderEvacTopics []string
leaderEvacBrokers []int
chunkStepSize int
}
Expand Down Expand Up @@ -101,17 +103,15 @@ func rebuildParamsFromCmd(cmd *cobra.Command) (params rebuildParams) {
subAffinity, _ := cmd.Flags().GetBool("sub-affinity")
params.subAffinity = subAffinity
topics, _ := cmd.Flags().GetString("topics")
params.topics = topicRegex(topics)
params.topics = strings.Split(topics, ",")
topicsExclude, _ := cmd.Flags().GetString("topics-exclude")
params.topicsExclude = topicRegex(topicsExclude)
useMetadata, _ := cmd.Flags().GetBool("use-meta")
params.useMetadata = useMetadata
chunkStepSize, _ := cmd.Flags().GetInt("chunk-step-size")
params.chunkStepSize = chunkStepSize
let, _ := cmd.Flags().GetString("leader-evac-topics")
if let != "" {
params.leaderEvacTopics = topicRegex(let)
}
params.leaderEvacTopics = strings.Split(let, ",")
leb, _ := cmd.Flags().GetString("leader-evac-brokers")
if leb != "" {
params.leaderEvacBrokers = brokerStringToSlice(leb)
Expand Down Expand Up @@ -150,6 +150,14 @@ func rebuild(cmd *cobra.Command, _ []string) {
fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity")
}

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// ZooKeeper init.
var zk kafkazk.Handler
if params.useMetadata || len(params.topics) > 0 || params.placement == "storage" {
Expand All @@ -164,7 +172,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
defer zk.Close()
}

maps, errs := runRebuild(params, zk)
maps, errs := runRebuild(params, ka, zk)

// Print error/warnings.
handleOverridableErrs(cmd, errs)
Expand Down
26 changes: 12 additions & 14 deletions cmd/topicmappr/commands/rebuild_steps.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package commands

import (
"context"
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"
"github.com/DataDog/kafka-kit/v4/mapper"
)

func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
func runRebuild(params rebuildParams, ka kafkaadmin.KafkaAdmin, zk kafkazk.Handler) ([]*mapper.PartitionMap, []error) {
// General flow:
// 1) A PartitionMap is formed (either unmarshaled from the literal
// map input via --rebuild-map or generated from ZooKeeper Metadata
Expand All @@ -27,11 +29,13 @@ func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMa
var evacTopics []string
var err error
if len(params.leaderEvacTopics) != 0 {
evacTopics, err = zk.GetTopics(params.leaderEvacTopics)
tState, err := ka.DescribeTopics(context.Background(), params.leaderEvacTopics)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
evacTopics = tState.List()

}

// Fetch broker metadata.
Expand All @@ -47,7 +51,7 @@ func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMa
var brokerMeta mapper.BrokerMetaMap
var errs []error
if params.useMetadata {
if brokerMeta, errs = getBrokerMeta(zk, withMetrics); errs != nil && brokerMeta == nil {
if brokerMeta, errs = getBrokerMeta(ka, zk, withMetrics); errs != nil && brokerMeta == nil {
for _, e := range errs {
fmt.Println(e)
}
Expand All @@ -66,15 +70,15 @@ func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMa

// Build a partition map either from literal map text input or by fetching the
// map data from the cluster via the Kafka admin client. Store a copy of the original.
partitionMapIn, pending, excluded := getPartitionMap(params, zk)
partitionMapIn, _, excluded := getPartitionMap(params, ka)
originalMap := partitionMapIn.Copy()

// Get a list of affected topics.
printTopics(partitionMapIn)

// Print if any topics were excluded due to pending deletion or explicit
// exclusion.
printExcludedTopics(pending, excluded)
printExcludedTopics(nil, excluded)

brokers, bs := getBrokers(params, partitionMapIn, brokerMeta)
brokersOrig := brokers.Copy()
Expand Down Expand Up @@ -175,7 +179,7 @@ func runRebuild(params rebuildParams, zk kafkazk.Handler) ([]*mapper.PartitionMa
// via the --topics flag. Two []string are returned; topics excluded due to
// pending deletion and topics explicitly excluded (via the --topics-exclude
// flag), respectively.
func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*mapper.PartitionMap, []string, []string) {
func getPartitionMap(params rebuildParams, ka kafkaadmin.KafkaAdmin) (*mapper.PartitionMap, []string, []string) {

switch {
// The map was provided as text.
Expand All @@ -191,22 +195,16 @@ func getPartitionMap(params rebuildParams, zk kafkazk.Handler) (*mapper.Partitio
return pm, []string{}, et
// The map needs to be fetched via the Kafka admin client for all specified topics.
case len(params.topics) > 0:
pm, err := kafkazk.PartitionMapFromZK(params.topics, zk)
pm, err := getPartitionMaps(ka, params.topics)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// Exclude any topics that are pending deletion.
pd, err := stripPendingDeletes(pm, zk)
if err != nil {
fmt.Println("Error fetching topics pending deletion")
}

// Exclude topics explicitly listed.
et := removeTopics(pm, params.topicsExclude)

return pm, pd, et
return pm, nil, et
}

return nil, nil, nil
Expand Down
1 change: 1 addition & 0 deletions cmd/topicmappr/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Execute() {
}

func init() {
rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address")
rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string")
rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
Expand Down
12 changes: 11 additions & 1 deletion cmd/topicmappr/commands/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -52,7 +54,15 @@ func scale(cmd *cobra.Command, _ []string) {

defer zk.Close()

partitionMaps, _ := reassign(params, zk)
// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
if err != nil {
fmt.Println(err)
os.Exit(1)
}

partitionMaps, _ := reassign(params, ka, zk)

// TODO intentionally not handling the one error that can be returned here
// right now, but would be better to distinguish errors
Expand Down
Loading

0 comments on commit 63c3061

Please sign in to comment.