Merge pull request #251 from DataDog/jamie/auto-brokers
Jamie/auto brokers
jamiealquiza committed Apr 18, 2019
commit 740babf (2 parents: 23fced4 + 367b327)
Showing 5 changed files with 44 additions and 13 deletions.
5 changes: 3 additions & 2 deletions cmd/topicmappr/README.md
@@ -73,11 +73,12 @@ Usage:
topicmappr rebuild [flags]
Flags:
- --brokers string Broker list to scope all partition placements to
+ --brokers string Broker list to scope all partition placements to ('-1' automatically expands to all currently mapped brokers)
--force-rebuild Forces a complete map rebuild
-h, --help help for rebuild
--map-string string Rebuild a partition map provided as a string literal
--metrics-age int Kafka metrics age tolerance (in minutes) (when using storage placement) (default 60)
+ --min-rack-ids int Minimum number of required unique rack IDs per replica set (0 requires that all are unique)
--optimize string Optimization priority for the storage placement strategy: [distribution, storage] (default "distribution")
--out-file string If defined, write a combined map of all topics to a file
--out-path string Path to write output map files to
@@ -105,7 +106,7 @@ Usage:
topicmappr rebalance [flags]
Flags:
- --brokers string Broker list to scope all partition placements to
+ --brokers string Broker list to scope all partition placements to ('-1' automatically expands to all currently mapped brokers)
-h, --help help for rebalance
--locality-scoped Disallow a relocation to traverse rack.id values among brokers
--metrics-age int Kafka metrics age tolerance (in minutes) (default 60)
2 changes: 1 addition & 1 deletion cmd/topicmappr/commands/rebalance.go
@@ -23,7 +23,7 @@ func init() {
rebalanceCmd.Flags().String("topics", "", "Rebuild topics (comma delim. list) by lookup in ZooKeeper")
rebalanceCmd.Flags().String("out-path", "", "Path to write output map files to")
rebalanceCmd.Flags().String("out-file", "", "If defined, write a combined map of all topics to a file")
rebalanceCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to")
rebalanceCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' automatically expands to all currently mapped brokers)")
rebalanceCmd.Flags().Float64("storage-threshold", 0.20, "Percent below the harmonic mean storage free to target for partition offload (0 targets a brokers)")
rebalanceCmd.Flags().Float64("storage-threshold-gb", 0.00, "Storage free in gigabytes to target for partition offload (those below the specified value); 0 [default] defers target selection to --storage-threshold")
rebalanceCmd.Flags().Float64("tolerance", 0.10, "Percent distance from the mean storage free to limit storage scheduling")
2 changes: 1 addition & 1 deletion cmd/topicmappr/commands/rebuild.go
@@ -36,7 +36,7 @@ func init() {
rebuildCmd.Flags().Int("min-rack-ids", 0, "Minimum number of required of unique rack IDs per replica set (0 requires that all are unique)")
rebuildCmd.Flags().String("optimize", "distribution", "Optimization priority for the storage placement strategy: [distribution, storage]")
rebuildCmd.Flags().Float64("partition-size-factor", 1.0, "Factor by which to multiply partition sizes when using storage placement")
rebuildCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to")
rebuildCmd.Flags().String("brokers", "", "Broker list to scope all partition placements to ('-1' automatically expands to all currently mapped brokers)")
rebuildCmd.Flags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics (when using storage placement)")
rebuildCmd.Flags().Int("metrics-age", 60, "Kafka metrics age tolerance (in minutes) (when using storage placement)")
rebuildCmd.Flags().Bool("skip-no-ops", false, "Skip no-op partition assigments")
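The `--brokers` flag arrives as a comma-delimited string and has to become the `[]int` that `BrokerMap.Update` consumes before the new `-1` sentinel can take effect. A minimal sketch of that conversion, assuming a simple comma-split parser; `parseBrokersFlag` and its error handling are illustrative, not necessarily how topicmappr wires the flag:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBrokersFlag converts a --brokers value such as "1001,1002,1003" or
// "-1" into a []int of broker IDs.
func parseBrokersFlag(s string) ([]int, error) {
	if s == "" {
		return nil, nil
	}

	var ids []int
	for _, field := range strings.Split(s, ",") {
		id, err := strconv.Atoi(strings.TrimSpace(field))
		if err != nil {
			return nil, fmt.Errorf("invalid broker ID %q: %v", field, err)
		}
		ids = append(ids, id)
	}

	return ids, nil
}

func main() {
	ids, _ := parseBrokersFlag("-1")
	fmt.Println(ids) // [-1], later expanded by Update to all currently mapped brokers
}
```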
33 changes: 24 additions & 9 deletions kafkazk/brokers.go
@@ -193,18 +193,33 @@ func (b BrokerList) SortPseudoShuffle(seed int64) {
}
}

- // Update takes a []int of broker IDs and BrokerMap then adds them to the
+ // Update takes a []int of broker IDs and BrokerMetaMap then adds them to the
// BrokerMap, returning the count of marked for replacement, newly included,
// and brokers that weren't found in ZooKeeper. Additionally, a channel
// of msgs describing changes is returned.
func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string) {
bs := &BrokerStatus{}
msgs := make(chan string, len(b)+(len(bl)*3))

- // Build a map from the new broker list.
- newBrokers := map[int]bool{}
+ var includeAllExisting = false
+
+ // Build a map from the provided broker list.
+ providedBrokers := map[int]bool{}
for _, broker := range bl {
- newBrokers[broker] = true
+ // -1 is a placeholder that is substituted with
+ // all brokers already found in the BrokerMap.
+ if broker == -1 {
+ includeAllExisting = true
+ continue
+ }
+
+ providedBrokers[broker] = true
}
+
+ if includeAllExisting {
+ for id := range b {
+ providedBrokers[id] = true
+ }
+ }

// Do an initial pass on existing brokers
@@ -222,7 +237,7 @@ func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string) {
b[id].Missing = true
// If this broker is missing and was provided in
// the broker list, consider it a "missing provided broker".
- if _, ok := newBrokers[id]; len(bm) > 0 && ok {
+ if _, ok := providedBrokers[id]; len(bm) > 0 && ok {
bs.Missing++
} else {
bs.OldMissing++
@@ -232,21 +247,21 @@ func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string) {
}

// Set the replace flag for existing brokers
- // not in the new broker map.
+ // not in the provided broker map.
for _, broker := range b {
if broker.ID == StubBrokerID {
continue
}

- if _, ok := newBrokers[broker.ID]; !ok {
+ if _, ok := providedBrokers[broker.ID]; !ok {
bs.Replace++
b[broker.ID].Replace = true
msgs <- fmt.Sprintf("Broker %d marked for removal", broker.ID)
}
}

- // Merge new brokers with existing brokers.
- for id := range newBrokers {
+ // Merge provided brokers with existing brokers.
+ for id := range providedBrokers {
// Don't overwrite existing (which will be most brokers).
if b[id] == nil {
// Skip metadata lookups if
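The core of the change above is the sentinel handling in `Update`: a `-1` entry in the provided broker list is expanded to every broker already present in the `BrokerMap` before the usual replace and merge passes run. A minimal, self-contained sketch of that expansion, mirroring the logic in the diff; `expandBrokerList` and its map types are illustrative, not part of the `kafkazk` package:

```go
package main

import "fmt"

// expandBrokerList builds the set of target broker IDs from a provided list.
// A -1 entry is a sentinel meaning "all currently mapped brokers".
func expandBrokerList(provided []int, existing map[int]struct{}) map[int]bool {
	targets := map[int]bool{}
	includeAllExisting := false

	for _, id := range provided {
		if id == -1 {
			includeAllExisting = true
			continue
		}
		targets[id] = true
	}

	if includeAllExisting {
		for id := range existing {
			targets[id] = true
		}
	}

	return targets
}

func main() {
	existing := map[int]struct{}{1001: {}, 1002: {}, 1003: {}}

	// -1 expands to every existing broker; 1004 is an explicitly added new ID.
	fmt.Println(expandBrokerList([]int{-1, 1004}, existing))
	// map[1001:true 1002:true 1003:true 1004:true]
}
```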
15 changes: 15 additions & 0 deletions kafkazk/brokers_test.go
@@ -196,6 +196,21 @@ func TestUpdate(t *testing.T) {
}
}

+ func TestUpdateIncludeExisting(t *testing.T) {
+ zk := &Mock{}
+ bmm, _ := zk.GetAllBrokerMeta(false)
+ bm := newMockBrokerMap()
+
+ bm.Update([]int{-1}, bmm)
+
+ // Ensure all broker IDs are in the map.
+ for _, id := range []int{StubBrokerID, 1001, 1002, 1003, 1004} {
+ if _, ok := bm[id]; !ok {
+ t.Errorf("Expected presence of ID %d", id)
+ }
+ }
+ }
+
func TestSubStorageAll(t *testing.T) {
bm := newMockBrokerMap()
pm, _ := PartitionMapFromString(testGetMapString("test_topic"))
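A practical consequence of the expansion, visible in the replace-flag pass of `Update` above: with `-1`, every broker already in the map is part of the provided set, so none are marked for removal, while an explicit list still marks any omitted broker. A small sketch of that behavior; `brokersToRemove` is an illustrative stand-in, not the package's API:

```go
package main

import "fmt"

// brokersToRemove returns the IDs in current that are absent from targets,
// mirroring the "marked for removal" pass in BrokerMap.Update.
func brokersToRemove(current []int, targets map[int]bool) []int {
	var out []int
	for _, id := range current {
		if !targets[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	current := []int{1001, 1002, 1003}

	// An explicit list that omits 1003 marks it for removal.
	explicit := map[int]bool{1001: true, 1002: true}
	fmt.Println(brokersToRemove(current, explicit)) // [1003]

	// The -1 sentinel expands to all current brokers, so nothing is removed.
	expanded := map[int]bool{1001: true, 1002: true, 1003: true}
	fmt.Println(brokersToRemove(current, expanded)) // []
}
```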
