diff --git a/brokers.go b/brokers.go new file mode 100644 index 0000000..ca5f367 --- /dev/null +++ b/brokers.go @@ -0,0 +1,312 @@ +package main + +import ( + "fmt" + "os" + "sort" + "strconv" + "strings" +) + +type brokerUseStats struct { + leader int + follower int +} + +type brokerStatus struct { + new int + missing int + oldMissing int + replace int +} + +// broker is used for internal +// metadata / accounting. +type broker struct { + id int + locality string + used int + replace bool +} + +// brokerMap holds a mapping of +// broker IDs to *broker. +type brokerMap map[int]*broker + +// brokerList is a slice of +// brokers for sorting by used count. +type brokerList []*broker + +// Satisfy the sort interface for brokerList. + +func (b brokerList) Len() int { return len(b) } +func (b brokerList) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b brokerList) Less(i, j int) bool { + if b[i].used < b[j].used { + return true + } + if b[i].used > b[j].used { + return false + } + + return b[i].id < b[j].id +} + +// bestCandidate takes a *constraints +// and returns the *broker with the lowest used +// count that satisfies all constraints. +func (b brokerList) bestCandidate(c *constraints) (*broker, error) { + sort.Sort(b) + + var candidate *broker + + // Iterate over candidates. + for _, candidate = range b { + // Candidate passes, return. + if c.passes(candidate) { + c.add(candidate) + candidate.used++ + + return candidate, nil + } + } + + // List exhausted, no brokers passed. + return nil, errNoBrokers +} + +// add takes a *broker and adds its +// attributes to the *constraints. +func (c *constraints) add(b *broker) { + if b.locality != "" { + c.locality[b.locality] = true + } + + c.id[b.id] = true +} + +// passes takes a *broker and returns +// whether or not it passes constraints. +func (c *constraints) passes(b *broker) bool { + switch { + // Fail if the candidate is one of the + // IDs already in the replica set. + case c.id[b.id]: + return false + // Fail if the candidate is in any of + // the existing replica set localities. + case c.locality[b.locality]: + return false + } + return true +} + +// mergeConstraints takes a brokerlist and +// builds a *constraints by merging the +// attributes of all brokers from the supplied list. +func mergeConstraints(bl brokerList) *constraints { + c := newConstraints() + + for _, b := range bl { + // Don't merge in attributes + // from nodes that will be removed. + if b.replace { + continue + } + + if b.locality != "" { + c.locality[b.locality] = true + } + + c.id[b.id] = true + } + + return c +} + +// update takes a brokerMap and a []int +// of broker IDs and adds them to the brokerMap, +// returning the count of marked for replacement, +// newly included, and brokers that weren't found +// in ZooKeeper. +func (b brokerMap) update(bl []int, bm brokerMetaMap) *brokerStatus { + bs := &brokerStatus{} + + // Build a map from the new broker list. + newBrokers := map[int]bool{} + for _, broker := range bl { + newBrokers[broker] = true + } + + // Do an initial pass on existing brokers + // and see if any are missing in ZooKeeper. + if len(bm) > 0 { + for id := range b { + // Skip reserved ID 0. + if id == 0 { + continue + } + + if _, exist := bm[id]; !exist { + fmt.Printf("%sPrevious broker %d missing\n", + indent, id) + b[id].replace = true + // If this broker is missing and was provided in + // the broker list, consider it a "missing provided broker". + if _, ok := newBrokers[id]; len(bm) > 0 && ok { + bs.missing++ + } else { + bs.oldMissing++ + } + } + } + } + + // Set the replace flag for existing brokers + // not in the new broker map. + for _, broker := range b { + // Broker ID 0 is a special stub + // ID used for internal purposes. + // Skip it. + if broker.id == 0 { + continue + } + + if _, ok := newBrokers[broker.id]; !ok { + bs.replace++ + b[broker.id].replace = true + fmt.Printf("%sBroker %d marked for removal\n", + indent, broker.id) + } + } + + // Merge new brokers with existing brokers. + for id := range newBrokers { + // Don't overwrite existing (which will be most brokers). + if b[id] == nil { + // Skip metadata lookups if + // meta is not being used. + if len(bm) == 0 { + b[id] = &broker{ + used: 0, + id: id, + replace: false, + } + bs.new++ + continue + } + + // Else check the broker against + // the broker metadata map. + if meta, exists := bm[id]; exists { + b[id] = &broker{ + used: 0, + id: id, + replace: false, + locality: meta.Rack, + } + bs.new++ + } else { + bs.missing++ + fmt.Printf("%sBroker %d not found in ZooKeeper\n", + indent, id) + } + } + } + + return bs +} + +// filteredList converts a brokerMap to a brokerList, +// excluding nodes marked for replacement. +func (b brokerMap) filteredList() brokerList { + bl := brokerList{} + + for broker := range b { + if !b[broker].replace { + bl = append(bl, b[broker]) + } + } + + return bl +} + +// brokerMapFromTopicMap creates a brokerMap +// from a topicMap. Counts occurance is counted. +// TODO can we remove marked for replacement here too? +func brokerMapFromTopicMap(pm *partitionMap, bm brokerMetaMap, force bool) brokerMap { + bmap := brokerMap{} + // For each partition. + for _, partition := range pm.Partitions { + // For each broker in the + // partition replica set. + for _, id := range partition.Replicas { + // If the broker isn't in the + // broker map, add it. + if bmap[id] == nil { + // If we're doing a force rebuid, replace + // should be set to true. + bmap[id] = &broker{used: 0, id: id, replace: false} + } + + // Track use scoring unless we're + // doing a force rebuild. In this case, + // we're treating existing brokers the same + // as new brokers (which start with a score of 0). + if !force { + bmap[id].used++ + } + + // Add metadata if we have it. + if meta, exists := bm[id]; exists { + bmap[id].locality = meta.Rack + } + } + } + + // Broker ID 0 is used for --force-rebuild. + // We request a stripped map which replaces + // all existing brokers with the fake broker + // with ID set for replacement. + bmap[0] = &broker{used: 0, id: 0, replace: true} + + return bmap +} + +// brokerStringToSlice takes a broker list +// as a string and returns a []int of +// broker IDs. +func brokerStringToSlice(s string) []int { + ids := map[int]bool{} + var info int + + parts := strings.Split(s, ",") + is := []int{} + + // Iterate and convert + // each broker ID. + for _, p := range parts { + i, err := strconv.Atoi(strings.TrimSpace(p)) + // Err and exit on bad input. + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + if ids[i] { + fmt.Printf("ID %d supplied as duplicate, excluding\n", i) + info++ + continue + } + + ids[i] = true + is = append(is, i) + } + + // Formatting purposes. + if info > 0 { + fmt.Println() + } + + return is +} diff --git a/brokers_test.go b/brokers_test.go new file mode 100644 index 0000000..86402bd --- /dev/null +++ b/brokers_test.go @@ -0,0 +1,47 @@ +package main + +import ( + "testing" +) + +func TestBrokerMapFromTopicMap(t *testing.T) { + zk := &zkmock{} + bm, _ := zk.getAllBrokerMeta() + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + forceRebuild := false + + brokers := brokerMapFromTopicMap(pm, bm, forceRebuild) + + expected := brokerMap{ + 0: &broker{id: 0, replace: true}, + 1001: &broker{id: 1001, locality: "a", used: 3, replace: false}, + 1002: &broker{id: 1002, locality: "b", used: 3, replace: false}, + 1003: &broker{id: 1003, locality: "c", used: 2, replace: false}, + 1004: &broker{id: 1004, locality: "a", used: 2, replace: false}, + } + + for id, b := range brokers { + switch { + case b.id != expected[id].id: + t.Errorf("Expected id %d, got %d for broker %d", + expected[id].id, b.id, id) + case b.locality != expected[id].locality: + t.Errorf("Expected locality %s, got %s for broker %d", + expected[id].locality, b.locality, id) + case b.used != expected[id].used: + t.Errorf("Expected used %d, got %d for broker %d", + expected[id].used, b.used, id) + case b.replace != expected[id].replace: + t.Errorf("Expected replace %b, got %b for broker %d", + expected[id].replace, b.replace, id) + } + } +} + +// func TestBestCandidate(t *testing.T) {} +// func TestConstraintsAdd(t *testing.T) {} +// func TestConstraintsPasses(t *testing.T) {} +// func TestMergeConstraints(t *testing.T) {} +// func TestUpdate(t *testing.T) {} +// func TestFilteredList(t *testing.T) {} +// func TestBrokerStringToSlice(t *testing.T) {} diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..6ad08a7 --- /dev/null +++ b/helpers.go @@ -0,0 +1,135 @@ +package main + +import ( + "bytes" + "flag" + "math" + "os" + "sort" +) + +type constraints struct { + locality map[string]bool + id map[int]bool +} + +func newConstraints() *constraints { + return &constraints{ + locality: make(map[string]bool), + id: make(map[int]bool), + } +} + +// whatChanged takes a before and after broker +// replica set and returns a string describing +// what changed. +func whatChanged(s1 []int, s2 []int) string { + var changes []string + + a, b := make([]int, len(s1)), make([]int, len(s2)) + copy(a, s1) + copy(b, s2) + + var lchanged bool + var echanged bool + + // Check if the len is different. + switch { + case len(a) > len(b): + lchanged = true + changes = append(changes, "decreased replication") + case len(a) < len(b): + lchanged = true + changes = append(changes, "increased replication") + } + + // If the len is the same, + // check elements. + if !lchanged { + for i := range a { + if a[i] != b[i] { + echanged = true + } + } + } + + // Nothing changed. + if !lchanged && !echanged { + return "no-op" + } + + // Determine what else changed. + + // Get smaller replica set len between + // old vs new, then cap both to this len for + // comparison. + slen := int(math.Min(float64(len(a)), float64(len(b)))) + + a = a[:slen] + b = b[:slen] + + echanged = false + for i := range a { + if a[i] != b[i] { + echanged = true + } + } + + sort.Ints(a) + sort.Ints(b) + + samePostSort := true + for i := range a { + if a[i] != b[i] { + samePostSort = false + } + } + + // If the broker lists changed but + // are the same after sorting, + // we've just changed the preferred + // leader. + if echanged && samePostSort { + changes = append(changes, "preferred leader") + } + + // If the broker lists changed and + // aren't the same after sorting, we've + // replaced a broker. + if echanged && !samePostSort { + changes = append(changes, "replaced broker") + } + + // Construct change string. + var buf bytes.Buffer + for i, c := range changes { + buf.WriteString(c) + if i < len(changes)-1 { + buf.WriteString(", ") + } + } + + return buf.String() +} + +// containsRegex takes a topic name +// reference and returns whether or not +// it should be interpreted as regex. +func containsRegex(t string) bool { + // Check each character of the + // topic name. If it doesn't contain + // a legal Kafka topic name character, we're + // going to assume it's regex. + for _, c := range t { + if !topicNormalChar.MatchString(string(c)) { + return true + } + } + + return false +} + +func defaultsAndExit() { + flag.PrintDefaults() + os.Exit(1) +} diff --git a/helpers_test.go b/helpers_test.go new file mode 100644 index 0000000..9668ccf --- /dev/null +++ b/helpers_test.go @@ -0,0 +1,4 @@ +package main + +// func TestWhatChanged(t *Testing.t) {} +// func TestContainsRegex(t *Testing.t) {} diff --git a/main.go b/main.go index dff7e4b..902d4ed 100644 --- a/main.go +++ b/main.go @@ -1,18 +1,14 @@ package main import ( - "bytes" - "encoding/json" "errors" "flag" "fmt" "io/ioutil" "log" - "math" "os" "regexp" "sort" - "strconv" "strings" ) @@ -21,6 +17,8 @@ const ( ) var ( + // Config holds configuration + // parameters. Config struct { rebuildMap string rebuildTopics []*regexp.Regexp @@ -35,110 +33,21 @@ var ( replication int } - zkc = &zkConfig{} // Characters allowed in Kafka topic names topicNormalChar, _ = regexp.Compile(`[a-zA-Z0-9_\\-]`) errNoBrokers = errors.New("No additional brokers that meet constraints") ) -// TODO make references to topic map vs -// broker map consistent, e.g. types vs -// func names. - -// Partition maps the partition objects -// in the Kafka topic mapping syntax. -type Partition struct { - Topic string `json:"topic"` - Partition int `json:"partition"` - Replicas []int `json:"replicas"` -} - -type partitionList []Partition - -// partitionMap maps the -// Kafka topic mapping syntax. -type partitionMap struct { - Version int `json:"version"` - Partitions partitionList `json:"partitions"` -} - -// Satisfy the sort interface for partitionList. - -func (p partitionList) Len() int { return len(p) } -func (p partitionList) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p partitionList) Less(i, j int) bool { - if p[i].Topic < p[j].Topic { - return true - } - if p[i].Topic > p[j].Topic { - return false - } - - return p[i].Partition < p[j].Partition -} - -func newPartitionMap() *partitionMap { - return &partitionMap{Version: 1} -} - -type brokerUseStats struct { - leader int - follower int -} - -type brokerStatus struct { - new int - missing int - oldMissing int - replace int -} - -// broker is used for internal -// metadata / accounting. -type broker struct { - id int - locality string - used int - replace bool -} - -// brokerMap holds a mapping of -// broker IDs to *broker. -type brokerMap map[int]*broker - -// brokerList is a slice of -// brokers for sorting by used count. -type brokerList []*broker - -// Satisfy the sort interface for brokerList. - -func (b brokerList) Len() int { return len(b) } -func (b brokerList) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b brokerList) Less(i, j int) bool { - if b[i].used < b[j].used { - return true - } - if b[i].used > b[j].used { - return false - } - - return b[i].id < b[j].id -} - -type constraints struct { - locality map[string]bool - id map[int]bool -} - -func newConstraints() *constraints { - return &constraints{ - locality: make(map[string]bool), - id: make(map[int]bool), +func init() { + // Skip init in tests to avoid + // errors as a result of the + // sanity checks that follow + // flag parsing. + if flag.Lookup("test.v") != nil { + return } -} -func init() { log.SetOutput(ioutil.Discard) fmt.Println() @@ -194,40 +103,20 @@ func init() { } } -func containsRegex(t string) bool { - // Check each character of the - // topic name. If it doesn't contain - // a legal Kafka topic name character, we're - // going to assume it's regex. - for _, c := range t { - if !topicNormalChar.MatchString(string(c)) { - return true - } - } - - return false -} - -func defaultsAndExit() { - flag.PrintDefaults() - os.Exit(1) -} - func main() { // ZooKeeper init. + var zk *zk if Config.useMeta || len(Config.rebuildTopics) > 0 { - // ZooKeeper config params. - zkc = &zkConfig{ - ConnectString: Config.zkAddr, - Prefix: Config.zkPrefix} + var err error + zk, err = newZK(&zkConfig{ + connect: Config.zkAddr, + prefix: Config.zkPrefix, + }) - // Init the ZK client. - err := initZK(zkc) if err != nil { fmt.Printf("Error connecting to ZooKeeper: %s\n", err) os.Exit(1) } - } // General flow: @@ -245,65 +134,41 @@ func main() { var brokerMetadata brokerMetaMap if Config.useMeta { var err error - brokerMetadata, err = getAllBrokerMeta(zkc) + brokerMetadata, err = zk.getAllBrokerMeta() if err != nil { - fmt.Printf("Error fetching metadata: %s\n", err) + fmt.Printf("Error fetching broker metadata: %s\n", err) os.Exit(1) } } - partitionMapIn := newPartitionMap() - // Build a topic map with either // explicit input or by fetching the // map data from ZooKeeper. + partitionMapIn := newPartitionMap() switch { case Config.rebuildMap != "": - err := json.Unmarshal([]byte(Config.rebuildMap), &partitionMapIn) + pm, err := partitionMapFromString(Config.rebuildMap) if err != nil { - fmt.Printf("Error parsing topic map: %s\n", err) + fmt.Println(err) os.Exit(1) } + partitionMapIn = pm case len(Config.rebuildTopics) > 0: - // Get a list of topic names from ZK - // matching the provided list. - topicsToRebuild, err := getTopics(zkc, Config.rebuildTopics) + pm, err := partitionMapFromZK(Config.rebuildTopics, zk) if err != nil { fmt.Println(err) os.Exit(1) } + partitionMapIn = pm + } - // Log and exit if no matching topics were found. - if len(topicsToRebuild) == 0 { - fmt.Printf("No topics found matching: ") - for n, t := range Config.rebuildTopics { - fmt.Printf("/%s/", t) - if n < len(Config.rebuildTopics)-1 { - fmt.Printf(", ") - } - } - fmt.Println() - os.Exit(1) - } - - // Get current reassign_partitions. - reassignments := getReassignments(zkc) - - // Get a partition map for each topic. - pmapMerged := newPartitionMap() - for _, t := range topicsToRebuild { - pmap, err := partitionMapFromZk(zkc, t, reassignments) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - // Merge multiple maps. - pmapMerged.Partitions = append(pmapMerged.Partitions, pmap.Partitions...) - } + // Order from ZooKeeper can be + // random. + sort.Sort(partitionMapIn.Partitions) - partitionMapIn = pmapMerged - } + // Store a copy of the + // original map. + originalMap := partitionMapIn.copy() // Get a list of affected topics. topics := map[string]bool{} @@ -348,8 +213,6 @@ func main() { fmt.Printf("%sno-op\n", indent) } - originalMap := partitionMapIn.copy() - // If the replication factor is changed, // the partition map input needs to have stub // brokers appended (for r factor increase) or @@ -462,550 +325,3 @@ func main() { } } } - -// useStats returns a map of broker IDs -// to brokerUseStats; each contains a count -// of leader and follower partition assignments. -func (pm partitionMap) useStats() map[int]*brokerUseStats { - stats := map[int]*brokerUseStats{} - // Get counts. - for _, p := range pm.Partitions { - for i, b := range p.Replicas { - if _, exists := stats[b]; !exists { - stats[b] = &brokerUseStats{} - } - // Idx 0 for each replica set - // is a leader assignment. - if i == 0 { - stats[b].leader++ - } else { - stats[b].follower++ - } - } - } - - return stats -} - -// rebuild takes a brokerMap and traverses -// the partition map, replacing brokers marked removal -// with the best available candidate. -func (pm partitionMap) rebuild(bm brokerMap) (*partitionMap, []string) { - sort.Sort(pm.Partitions) - - newMap := newPartitionMap() - // We need a filtered list for - // usage sorting and exclusion - // of nodes marked for removal. - bl := bm.filteredList() - - var errs []string - - pass := 0 - // For each partition partn in the - // partitions list: -start: - skipped := 0 - for n, partn := range pm.Partitions { - // If this is the first pass, create - // the new partition. - if pass == 0 { - newP := Partition{Partition: partn.Partition, Topic: partn.Topic} - newMap.Partitions = append(newMap.Partitions, newP) - } - - // Build a brokerList from the - // IDs in the old replica set to - // get a *constraints. - replicaSet := brokerList{} - for _, bid := range partn.Replicas { - replicaSet = append(replicaSet, bm[bid]) - } - // Add existing brokers in the - // new replica set as well. - for _, bid := range newMap.Partitions[n].Replicas { - replicaSet = append(replicaSet, bm[bid]) - } - - constraints := mergeConstraints(replicaSet) - - // The number of needed passes may vary; - // e.g. if most replica sets have a len - // of 2 and a few with a len of 3, we have - // to do 3 passes while skipping some - // on final passes. - if pass > len(partn.Replicas)-1 { - skipped++ - continue - } - - // Get the broker ID we're - // either going to move into - // the new map or replace. - bid := partn.Replicas[pass] - - // If the broker ID is marked as replace - // in the broker map, get a new ID. - if bm[bid].replace { - // Fetch the best candidate and append. - newBroker, err := bl.bestCandidate(constraints) - if err != nil { - // Append any caught errors. - errString := fmt.Sprintf("%s p%d: %s", partn.Topic, partn.Partition, err.Error()) - errs = append(errs, errString) - continue - } - - newMap.Partitions[n].Replicas = append(newMap.Partitions[n].Replicas, newBroker.id) - } else { - // Otherwise keep the broker where it is. - newMap.Partitions[n].Replicas = append(newMap.Partitions[n].Replicas, bid) - } - - } - - pass++ - // Check if we need more passes. - // If we've just counted as many skips - // as there are partitions to handle, - // we have nothing left to do. - if skipped < len(pm.Partitions) { - goto start - } - - return newMap, errs -} - -// bestCandidate takes a *constraints -// and returns the *broker with the lowest used -// count that satisfies all constraints. -func (b brokerList) bestCandidate(c *constraints) (*broker, error) { - sort.Sort(b) - - var candidate *broker - - // Iterate over candidates. - for _, candidate = range b { - // Candidate passes, return. - if c.passes(candidate) { - c.add(candidate) - candidate.used++ - - return candidate, nil - } - } - - // List exhausted, no brokers passed. - return nil, errNoBrokers -} - -// add takes a *broker and adds its -// attributes to the *constraints. -func (c *constraints) add(b *broker) { - if b.locality != "" { - c.locality[b.locality] = true - } - - c.id[b.id] = true -} - -// passes takes a *broker and returns -// whether or not it passes constraints. -func (c *constraints) passes(b *broker) bool { - switch { - // Fail if the candidate is one of the - // IDs already in the replica set. - case c.id[b.id]: - return false - // Fail if the candidate is in any of - // the existing replica set localities. - case c.locality[b.locality]: - return false - } - return true -} - -// setReplication ensures that replica sets -// is reset to the replication factor r. Sets -// exceeding r are truncated, sets below r -// are extended with stub brokers. -func (pm partitionMap) setReplication(r int) { - for n, p := range pm.Partitions { - l := len(p.Replicas) - - switch { - // Truncate replicas beyond r. - case l > r: - pm.Partitions[n].Replicas = p.Replicas[:r] - // Add stub brokers to meet r. - case l < r: - r := make([]int, r-l) - pm.Partitions[n].Replicas = append(p.Replicas, r...) - } - } -} - -func (pm partitionMap) copy() *partitionMap { - cpy := newPartitionMap() - - for _, p := range pm.Partitions { - part := Partition{ - Topic: p.Topic, - Partition: p.Partition, - Replicas: make([]int, len(p.Replicas)), - } - - copy(part.Replicas, p.Replicas) - cpy.Partitions = append(cpy.Partitions, part) - } - - return cpy -} - -// strip takes a partitionMap and returns a -// copy where all broker ID references are replaced -// with the stub broker with ID 0 where the replace -// field is set to true. This ensures that the -// entire map is rebuilt, even if the provided broker -// list matches what's already in the map. -func (pm partitionMap) strip() *partitionMap { - stripped := newPartitionMap() - - // Copy each partition sans the replicas list. - // The make([]int, ...) defaults the replica set to - // ID 0, which is a default stub broker with replace - // set to true. - for _, p := range pm.Partitions { - part := Partition{ - Topic: p.Topic, - Partition: p.Partition, - Replicas: make([]int, len(p.Replicas)), - } - - stripped.Partitions = append(stripped.Partitions, part) - } - - return stripped -} - -// mergeConstraints takes a brokerlist and -// builds a *constraints by merging the -// attributes of all brokers from the supplied list. -func mergeConstraints(bl brokerList) *constraints { - c := newConstraints() - - for _, b := range bl { - // Don't merge in attributes - // from nodes that will be removed. - if b.replace { - continue - } - - if b.locality != "" { - c.locality[b.locality] = true - } - - c.id[b.id] = true - } - - return c -} - -// update takes a brokerMap and a []int -// of broker IDs and adds them to the brokerMap, -// returning the count of marked for replacement, -// newly included, and brokers that weren't found -// in ZooKeeper. -func (b brokerMap) update(bl []int, bm brokerMetaMap) *brokerStatus { - bs := &brokerStatus{} - - // Build a map from the new broker list. - newBrokers := map[int]bool{} - for _, broker := range bl { - newBrokers[broker] = true - } - - // Do an initial pass on existing brokers - // and see if any are missing in ZooKeeper. - if len(bm) > 0 { - for id := range b { - // Skip reserved ID 0. - if id == 0 { - continue - } - - if _, exist := bm[id]; !exist { - fmt.Printf("%sPrevious broker %d missing\n", - indent, id) - b[id].replace = true - // If this broker is missing and was provided in - // the broker list, consider it a "missing provided broker". - if _, ok := newBrokers[id]; len(bm) > 0 && ok { - bs.missing++ - } else { - bs.oldMissing++ - } - } - } - } - - // Set the replace flag for existing brokers - // not in the new broker map. - for _, broker := range b { - // Broker ID 0 is a special stub - // ID used for internal purposes. - // Skip it. - if broker.id == 0 { - continue - } - - if _, ok := newBrokers[broker.id]; !ok { - bs.replace++ - b[broker.id].replace = true - fmt.Printf("%sBroker %d marked for removal\n", - indent, broker.id) - } - } - - // Merge new brokers with existing brokers. - for id := range newBrokers { - // Don't overwrite existing (which will be most brokers). - if b[id] == nil { - // Skip metadata lookups if - // meta is not being used. - if len(bm) == 0 { - b[id] = &broker{ - used: 0, - id: id, - replace: false, - } - bs.new++ - continue - } - - // Else check the broker against - // the broker metadata map. - if meta, exists := bm[id]; exists { - b[id] = &broker{ - used: 0, - id: id, - replace: false, - locality: meta.Rack, - } - bs.new++ - } else { - bs.missing++ - fmt.Printf("%sBroker %d not found in ZooKeeper\n", - indent, id) - } - } - } - - return bs -} - -// filteredList converts a brokerMap to a brokerList, -// excluding nodes marked for replacement. -func (b brokerMap) filteredList() brokerList { - bl := brokerList{} - - for broker := range b { - if !b[broker].replace { - bl = append(bl, b[broker]) - } - } - - return bl -} - -// brokerMapFromTopicMap creates a brokerMap -// from a topicMap. Counts occurance is counted. -// TODO can we remove marked for replacement here too? -func brokerMapFromTopicMap(pm *partitionMap, bm brokerMetaMap, force bool) brokerMap { - bmap := brokerMap{} - // For each partition. - for _, partition := range pm.Partitions { - // For each broker in the - // partition replica set. - for _, id := range partition.Replicas { - // If the broker isn't in the - // broker map, add it. - if bmap[id] == nil { - // If we're doing a force rebuid, replace - // should be set to true. - bmap[id] = &broker{used: 0, id: id, replace: false} - } - - // Track use scoring unless we're - // doing a force rebuild. In this case, - // we're treating existing brokers the same - // as new brokers (which start with a score of 0). - if !force { - bmap[id].used++ - } - - // Add metadata if we have it. - if meta, exists := bm[id]; exists { - bmap[id].locality = meta.Rack - } - } - } - - // Broker ID 0 is used for --force-rebuild. - // We request a stripped map which replaces - // all existing brokers with the fake broker - // with ID set for replacement. - bmap[0] = &broker{used: 0, id: 0, replace: true} - - return bmap -} - -// whatChanged takes a before and after broker -// replica set and returns a string describing -// what changed. -func whatChanged(s1 []int, s2 []int) string { - var changes []string - - a, b := make([]int, len(s1)), make([]int, len(s2)) - copy(a, s1) - copy(b, s2) - - var lchanged bool - var echanged bool - - // Check if the len is different. - switch { - case len(a) > len(b): - lchanged = true - changes = append(changes, "decreased replication") - case len(a) < len(b): - lchanged = true - changes = append(changes, "increased replication") - } - - // If the len is the same, - // check elements. - if !lchanged { - for i := range a { - if a[i] != b[i] { - echanged = true - } - } - } - - // Nothing changed. - if !lchanged && !echanged { - return "no-op" - } - - // Determine what else changed. - - // Get smaller replica set len between - // old vs new, then cap both to this len for - // comparison. - slen := int(math.Min(float64(len(a)), float64(len(b)))) - - a = a[:slen] - b = b[:slen] - - echanged = false - for i := range a { - if a[i] != b[i] { - echanged = true - } - } - - sort.Ints(a) - sort.Ints(b) - - samePostSort := true - for i := range a { - if a[i] != b[i] { - samePostSort = false - } - } - - // If the broker lists changed but - // are the same after sorting, - // we've just changed the preferred - // leader. - if echanged && samePostSort { - changes = append(changes, "preferred leader") - } - - // If the broker lists changed and - // aren't the same after sorting, we've - // replaced a broker. - if echanged && !samePostSort { - changes = append(changes, "replaced broker") - } - - // Construct change string. - var buf bytes.Buffer - for i, c := range changes { - buf.WriteString(c) - if i < len(changes)-1 { - buf.WriteString(", ") - } - } - - return buf.String() -} - -// brokerStringToSlice takes a broker list -// as a string and returns a []int of -// broker IDs. -func brokerStringToSlice(s string) []int { - ids := map[int]bool{} - var info int - - parts := strings.Split(s, ",") - is := []int{} - - // Iterate and convert - // each broker ID. - for _, p := range parts { - i, err := strconv.Atoi(strings.TrimSpace(p)) - // Err and exit on bad input. - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - if ids[i] { - fmt.Printf("ID %d supplied as duplicate, excluding\n", i) - info++ - continue - } - - ids[i] = true - is = append(is, i) - } - - // Formatting purposes. - if info > 0 { - fmt.Println() - } - - return is -} - -// writeMap takes a *partitionMap and writes a JSON -// text file to the provided path. -func writeMap(pm *partitionMap, path string) error { - // Marshal. - out, err := json.Marshal(pm) - if err != nil { - return err - } - - mapOut := string(out) - - // Write file. - err = ioutil.WriteFile(path+".json", []byte(mapOut+"\n"), 0644) - if err != nil { - return err - } - - return nil -} diff --git a/partitions.go b/partitions.go new file mode 100644 index 0000000..e3c306e --- /dev/null +++ b/partitions.go @@ -0,0 +1,338 @@ +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "regexp" + "sort" +) + +// Partition maps the partition objects +// in the Kafka topic mapping syntax. +type Partition struct { + Topic string `json:"topic"` + Partition int `json:"partition"` + Replicas []int `json:"replicas"` +} + +type partitionList []Partition + +// partitionMap maps the +// Kafka topic mapping syntax. +type partitionMap struct { + Version int `json:"version"` + Partitions partitionList `json:"partitions"` +} + +// Satisfy the sort interface for partitionList. + +func (p partitionList) Len() int { return len(p) } +func (p partitionList) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p partitionList) Less(i, j int) bool { + if p[i].Topic < p[j].Topic { + return true + } + if p[i].Topic > p[j].Topic { + return false + } + + return p[i].Partition < p[j].Partition +} + +func newPartitionMap() *partitionMap { + return &partitionMap{Version: 1} +} + +// Rebuild takes a brokerMap and traverses +// the partition map, replacing brokers marked removal +// with the best available candidate. +func (pm *partitionMap) rebuild(bm brokerMap) (*partitionMap, []string) { + sort.Sort(pm.Partitions) + + newMap := newPartitionMap() + // We need a filtered list for + // usage sorting and exclusion + // of nodes marked for removal. + bl := bm.filteredList() + + var errs []string + + pass := 0 + // For each partition partn in the + // partitions list: +pass: + skipped := 0 + for n, partn := range pm.Partitions { + // If this is the first pass, create + // the new partition. + if pass == 0 { + newP := Partition{Partition: partn.Partition, Topic: partn.Topic} + newMap.Partitions = append(newMap.Partitions, newP) + } + + // Build a brokerList from the + // IDs in the old replica set to + // get a *constraints. + replicaSet := brokerList{} + for _, bid := range partn.Replicas { + replicaSet = append(replicaSet, bm[bid]) + } + // Add existing brokers in the + // new replica set as well. + for _, bid := range newMap.Partitions[n].Replicas { + replicaSet = append(replicaSet, bm[bid]) + } + + constraints := mergeConstraints(replicaSet) + + // The number of needed passes may vary; + // e.g. if most replica sets have a len + // of 2 and a few with a len of 3, we have + // to do 3 passes while skipping some + // on final passes. + if pass > len(partn.Replicas)-1 { + skipped++ + continue + } + + // Get the broker ID we're + // either going to move into + // the new map or replace. + bid := partn.Replicas[pass] + + // If the broker ID is marked as replace + // in the broker map, get a new ID. + if bm[bid].replace { + // Fetch the best candidate and append. + newBroker, err := bl.bestCandidate(constraints) + if err != nil { + // Append any caught errors. + errString := fmt.Sprintf("%s p%d: %s", partn.Topic, partn.Partition, err.Error()) + errs = append(errs, errString) + continue + } + + newMap.Partitions[n].Replicas = append(newMap.Partitions[n].Replicas, newBroker.id) + } else { + // Otherwise keep the broker where it is. + newMap.Partitions[n].Replicas = append(newMap.Partitions[n].Replicas, bid) + } + + } + + pass++ + // Check if we need more passes. + // If we've just counted as many skips + // as there are partitions to handle, + // we have nothing left to do. + if skipped < len(pm.Partitions) { + goto pass + } + + return newMap, errs +} + +// partitionMapFromString takes a json encoded string +// and returns a *partitionMap. +func partitionMapFromString(s string) (*partitionMap, error) { + pm := newPartitionMap() + + err := json.Unmarshal([]byte(s), &pm) + if err != nil { + errString := fmt.Sprintf("Error parsing topic map: %s", err.Error()) + return nil, errors.New(errString) + } + + return pm, nil +} + +// partitionMapFromZK takes a slice of regexp +// and finds all matching topics for each. A +// merged *partitionMap of all matching topic +// maps is returned. +func partitionMapFromZK(t []*regexp.Regexp, zk zkhandler) (*partitionMap, error) { + // Get a list of topic names from ZK + // matching the provided list. + topicsToRebuild, err := zk.getTopics(t) + if err != nil { + return nil, err + } + + // Err if no matching topics were found. + if len(topicsToRebuild) == 0 { + var b bytes.Buffer + b.WriteString("No topics found matching: ") + for n, t := range Config.rebuildTopics { + b.WriteString(fmt.Sprintf("/%s/", t)) + if n < len(Config.rebuildTopics)-1 { + b.WriteString(", ") + } + } + + return nil, errors.New(b.String()) + } + + // Get a partition map for each topic. + pmapMerged := newPartitionMap() + for _, t := range topicsToRebuild { + pmap, err := zk.getPartitionMap(t) + if err != nil { + return nil, err + } + + // Merge multiple maps. + pmapMerged.Partitions = append(pmapMerged.Partitions, pmap.Partitions...) + } + + return pmapMerged, nil +} + +// setReplication ensures that replica sets +// is reset to the replication factor r. Sets +// exceeding r are truncated, sets below r +// are extended with stub brokers. +func (pm *partitionMap) setReplication(r int) { + // 0 is a no-op. + if r == 0 { + return + } + + for n, p := range pm.Partitions { + l := len(p.Replicas) + + switch { + // Truncate replicas beyond r. + case l > r: + pm.Partitions[n].Replicas = p.Replicas[:r] + // Add stub brokers to meet r. + case l < r: + r := make([]int, r-l) + pm.Partitions[n].Replicas = append(p.Replicas, r...) + } + } +} + +// copy returns a copy of a *partitionMap. +func (pm *partitionMap) copy() *partitionMap { + cpy := newPartitionMap() + + for _, p := range pm.Partitions { + part := Partition{ + Topic: p.Topic, + Partition: p.Partition, + Replicas: make([]int, len(p.Replicas)), + } + + copy(part.Replicas, p.Replicas) + cpy.Partitions = append(cpy.Partitions, part) + } + + return cpy +} + +// Equal checks the equality betwee two partition maps. +// Equality requires that the total order is exactly +// the same. +func (pm *partitionMap) equal(pm2 *partitionMap) (bool, error) { + // Crude checks. + switch { + case len(pm.Partitions) != len(pm2.Partitions): + return false, errors.New("partitions len") + case pm.Version != pm2.Version: + return false, errors.New("version") + } + + // Iterative comparison. + for i, p1 := range pm.Partitions { + p2 := pm2.Partitions[i] + switch { + case p1.Topic != p2.Topic: + return false, errors.New("topic order") + case p1.Partition != p2.Partition: + return false, errors.New("partition order") + case len(p1.Replicas) != len(p2.Replicas): + return false, errors.New("replica list") + } + // This is fine... + for n := range p1.Replicas { + if p1.Replicas[n] != p2.Replicas[n] { + return false, errors.New("replica") + } + } + } + + return true, nil +} + +// strip takes a partitionMap and returns a +// copy where all broker ID references are replaced +// with the stub broker with ID 0 where the replace +// field is set to true. This ensures that the +// entire map is rebuilt, even if the provided broker +// list matches what's already in the map. +func (pm *partitionMap) strip() *partitionMap { + stripped := newPartitionMap() + + // Copy each partition sans the replicas list. + // The make([]int, ...) defaults the replica set to + // ID 0, which is a default stub broker with replace + // set to true. + for _, p := range pm.Partitions { + part := Partition{ + Topic: p.Topic, + Partition: p.Partition, + Replicas: make([]int, len(p.Replicas)), + } + + stripped.Partitions = append(stripped.Partitions, part) + } + + return stripped +} + +// writeMap takes a *partitionMap and writes a JSON +// text file to the provided path. +func writeMap(pm *partitionMap, path string) error { + // Marshal. + out, err := json.Marshal(pm) + if err != nil { + return err + } + + mapOut := string(out) + + // Write file. + err = ioutil.WriteFile(path+".json", []byte(mapOut+"\n"), 0644) + if err != nil { + return err + } + + return nil +} + +// useStats returns a map of broker IDs +// to brokerUseStats; each contains a count +// of leader and follower partition assignments. +func (pm *partitionMap) useStats() map[int]*brokerUseStats { + stats := map[int]*brokerUseStats{} + // Get counts. + for _, p := range pm.Partitions { + for i, b := range p.Replicas { + if _, exists := stats[b]; !exists { + stats[b] = &brokerUseStats{} + } + // Idx 0 for each replica set + // is a leader assignment. + if i == 0 { + stats[b].leader++ + } else { + stats[b].follower++ + } + } + } + + return stats +} diff --git a/partitions_test.go b/partitions_test.go new file mode 100644 index 0000000..cb9a78f --- /dev/null +++ b/partitions_test.go @@ -0,0 +1,222 @@ +package main + +import ( + "fmt" + "regexp" + "sort" + "testing" +) + +func testGetMapString(n string) string { + return fmt.Sprintf(`{"version":1,"partitions":[ + {"topic":"%s","partition":0,"replicas":[1001,1002]}, + {"topic":"%s","partition":1,"replicas":[1002,1001]}, + {"topic":"%s","partition":2,"replicas":[1003,1004,1001]}, + {"topic":"%s","partition":3,"replicas":[1004,1003,1002]}]}`, n, n, n, n) +} + +func TestEqual(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + pm2, _ := partitionMapFromString(testGetMapString("test_topic")) + + if same, _ := pm.equal(pm2); !same { + t.Error("Unexpected inequality") + } + + // After modifying the partitions list, + // we expect inequality. + pm.Partitions = pm.Partitions[:2] + if same, _ := pm.equal(pm2); same { + t.Errorf("Unexpected equality") + } +} + +func TestCopy(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + pm2 := pm.copy() + + if same, _ := pm.equal(pm2); !same { + t.Error("Unexpected inequality") + } + + // After modifying the partitions list, + // we expect inequality. + pm.Partitions = pm.Partitions[:2] + if same, _ := pm.equal(pm2); same { + t.Errorf("Unexpected equality") + } +} + +func TestPartitionMapFromString(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + zk := &zkmock{} + pm2, _ := zk.getPartitionMap("test_topic") + + // We expect equality here. + if same, _ := pm.equal(pm2); !same { + t.Errorf("Unexpected inequality") + } +} + +func TestPartitionMapFromZK(t *testing.T) { + zk := &zkmock{} + + r := []*regexp.Regexp{} + r = append(r, regexp.MustCompile("/^null$/")) + pm, err := partitionMapFromZK(r, zk) + + // This should fail because we're passing + // a regex that the mock call to getTopics() + // from partitionMapFromZK doesn't have + // any matches. + if pm != nil || err.Error() != "No topics found matching: " { + t.Errorf("Expected topic lookup failure") + } + + r = r[:0] + r = append(r, regexp.MustCompile("test")) + + // This is going to match both "test_topic" + // and "test_topic2" from the mock. + pm, _ = partitionMapFromZK(r, zk) + + // Build a merged map of these for + // equality testing. + pm2 := newPartitionMap() + for _, t := range []string{"test_topic", "test_topic2"} { + pmap, _ := partitionMapFromString(testGetMapString(t)) + pm2.Partitions = append(pm2.Partitions, pmap.Partitions...) + } + + sort.Sort(pm.Partitions) + sort.Sort(pm2.Partitions) + + // Compare. + if same, err := pm.equal(pm2); !same { + t.Errorf("Unexpected inequality: %s", err) + } + +} + +func TestSetReplication(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + + pm.setReplication(3) + // All partitions should now have 3 replicas. + for _, r := range pm.Partitions { + if len(r.Replicas) != 3 { + t.Errorf("Expected 3 replicas, got %d", len(r.Replicas)) + } + } + + pm.setReplication(2) + // All partitions should now have 3 replicas. + for _, r := range pm.Partitions { + if len(r.Replicas) != 2 { + t.Errorf("Expected 2 replicas, got %d", len(r.Replicas)) + } + } + + pm.setReplication(0) + // Setting to 0 is a no-op. + for _, r := range pm.Partitions { + if len(r.Replicas) != 2 { + t.Errorf("Expected 2 replicas, got %d", len(r.Replicas)) + } + } +} + +func TestStrip(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + + spm := pm.strip() + + for _, p := range spm.Partitions { + for _, b := range p.Replicas { + if b != 0 { + t.Errorf("Unexpected non-stub broker ID %d", b) + } + } + } +} + +func TestUseStats(t *testing.T) { + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + + s := pm.useStats() + + expected := map[int][2]int{ + 1001: [2]int{1, 2}, + 1002: [2]int{1, 2}, + 1003: [2]int{1, 1}, + 1004: [2]int{1, 1}, + } + + for b, bs := range s { + if bs.leader != expected[b][0] { + t.Errorf("Expected leader count %d for %d, got %d", + expected[b][0], b, bs.leader) + } + + if bs.follower != expected[b][1] { + t.Errorf("Expected follower count %d for %d, got %d", + expected[b][1], b, bs.follower) + } + } +} + +func TestRebuild(t *testing.T) { + zk := &zkmock{} + bm, _ := zk.getAllBrokerMeta() + pm, _ := partitionMapFromString(testGetMapString("test_topic")) + forceRebuild := false + + brokers := brokerMapFromTopicMap(pm, bm, forceRebuild) + out, errs := pm.rebuild(brokers) + if errs != nil { + t.Errorf("Unexpected error(s): %s", errs) + } + + // This rebuild should be a no-op since + // all brokers already in the map were provided, + // none marked as replace. + if same, _ := pm.equal(out); !same { + t.Error("Expected no-op, topic map changed") + } + + // Mark 1004 for replacement. + brokers[1004].replace = true + out, errs = pm.rebuild(brokers) + if errs != nil { + t.Errorf("Unexpected error(s): %s", errs) + } + + // Expected map after a replacement rebuild. + expected, _ := partitionMapFromString(testGetMapString("test_topic")) + expected.Partitions[2].Replicas = []int{1003, 1002, 1001} + expected.Partitions[3].Replicas = []int{1001, 1003, 1002} + + if same, _ := out.equal(expected); !same { + t.Error("Unexpected inequality after broker replacement") + } + + // Test a rebuild with a change in + // replication factor. + pm.setReplication(2) + expected.setReplication(2) + + out, _ = pm.rebuild(brokers) + + if same, _ := out.equal(expected); !same { + t.Error("Unexpected inequality after replication factor change -> rebuild") + } + + // Test a force rebuild. + pmStripped := pm.strip() + out, _ = pmStripped.rebuild(brokers) + + same, _ := pm.equal(out) + if same { + t.Error("Unexpected inequality after force rebuild") + } +} diff --git a/zookeeper.go b/zookeeper.go index 5557c47..1dae737 100644 --- a/zookeeper.go +++ b/zookeeper.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "errors" "fmt" "regexp" "strconv" @@ -12,14 +13,31 @@ import ( "github.com/docker/libkv/store/zookeeper" ) -var ( - zk store.Store -) +type zk struct { + client store.Store + connect string + prefix string +} + +type zkConfig struct { + connect string + prefix string +} + +type zkhandler interface { + getReassignments() reassignments + getTopics([]*regexp.Regexp) ([]string, error) + getAllBrokerMeta() (brokerMetaMap, error) + getPartitionMap(string) (*partitionMap, error) +} func init() { zookeeper.Register() } +// BrokerMeta holds metadata that +// describes a broker, used in satisfying +// constraints. type BrokerMeta struct { Rack string `json:"rack"` } @@ -52,39 +70,40 @@ type reassignConfig struct { Replicas []int `json:"replicas"` } -type zkConfig struct { - ConnectString string - Prefix string -} +func newZK(c *zkConfig) (*zk, error) { + z := &zk{ + connect: c.connect, + prefix: c.prefix, + } -func initZK(zc *zkConfig) error { var err error - zk, err = libkv.NewStore( + z.client, err = libkv.NewStore( store.ZK, - []string{zc.ConnectString}, + []string{z.connect}, &store.Config{ ConnectionTimeout: 10 * time.Second, }, ) + if err != nil { - return err + return nil, err } - return nil + return z, nil } -func getReassignments(zc *zkConfig) reassignments { +func (z *zk) getReassignments() reassignments { reassigns := reassignments{} var path string - if zc.Prefix != "" { - path = fmt.Sprintf("%s/admin/reassign_partitions", zc.Prefix) + if z.prefix != "" { + path = fmt.Sprintf("%s/admin/reassign_partitions", z.prefix) } else { path = "admin/reassign_partitions" } // Get reassignment config. - c, err := zk.Get(path) + c, err := z.client.Get(path) if err != nil { return reassigns } @@ -104,18 +123,18 @@ func getReassignments(zc *zkConfig) reassignments { return reassigns } -func getTopics(zc *zkConfig, ts []*regexp.Regexp) ([]string, error) { +func (z *zk) getTopics(ts []*regexp.Regexp) ([]string, error) { matchingTopics := []string{} var path string - if zc.Prefix != "" { - path = fmt.Sprintf("%s/brokers/topics", zc.Prefix) + if z.prefix != "" { + path = fmt.Sprintf("%s/brokers/topics", z.prefix) } else { path = "brokers/topics" } - // Find all topics in ZK. - entries, err := zk.List(path) + // Find all topics in z. + entries, err := z.client.List(path) if err != nil { return nil, err } @@ -139,17 +158,20 @@ func getTopics(zc *zkConfig, ts []*regexp.Regexp) ([]string, error) { return matchingTopics, nil } -func getAllBrokerMeta(zc *zkConfig) (brokerMetaMap, error) { +func (z *zk) getAllBrokerMeta() (brokerMetaMap, error) { var path string - if zc.Prefix != "" { - path = fmt.Sprintf("%s/brokers/ids", zc.Prefix) + if z.prefix != "" { + path = fmt.Sprintf("%s/brokers/ids", z.prefix) } else { path = "brokers/ids" } // Get all brokers. - entries, err := zk.List(path) + entries, err := z.client.List(path) if err != nil { + if err.Error() == "Key not found in store" { + return nil, errors.New("No brokers registered") + } return nil, err } @@ -178,20 +200,23 @@ func getAllBrokerMeta(zc *zkConfig) (brokerMetaMap, error) { return bmm, nil } -func partitionMapFromZk(zc *zkConfig, t string, re reassignments) (*partitionMap, error) { +func (z *zk) getPartitionMap(t string) (*partitionMap, error) { var path string - if zc.Prefix != "" { - path = fmt.Sprintf("%s/brokers/topics/%s", zc.Prefix, t) + if z.prefix != "" { + path = fmt.Sprintf("%s/brokers/topics/%s", z.prefix, t) } else { path = fmt.Sprintf("brokers/topics/%s", t) } - // Fetch topic data from ZK. + // Get current reassign_partitions. + re := z.getReassignments() + + // Fetch topic data from z. ts := &topicState{} - m, err := zk.Get(path) + m, err := z.client.Get(path) switch err { case store.ErrKeyNotFound: - return nil, fmt.Errorf("Topic %s not found in ZooKeeper\n", t) + return nil, fmt.Errorf("Topic %s not found in ZooKeeper", t) case nil: break default: diff --git a/zookeeper_test.go b/zookeeper_test.go new file mode 100644 index 0000000..7c82622 --- /dev/null +++ b/zookeeper_test.go @@ -0,0 +1,72 @@ +package main + +import ( + //"testing" + "regexp" +) + +// zkmock implements a mock zkhandler. +type zkmock struct{} + +func (z *zkmock) getReassignments() reassignments { + r := reassignments{ + "test_topic": map[int][]int{ + 2: []int{1003, 1004}, + 3: []int{1004, 1003}, + }, + } + return r +} + +func (z *zkmock) getTopics(ts []*regexp.Regexp) ([]string, error) { + t := []string{"test_topic", "test_topic2"} + + match := map[string]bool{} + // Get all topics that match all + // provided topic regexps. + for _, topicRe := range ts { + for _, topic := range t { + if topicRe.MatchString(topic) { + match[topic] = true + } + } + } + + // Add matches to a slice. + matched := []string{} + for topic := range match { + matched = append(matched, topic) + } + + return matched, nil +} + +func (z *zkmock) getAllBrokerMeta() (brokerMetaMap, error) { + b := brokerMetaMap{ + 1001: &BrokerMeta{Rack: "a"}, + 1002: &BrokerMeta{Rack: "b"}, + 1003: &BrokerMeta{Rack: "c"}, + 1004: &BrokerMeta{Rack: "a"}, + } + + return b, nil +} + +func (z *zkmock) getPartitionMap(t string) (*partitionMap, error) { + p := &partitionMap{ + Version: 1, + Partitions: partitionList{ + Partition{Topic: t, Partition: 0, Replicas: []int{1001, 1002}}, + Partition{Topic: t, Partition: 1, Replicas: []int{1002, 1001}}, + Partition{Topic: t, Partition: 2, Replicas: []int{1003, 1004, 1001}}, + Partition{Topic: t, Partition: 3, Replicas: []int{1004, 1003, 1002}}, + }, + } + + return p, nil +} + +// func TestGetReassignments(t *testing.T) {} +// func TestGetTopics(t *testing.T) {} +// func TestGetAllBrokerMeta(t *testing.T) {} +// func TestGetPartitionMap(t *testing.T) {}