Skip to content

Commit

Permalink
Merge pull request #51 from DataDog/jamie/refactor
Browse files Browse the repository at this point in the history
refactor, tests
  • Loading branch information
jamiealquiza committed Dec 15, 2017
2 parents f4b315d + fcdce4b commit 8e256b9
Show file tree
Hide file tree
Showing 9 changed files with 1,216 additions and 745 deletions.
312 changes: 312 additions & 0 deletions brokers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package main

import (
"fmt"
"os"
"sort"
"strconv"
"strings"
)

type brokerUseStats struct {
leader int
follower int
}

type brokerStatus struct {
new int
missing int
oldMissing int
replace int
}

// broker is used for internal
// metadata / accounting.
type broker struct {
id int
locality string
used int
replace bool
}

// brokerMap holds a mapping of
// broker IDs to *broker.
type brokerMap map[int]*broker

// brokerList is a slice of
// brokers for sorting by used count.
type brokerList []*broker

// Satisfy the sort interface for brokerList.

func (b brokerList) Len() int { return len(b) }
func (b brokerList) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b brokerList) Less(i, j int) bool {
if b[i].used < b[j].used {
return true
}
if b[i].used > b[j].used {
return false
}

return b[i].id < b[j].id
}

// bestCandidate takes a *constraints
// and returns the *broker with the lowest used
// count that satisfies all constraints.
func (b brokerList) bestCandidate(c *constraints) (*broker, error) {
sort.Sort(b)

var candidate *broker

// Iterate over candidates.
for _, candidate = range b {
// Candidate passes, return.
if c.passes(candidate) {
c.add(candidate)
candidate.used++

return candidate, nil
}
}

// List exhausted, no brokers passed.
return nil, errNoBrokers
}

// add takes a *broker and adds its
// attributes to the *constraints.
func (c *constraints) add(b *broker) {
if b.locality != "" {
c.locality[b.locality] = true
}

c.id[b.id] = true
}

// passes takes a *broker and returns
// whether or not it passes constraints.
func (c *constraints) passes(b *broker) bool {
switch {
// Fail if the candidate is one of the
// IDs already in the replica set.
case c.id[b.id]:
return false
// Fail if the candidate is in any of
// the existing replica set localities.
case c.locality[b.locality]:
return false
}
return true
}

// mergeConstraints takes a brokerlist and
// builds a *constraints by merging the
// attributes of all brokers from the supplied list.
func mergeConstraints(bl brokerList) *constraints {
c := newConstraints()

for _, b := range bl {
// Don't merge in attributes
// from nodes that will be removed.
if b.replace {
continue
}

if b.locality != "" {
c.locality[b.locality] = true
}

c.id[b.id] = true
}

return c
}

// update takes a brokerMap and a []int
// of broker IDs and adds them to the brokerMap,
// returning the count of marked for replacement,
// newly included, and brokers that weren't found
// in ZooKeeper.
func (b brokerMap) update(bl []int, bm brokerMetaMap) *brokerStatus {
bs := &brokerStatus{}

// Build a map from the new broker list.
newBrokers := map[int]bool{}
for _, broker := range bl {
newBrokers[broker] = true
}

// Do an initial pass on existing brokers
// and see if any are missing in ZooKeeper.
if len(bm) > 0 {
for id := range b {
// Skip reserved ID 0.
if id == 0 {
continue
}

if _, exist := bm[id]; !exist {
fmt.Printf("%sPrevious broker %d missing\n",
indent, id)
b[id].replace = true
// If this broker is missing and was provided in
// the broker list, consider it a "missing provided broker".
if _, ok := newBrokers[id]; len(bm) > 0 && ok {
bs.missing++
} else {
bs.oldMissing++
}
}
}
}

// Set the replace flag for existing brokers
// not in the new broker map.
for _, broker := range b {
// Broker ID 0 is a special stub
// ID used for internal purposes.
// Skip it.
if broker.id == 0 {
continue
}

if _, ok := newBrokers[broker.id]; !ok {
bs.replace++
b[broker.id].replace = true
fmt.Printf("%sBroker %d marked for removal\n",
indent, broker.id)
}
}

// Merge new brokers with existing brokers.
for id := range newBrokers {
// Don't overwrite existing (which will be most brokers).
if b[id] == nil {
// Skip metadata lookups if
// meta is not being used.
if len(bm) == 0 {
b[id] = &broker{
used: 0,
id: id,
replace: false,
}
bs.new++
continue
}

// Else check the broker against
// the broker metadata map.
if meta, exists := bm[id]; exists {
b[id] = &broker{
used: 0,
id: id,
replace: false,
locality: meta.Rack,
}
bs.new++
} else {
bs.missing++
fmt.Printf("%sBroker %d not found in ZooKeeper\n",
indent, id)
}
}
}

return bs
}

