Skip to content

Commit

Permalink
Merge pull request #188 from DataDog/jamie/tests
Browse files Browse the repository at this point in the history
updated TestRebuildByCountSA
  • Loading branch information
jamiealquiza committed Oct 12, 2018
2 parents 2268a2e + 61ab806 commit ece0ffe
Showing 1 changed file with 109 additions and 1 deletion.
110 changes: 109 additions & 1 deletion kafkazk/partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func TestUseStats(t *testing.T) {
}
}

// Count rebuild.
func TestRebuildByCount(t *testing.T) {
forceRebuild := true
withMetrics := false
Expand Down Expand Up @@ -365,6 +366,61 @@ func TestRebuildByCount(t *testing.T) {
}
}

// Count rebuild with substitution affinities.
func TestRebuildByCountSA(t *testing.T) {
forceRebuild := true
withMetrics := false

zk := &Mock{}
bm, _ := zk.GetAllBrokerMeta(withMetrics)
// Simulate that we've lost broker 1002.
delete(bm, 1002)

pm, _ := PartitionMapFromString(testGetMapString4("test_topic"))
// Until https://github.com/DataDog/kafka-kit/issues/187 is closed,
// we need to pretend another broker with rack b was present.
pm.Partitions[2].Replicas = []int{1001, 1005}

pmm := NewPartitionMetaMap()
brokers := BrokerMapFromPartitionMap(pm, bm, forceRebuild)

// simulate that we've found broker 1010.
bm[1010] = &BrokerMeta{Rack: "b"}
brokers.Update([]int{1001, 1003, 1004, 1005, 1010}, bm)

// Get substitution affinities.
sa, err := brokers.SubstitutionAffinities(pm)
if err != nil {
t.Errorf("Unexpected error: %s", err)
}

rebuildParams := RebuildParams{
PMM: pmm,
BM: brokers,
Strategy: "count",
Optimization: "distribution",
Affinities: sa,
}

// Rebuild.
out, errs := pm.Rebuild(rebuildParams)
if errs != nil {
t.Errorf("Unexpected error(s): %s", errs)
}

expected, _ := PartitionMapFromString(testGetMapString4("test_topic"))
expected.Partitions[0].Replicas = []int{1004, 1003}
expected.Partitions[1].Replicas = []int{1003, 1004}
expected.Partitions[2].Replicas = []int{1001, 1005}
expected.Partitions[3].Replicas = []int{1003, 1010}
expected.Partitions[4].Replicas = []int{1001, 1003}
expected.Partitions[5].Replicas = []int{1010, 1001}

if same, err := out.equal(expected); !same {
t.Errorf("Unexpected inequality after rebuild: %s", err)
}
}

// Storage rebuild, distribution optimization.
func TestRebuildByStorageDistribution(t *testing.T) {
forceRebuild := true
Expand Down Expand Up @@ -421,7 +477,59 @@ func TestRebuildByStorageDistribution(t *testing.T) {
}

// Storage rebuild, storage optimization.
// func TestRebuildByStorageStorage(t *testing.T) {}
func TestRebuildByStorageStorage(t *testing.T) {
forceRebuild := true
withMetrics := true

zk := &Mock{}
bm, _ := zk.GetAllBrokerMeta(withMetrics)
pm, _ := PartitionMapFromString(testGetMapString4("test_topic"))
pmm, _ := zk.GetAllPartitionMeta()

// We need to reduce the test partition sizes
// for more accurate tests here.
for _, partn := range pmm["test_topic"] {
partn.Size = partn.Size / 3
}

brokers := BrokerMapFromPartitionMap(pm, bm, forceRebuild)

pmStripped := pm.Strip()
_ = brokers.SubStorageAll(pm, pmm)

// Normalize storage. The mock broker storage
// free vs mock partition sizes would actually
// represent brokers with varying storage sizes.
for _, b := range brokers {
b.StorageFree = 6000.00
}

rebuildParams := RebuildParams{
PMM: pmm,
BM: brokers,
Strategy: "storage",
Optimization: "storage",
PartnSzFactor: 1,
}

out, errs := pmStripped.Rebuild(rebuildParams)
if errs != nil {
t.Errorf("Unexpected error(s): %s", errs)
}

expected := pm.Copy()
expected.Partitions[0].Replicas = []int{1002, 1001}
expected.Partitions[1].Replicas = []int{1003, 1004}
expected.Partitions[2].Replicas = []int{1002, 1001}
expected.Partitions[3].Replicas = []int{1004, 1003}
expected.Partitions[4].Replicas = []int{1003, 1004}
expected.Partitions[5].Replicas = []int{1001, 1002}

same, err := out.equal(expected)
if !same {
t.Errorf("Unexpected inequality after rebuild: %s", err)
}
}

func TestLocalitiesAvailable(t *testing.T) {
pm, _ := PartitionMapFromString(testGetMapString("test_topic"))
Expand Down

0 comments on commit ece0ffe

Please sign in to comment.