Merge pull request #189 from DataDog/jamie/cmd
Jamie/cmd
jamiealquiza committed Oct 24, 2018
2 parents ece0ffe + b12acf4 commit de0ca17
Showing 10 changed files with 394 additions and 297 deletions.
3 changes: 1 addition & 2 deletions cmd/autothrottle/README.md
@@ -14,8 +14,7 @@ Autothrottle is designed to work as a piggyback system that doesn't take ownersh
- Ability to dynamically set fixed replication rates (via the HTTP API)

# Installation
-- `go get github.com/DataDog/kafka-kit`
-- `go install github.com/DataDog/kafka-kit/cmd/autothrottle`
+- `go get -u github.com/DataDog/kafka-kit/...`

Binary will be found at `$GOPATH/bin/autothrottle`

3 changes: 1 addition & 2 deletions cmd/metricsfetcher/README.md
@@ -3,8 +3,7 @@
Metricsfetcher is a simple tool that fetches Kafka broker and partition metrics from the Datadog API and stores them in ZooKeeper. This data is used for the topicmappr [storage placement](https://github.com/DataDog/kafka-kit/tree/master/cmd/topicmappr#placement-strategy) strategy.

# Installation
-- `go get github.com/DataDog/kafka-kit`
-- `go install github.com/DataDog/kafka-kit/cmd/metricsfetcher`
+- `go get -u github.com/DataDog/kafka-kit/...`

Binary will be found at `$GOPATH/bin/metricsfetcher`

90 changes: 55 additions & 35 deletions cmd/topicmappr/README.md
@@ -25,51 +25,71 @@ An output of what's changed along with advisory notices (e.g. insufficient broke
Additional statistical output is included where available. For instance, broker-to-broker relationships are represented as node degree counts (where edges are defined as brokers that belong in a common replica set for any given partition). These values can be used as a probabilistic indicator of replication bandwidth; replacing a broker with more edges will likely replicate from more source brokers than one with fewer edges.
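
The degree counts described above can be computed directly from replica sets. A minimal sketch (broker IDs and replica sets are made up for illustration; this is not topicmappr's internal API):

```go
package main

import "fmt"

// degrees counts, for each broker, how many distinct peer brokers it
// shares at least one replica set with — i.e. its edge count in the
// broker graph described above.
func degrees(replicaSets [][]int) map[int]int {
	adj := make(map[int]map[int]struct{})
	for _, rs := range replicaSets {
		for _, a := range rs {
			for _, b := range rs {
				if a == b {
					continue
				}
				if adj[a] == nil {
					adj[a] = make(map[int]struct{})
				}
				adj[a][b] = struct{}{}
			}
		}
	}

	d := make(map[int]int, len(adj))
	for id, peers := range adj {
		d[id] = len(peers)
	}
	return d
}

func main() {
	// Hypothetical replica sets for three partitions.
	sets := [][]int{{1001, 1002}, {1001, 1003}, {1002, 1003}}
	fmt.Println(degrees(sets)) // each broker ends up with degree 2
}
```

A broker with a higher degree would, if replaced, likely pull replication traffic from more distinct source brokers.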

# Installation
-- `go get github.com/DataDog/kafka-kit`
-- `go install github.com/DataDog/kafka-kit/cmd/topicmappr`
+- `go get -u github.com/DataDog/kafka-kit/...`

Binary will be found at `$GOPATH/bin/topicmappr`

**Compatibility**

-Tested with Go 1.10 (required), Kafka 0.10.x, ZooKeeper 3.4.x.
+Tested with Go 1.10+ (required), Kafka 0.10.x, ZooKeeper 3.4.x.

# Usage

-## Flags
+## Commands

Currently, all topicmappr actions are performed through the `rebuild-topics` command.

```
Usage:
  topicmappr [command]

Available Commands:
  help            Help about any command
  rebuild-topics  Build a partition map for one or more topics

Flags:
  -h, --help               help for topicmappr
      --zk-addr string     ZooKeeper connect string (for broker metadata or rebuild-topic lookups) (default "localhost:2181")
      --zk-prefix string   ZooKeeper namespace prefix (for Kafka brokers)

Use "topicmappr [command] --help" for more information about a command.
```



## rebuild-topics usage

```
-Usage of topicmappr:
-  -brokers string
-        Broker list to scope all partition placements to [TOPICMAPPR_BROKERS]
-  -force-rebuild
-        Forces a complete map rebuild [TOPICMAPPR_FORCE_REBUILD]
-  -ignore-warns
-        Produce a map even if warnings are encountered [TOPICMAPPR_IGNORE_WARNS]
-  -optimize string
-        Optimization priority for the storage placement strategy: [distribution, storage] [TOPICMAPPR_OPTIMIZE] (default "distribution")
-  -out-file string
-        If defined, write a combined map of all topics to a file [TOPICMAPPR_OUT_FILE]
-  -out-path string
-        Path to write output map files to [TOPICMAPPR_OUT_PATH]
-  -placement string
-        Partition placement strategy: [count, storage] [TOPICMAPPR_PLACEMENT] (default "count")
-  -rebuild-map string
-        Rebuild a partition map provided as a string literal [TOPICMAPPR_REBUILD_MAP]
-  -rebuild-topics string
-        Rebuild topics (comma delim. list) by lookup in ZooKeeper [TOPICMAPPR_REBUILD_TOPICS]
-  -replication int
-        Normalize the topic replication factor across all replica sets [TOPICMAPPR_REPLICATION]
-  -sub-affinity
-        Replacement broker substitution affinity [TOPICMAPPR_SUB_AFFINITY]
-  -use-meta
-        Use broker metadata in placement constraints [TOPICMAPPR_USE_META] (default true)
-  -zk-addr string
-        ZooKeeper connect string (for broker metadata or rebuild-topic lookups) [TOPICMAPPR_ZK_ADDR] (default "localhost:2181")
-  -zk-metrics-prefix string
-        ZooKeeper namespace prefix (for Kafka metrics) [TOPICMAPPR_ZK_METRICS_PREFIX] (default "topicmappr")
-  -zk-prefix string
-        ZooKeeper namespace prefix (for Kafka brokers) [TOPICMAPPR_ZK_PREFIX]
+rebuild-topics requires at least two inputs: a reference of
+target topics and a list of broker IDs to which those topics should be mapped.
+Target topics are provided as a comma delimited list of topic names and/or regex patterns
+via the --topics parameter, which discovers matching topics in ZooKeeper (additionally,
+the --zk-addr and --zk-prefix global flags should be set). Alternatively, a JSON map can be
+provided via the --map-string flag. Target broker IDs are provided via the --brokers flag.
+
+Usage:
+  topicmappr rebuild-topics [flags]
+
+Flags:
+      --brokers string                Broker list to scope all partition placements to
+      --force-rebuild                 Forces a complete map rebuild
+  -h, --help                          help for rebuild-topics
+      --ignore-warns                  Produce a map even if warnings are encountered
+      --map-string string             Rebuild a partition map provided as a string literal
+      --optimize string               Optimization priority for the storage placement strategy: [distribution, storage] (default "distribution")
+      --out-file string               If defined, write a combined map of all topics to a file
+      --out-path string               Path to write output map files to
+      --partition-size-factor float   Factor by which to multiply partition sizes when using storage placement (default 1)
+      --placement string              Partition placement strategy: [count, storage] (default "count")
+      --replication int               Normalize the topic replication factor across all replica sets (0 results in a no-op)
+      --sub-affinity                  Replacement broker substitution affinity
+      --topics string                 Rebuild topics (comma delim. list) by lookup in ZooKeeper
+      --use-meta                      Use broker metadata in placement constraints (default true)
+      --zk-metrics-prefix string      ZooKeeper namespace prefix for Kafka metrics (when using storage placement) (default "topicmappr")
+
+Global Flags:
+      --zk-addr string     ZooKeeper connect string (for broker metadata or rebuild-topic lookups) (default "localhost:2181")
+      --zk-prefix string   ZooKeeper namespace prefix (for Kafka brokers)
```
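
For example, a rebuild that looks up one topic in ZooKeeper and scopes placement to three broker IDs might look like the following (the topic name and broker IDs here are hypothetical):

```
topicmappr rebuild-topics \
    --zk-addr "localhost:2181" \
    --topics "mytopic" \
    --brokers "1001,1002,1003"
```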

## Managing and Repairing Topics
214 changes: 214 additions & 0 deletions cmd/topicmappr/commands/rebuildtopics.go
@@ -0,0 +1,214 @@
package commands

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"regexp"
	"sort"
	"strings"

	"github.com/DataDog/kafka-kit/kafkazk"

	"github.com/spf13/cobra"
)

var (
	// Characters allowed in Kafka topic names.
	topicNormalChar, _ = regexp.Compile(`[a-zA-Z0-9_\\-]`)

	Config struct {
		rebuildTopics []*regexp.Regexp
		brokers       []int
	}
)

var rebuildTopicsCmd = &cobra.Command{
	Use:   "rebuild-topics",
	Short: "Build a partition map for one or more topics",
	Long: `rebuild-topics requires at least two inputs: a reference of
target topics and a list of broker IDs to which those topics should be mapped.
Target topics are provided as a comma delimited list of topic names and/or regex patterns
via the --topics parameter, which discovers matching topics in ZooKeeper (additionally,
the --zk-addr and --zk-prefix global flags should be set). Alternatively, a JSON map can be
provided via the --map-string flag. Target broker IDs are provided via the --brokers flag.`,
	Run: rebuild,
}

func init() {
	rootCmd.AddCommand(rebuildTopicsCmd)

	rebuildTopicsCmd.Flags().String("topics", "", "Rebuild topics (comma delim. list) by lookup in ZooKeeper")
	rebuildTopicsCmd.Flags().String("map-string", "", "Rebuild a partition map provided as a string literal")
	rebuildTopicsCmd.Flags().Bool("use-meta", true, "Use broker metadata in placement constraints")
	rebuildTopicsCmd.Flags().String("out-path", "", "Path to write output map files to")
	rebuildTopicsCmd.Flags().String("out-file", "", "If defined, write a combined map of all topics to a file")
	rebuildTopicsCmd.Flags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered")
	rebuildTopicsCmd.Flags().Bool("force-rebuild", false, "Forces a complete map rebuild")
	rebuildTopicsCmd.Flags().Int("replication", 0, "Normalize the topic replication factor across all replica sets (0 results in a no-op)")
	rebuildTopicsCmd.Flags().Bool("sub-affinity", false, "Replacement broker substitution affinity")
	rebuildTopicsCmd.Flags().String("placement", "count", "Partition placement strategy: [count, storage]")
	rebuildTopicsCmd.Flags().String("optimize", "distribution", "Optimization priority for the storage placement strategy: [distribution, storage]")
	rebuildTopicsCmd.Flags().Float64("partition-size-factor", 1.0, "Factor by which to multiply partition sizes when using storage placement")
	rebuildTopicsCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to")
	rebuildTopicsCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics (when using storage placement)")

	// Required.
	rebuildTopicsCmd.MarkFlagRequired("brokers")
}

func rebuild(cmd *cobra.Command, _ []string) {
	// Suppress underlying ZK client noise.
	log.SetOutput(ioutil.Discard)

	b, _ := cmd.Flags().GetString("brokers")
	Config.brokers = kafkazk.BrokerStringToSlice(b)
	topics, _ := cmd.Flags().GetString("topics")

	// Sanity check params.
	p := cmd.Flag("placement").Value.String()
	o := cmd.Flag("optimize").Value.String()
	fr, _ := cmd.Flags().GetBool("force-rebuild")
	sa, _ := cmd.Flags().GetBool("sub-affinity")
	m, _ := cmd.Flags().GetBool("use-meta")

	switch {
	case p != "count" && p != "storage":
		fmt.Println("\n[ERROR] --placement must be either 'count' or 'storage'")
		defaultsAndExit()
	case o != "distribution" && o != "storage":
		fmt.Println("\n[ERROR] --optimize must be either 'distribution' or 'storage'")
		defaultsAndExit()
	case !m && p == "storage":
		fmt.Println("\n[ERROR] --placement=storage requires --use-meta=true")
		defaultsAndExit()
	case fr && sa:
		fmt.Println("\n[INFO] --force-rebuild disables --sub-affinity")
	}

	// Append a trailing slash if not included.
	op := cmd.Flag("out-path").Value.String()
	if op != "" && !strings.HasSuffix(op, "/") {
		cmd.Flags().Set("out-path", op+"/")
	}

	// Determine if a regexp was provided in the topic
	// name. If not, set the topic name to ^name$.
	topicNames := strings.Split(topics, ",")
	for n, t := range topicNames {
		if !containsRegex(t) {
			topicNames[n] = fmt.Sprintf(`^%s$`, t)
		}
	}

	// Compile topic regex.
	for _, t := range topicNames {
		r, err := regexp.Compile(t)
		if err != nil {
			fmt.Printf("Invalid topic regex: %s\n", t)
			os.Exit(1)
		}

		Config.rebuildTopics = append(Config.rebuildTopics, r)
	}

	// ZooKeeper init.
	zk := initZooKeeper(cmd)
	if zk != nil {
		defer zk.Close()
	}

	// General flow:
	// 1) A PartitionMap is formed (either unmarshaled from the literal
	//    map input via --map-string or generated from ZooKeeper metadata
	//    for topics matching --topics).
	// 2) A BrokerMap is formed from brokers found in the PartitionMap
	//    along with any new brokers provided via the --brokers param.
	// 3) The PartitionMap and BrokerMap are fed to a rebuild
	//    function. Missing brokers, brokers marked for replacement,
	//    and all other placements are performed, returning a new
	//    PartitionMap.
	// 4) Differences between the original and new PartitionMap
	//    are detected and reported.
	// 5) The new PartitionMap is split by topic. Map(s) are written.

	// Fetch broker and partition metadata.
	brokerMeta := getbrokerMeta(cmd, zk)
	partitionMeta := getPartitionMeta(cmd, zk)

	// Build a partition map either from literal map text input or by fetching the
	// map data from ZooKeeper. Store a copy of the original.
	partitionMapIn := getPartitionMap(cmd, zk)
	originalMap := partitionMapIn.Copy()

	// Print a list of affected topics.
	printTopics(partitionMapIn)

	brokers, bs := getBrokers(cmd, partitionMapIn, brokerMeta)
	brokersOrig := brokers.Copy()

	if bs.Changes() {
		fmt.Printf("%s-\n", indent)
	}

	// Check if any referenced brokers are marked as having
	// missing/partial metrics data.
	ensureBrokerMetrics(cmd, brokers, brokerMeta)

	// Create substitution affinities.
	affinities := getSubAffinities(cmd, brokers, brokersOrig, partitionMapIn)

	if affinities != nil {
		fmt.Printf("%s-\n", indent)
	}

	// Print changes, actions.
	printChangesActions(cmd, bs)

	// Apply any replication factor settings.
	updateReplicationFactor(cmd, partitionMapIn)

	// Build a new map using the provided list of brokers.
	// This is OK to run even when a no-op is intended.
	partitionMapOut, warns := buildMap(cmd, partitionMapIn, partitionMeta, brokers, affinities)

	// Sort by topic, partition.
	// TODO: all functions should return lex-sorted partition maps. Review for
	// removal. Also, partitionMapIn shouldn't be further referenced at this point.
	sort.Sort(partitionMapIn.Partitions)
	sort.Sort(partitionMapOut.Partitions)

	// Count missing brokers as a warning.
	if bs.Missing > 0 {
		w := fmt.Sprintf("%d provided brokers not found in ZooKeeper\n", bs.Missing)
		warns = append(warns, w)
	}

	// Print warnings.
	fmt.Println("\nWARN:")
	if len(warns) > 0 {
		sort.Strings(warns)
		for _, e := range warns {
			fmt.Printf("%s%s\n", indent, e)
		}
	} else {
		fmt.Printf("%s[none]\n", indent)
	}

	// Print map change results.
	printMapChanges(originalMap, partitionMapOut)

	// Print broker assignment statistics.
	printBrokerAssignmentStats(cmd, originalMap, partitionMapOut, brokersOrig, brokers)

	// If no warnings were encountered (or they're ignored), write out the
	// output partition map(s).
	iw, _ := cmd.Flags().GetBool("ignore-warns")
	if !iw && len(warns) > 0 {
		fmt.Printf("\n%sWarnings encountered, partition map not created. Override with --ignore-warns.\n", indent)
		os.Exit(1)
	}

	writeMaps(cmd, partitionMapOut)
}
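
The `^name$` anchoring performed in `rebuild` can be shown as a standalone sketch. Note that `containsRegex` below is a simplified stand-in for the package helper referenced above (its real implementation isn't part of this diff), and the topic names are made up:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// containsRegex is a simplified stand-in: treat a name as a regex
// pattern if it contains common metacharacters.
func containsRegex(s string) bool {
	return strings.ContainsAny(s, `.*+?^$()[]{}|\`)
}

// compilePatterns anchors plain topic names as ^name$ so that an
// exact-name input can't accidentally match topics that merely share
// a prefix, while regex inputs are compiled as given.
func compilePatterns(names []string) ([]*regexp.Regexp, error) {
	var out []*regexp.Regexp
	for _, t := range names {
		if !containsRegex(t) {
			t = fmt.Sprintf(`^%s$`, t)
		}
		r, err := regexp.Compile(t)
		if err != nil {
			return nil, fmt.Errorf("invalid topic regex: %s", t)
		}
		out = append(out, r)
	}
	return out, nil
}

func main() {
	patterns, _ := compilePatterns([]string{"payments", "logs.*"})
	fmt.Println(patterns[0].MatchString("payments"))       // true
	fmt.Println(patterns[0].MatchString("payments-retry")) // false
	fmt.Println(patterns[1].MatchString("logs-app"))       // true
}
```

Without the anchoring, a request to rebuild "payments" would also match every topic containing that substring.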
@@ -1,4 +1,4 @@
-package main
+package commands

import (
	"flag"