Skip to content

Commit

Permalink
Merge pull request #222 from DataDog/jamie/rebalance-metrics
Browse files Browse the repository at this point in the history
rebalance checks broker metrics
  • Loading branch information
jamiealquiza committed Nov 19, 2018
2 parents 8ea5e8e + 96bfde4 commit da31cab
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
15 changes: 15 additions & 0 deletions cmd/topicmappr/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ func initZooKeeper(cmd *cobra.Command) (kafkazk.Handler, error) {
return zk, nil
}

// ensureBrokerMetrics takes a map of reference brokers and
// a map of discovered broker metadata. Any non-missing brokers
// in the broker map must be present in the broker metadata map
// and have a non-true MetricsIncomplete value.
func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) {
for id, b := range bm {
// Missing brokers won't even
// be found in the brokerMeta.
if !b.Missing && id != 0 && bmm[id].MetricsIncomplete {
fmt.Printf("Metrics not found for broker %d\n", id)
os.Exit(1)
}
}
}

// containsRegex takes a topic name
// reference and returns whether or not
// it should be interpreted as regex.
Expand Down
4 changes: 4 additions & 0 deletions cmd/topicmappr/commands/rebalance_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func validateBrokersForRebalance(cmd *cobra.Command, brokers kafkazk.BrokerMap,
fmt.Printf("%s-\n", indent)
}

// Check if any referenced brokers are marked as having
// missing/partial metrics data.
ensureBrokerMetrics(cmd, brokers, bm)

switch {
case c.Missing > 0, c.OldMissing > 0, c.Replace > 0:
fmt.Printf("%s[ERROR] rebalance only allows broker additions\n", indent)
Expand Down
4 changes: 3 additions & 1 deletion cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func rebuild(cmd *cobra.Command, _ []string) {

// Check if any referenced brokers are marked as having
// missing/partial metrics data.
ensureBrokerMetrics(cmd, brokers, brokerMeta)
if m, _ := cmd.Flags().GetBool("use-meta"); m {
ensureBrokerMetrics(cmd, brokers, brokerMeta)
}

// Create substitution affinities.
affinities := getSubAffinities(cmd, brokers, brokersOrig, partitionMapIn)
Expand Down
17 changes: 0 additions & 17 deletions cmd/topicmappr/commands/rebuild_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,6 @@ func getPartitionMap(cmd *cobra.Command, zk kafkazk.Handler) *kafkazk.PartitionM
return nil
}

// ensureBrokerMetrics takes a map of reference brokers and
// a map of discovered broker metadata. Any non-missing brokers
// in the broker map must be present in the broker metadata map
// and have a non-true MetricsIncomplete value.
func ensureBrokerMetrics(cmd *cobra.Command, bm kafkazk.BrokerMap, bmm kafkazk.BrokerMetaMap) {
if m, _ := cmd.Flags().GetBool("use-meta"); m {
for id, b := range bm {
// Missing brokers won't even
// be found in the brokerMeta.
if !b.Missing && id != 0 && bmm[id].MetricsIncomplete {
fmt.Printf("Metrics not found for broker %d\n", id)
os.Exit(1)
}
}
}
}

// getSubAffinities, if enabled via --sub-affinity, takes reference broker maps
// and a partition map and attempts to return a complete SubstitutionAffinities.
func getSubAffinities(cmd *cobra.Command, bm kafkazk.BrokerMap, bmo kafkazk.BrokerMap, pm *kafkazk.PartitionMap) kafkazk.SubstitutionAffinities {
Expand Down

0 comments on commit da31cab

Please sign in to comment.