// filteredList converts a brokerMap to a brokerList,
// excluding nodes marked for replacement.
func (b brokerMap) filteredList() brokerList {
bl := brokerList{}

for broker := range b {
if !b[broker].replace {
bl = append(bl, b[broker])
}
}

return bl
}

// brokerMapFromTopicMap creates a brokerMap
// from a topicMap. Counts occurance is counted.
// TODO can we remove marked for replacement here too?
func brokerMapFromTopicMap(pm *partitionMap, bm brokerMetaMap, force bool) brokerMap {
bmap := brokerMap{}
// For each partition.
for _, partition := range pm.Partitions {
// For each broker in the
// partition replica set.
for _, id := range partition.Replicas {
// If the broker isn't in the
// broker map, add it.
if bmap[id] == nil {
// If we're doing a force rebuid, replace
// should be set to true.
bmap[id] = &broker{used: 0, id: id, replace: false}
}

// Track use scoring unless we're
// doing a force rebuild. In this case,
// we're treating existing brokers the same
// as new brokers (which start with a score of 0).
if !force {
bmap[id].used++
}

// Add metadata if we have it.
if meta, exists := bm[id]; exists {
bmap[id].locality = meta.Rack
}
}
}

// Broker ID 0 is used for --force-rebuild.
// We request a stripped map which replaces
// all existing brokers with the fake broker
// with ID set for replacement.
bmap[0] = &broker{used: 0, id: 0, replace: true}

return bmap
}

// brokerStringToSlice takes a broker list
// as a string and returns a []int of
// broker IDs.
func brokerStringToSlice(s string) []int {
ids := map[int]bool{}
var info int

parts := strings.Split(s, ",")
is := []int{}

// Iterate and convert
// each broker ID.
for _, p := range parts {
i, err := strconv.Atoi(strings.TrimSpace(p))
// Err and exit on bad input.
if err != nil {
fmt.Println(err)
os.Exit(1)
}

if ids[i] {
fmt.Printf("ID %d supplied as duplicate, excluding\n", i)
info++
continue
}

ids[i] = true
is = append(is, i)
}

// Formatting purposes.
if info > 0 {
fmt.Println()
}

return is
}
47 changes: 47 additions & 0 deletions brokers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"testing"
)

func TestBrokerMapFromTopicMap(t *testing.T) {
zk := &zkmock{}
bm, _ := zk.getAllBrokerMeta()
pm, _ := partitionMapFromString(testGetMapString("test_topic"))
forceRebuild := false

brokers := brokerMapFromTopicMap(pm, bm, forceRebuild)

expected := brokerMap{
0: &broker{id: 0, replace: true},
1001: &broker{id: 1001, locality: "a", used: 3, replace: false},
1002: &broker{id: 1002, locality: "b", used: 3, replace: false},
1003: &broker{id: 1003, locality: "c", used: 2, replace: false},
1004: &broker{id: 1004, locality: "a", used: 2, replace: false},
}

for id, b := range brokers {
switch {
case b.id != expected[id].id:
t.Errorf("Expected id %d, got %d for broker %d",
expected[id].id, b.id, id)
case b.locality != expected[id].locality:
t.Errorf("Expected locality %s, got %s for broker %d",
expected[id].locality, b.locality, id)
case b.used != expected[id].used:
t.Errorf("Expected used %d, got %d for broker %d",
expected[id].used, b.used, id)
case b.replace != expected[id].replace:
t.Errorf("Expected replace %b, got %b for broker %d",
expected[id].replace, b.replace, id)
}
}
}

// func TestBestCandidate(t *testing.T) {}
// func TestConstraintsAdd(t *testing.T) {}
// func TestConstraintsPasses(t *testing.T) {}
// func TestMergeConstraints(t *testing.T) {}
// func TestUpdate(t *testing.T) {}
// func TestFilteredList(t *testing.T) {}
// func TestBrokerStringToSlice(t *testing.T) {}
Loading

0 comments on commit 8e256b9

Please sign in to comment.