
Commit

Revert "OplogToRedis: Run writer routines in parallel for each redis …
Browse files Browse the repository at this point in the history
…client. (#75)"

This reverts commit 136216a.
alex-goodisman committed May 3, 2024
1 parent 2bb8370 commit a691cae
Showing 2 changed files with 42 additions and 83 deletions.
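
In effect, the revert changes the shape of the channels the oplog tailer writes into: from one channel per redis client within each write shard back to a single channel per shard. A minimal Go sketch of the two layouts, using a placeholder publication type rather than the repository's redispub.Publication:

package main

// publication is a stand-in for redispub.Publication; the real struct carries
// the message payload and a ParallelismKey, among other fields.
type publication struct{}

// publisherChannels mirrors the PublisherChannels type removed by this revert:
// one intake channel per redis client within a single write shard.
type publisherChannels []chan<- *publication

func main() {
	writeParallelism := 4

	// Pre-revert layout (#75): shards x clients — the tailer fans each message
	// out to every channel in the selected shard.
	perClient := make([]publisherChannels, writeParallelism)

	// Post-revert layout: one channel per shard — the single publisher
	// goroutine behind it writes to every redis destination itself.
	perShard := make([]chan<- *publication, writeParallelism)

	_, _ = perClient, perShard
}
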
21 changes: 3 additions & 18 deletions lib/oplog/tail.go
@@ -115,15 +115,9 @@ func init() {
prometheus.MustRegister(metricMaxOplogEntryByMinute)
}

// PublisherChannels represents a collection of intake channels for a set of Redis Publishers.
// When multiple redis URLs are specified via OTR_REDIS_URL, each one produce a redis client,
// publisher coroutine, and intake channel. Since we want every message to go to all redis
// destinations, the tailer should send each message to all channels in the array.
type PublisherChannels []chan<- *redispub.Publication

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
childStopC := make(chan bool)
wasStopped := false

@@ -147,10 +141,7 @@ func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdina
}
}

// this accepts an array of PublisherChannels instances whose size is equal to the degree of write-parallelism.
// Each incoming message will be routed to one of the PublisherChannels instances based on its parallelism key
// (hash of the database name), then sent to every channel within that PublisherChannels instance.
func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
@@ -237,15 +228,9 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr
sendMetricsData()
}

// determine which shard this message should route to
// inIdx and outIdx may be different if there are different #s of read and write routines
outIdx := assignToShard(pub.ParallelismKey, len(out))
// get the set of publisher channels for that shard
pubChans := out[outIdx]
// send the message to each channel on that shard
for _, pubChan := range pubChans {
pubChan <- pub
}
out[outIdx] <- pub
} else {
log.Log.Error("Nil Redis publication")
}
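
The routing removed from tailOnce above fanned each message out to every channel in its shard; the restored code performs a single send per message. A minimal sketch of the idea, where publication stands in for redispub.Publication and this assignToShard is a simplified assumption rather than the repository's implementation (upstream, ParallelismKey is a hash of the database name):

package main

import (
	"fmt"
	"hash/fnv"
)

// publication is a placeholder for redispub.Publication.
type publication struct {
	database string
}

// assignToShard sketches the idea behind the repository's helper of the same
// name: derive a stable shard index from a key so that all writes for one
// database land on the same writer. The real implementation may differ.
func assignToShard(key string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	// Post-revert: one channel per write shard.
	out := []chan *publication{
		make(chan *publication, 1),
		make(chan *publication, 1),
	}

	pub := &publication{database: "orders"}
	outIdx := assignToShard(pub.database, len(out))
	out[outIdx] <- pub // single send per message

	// Pre-revert (#75), each shard instead held one channel per redis client,
	// and this send became a loop over all channels in out[outIdx].
	fmt.Println("routed to shard", outIdx)
}
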
104 changes: 39 additions & 65 deletions main.go
@@ -42,19 +42,14 @@ func main() {
}

writeParallelism := config.WriteParallelism()
// each array of redis clients holds one client for each destination (regular redis, sentinel)
// the aggregated array holds one such array for every write-parallelism shard
aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
// make one PublisherChannels for each parallel writer
aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism)
// one stopper channel corresponds to each writer, so it uses the same 2D array structure.
stopRedisPubs := make([][]chan bool, writeParallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
stopRedisPubs := make([]chan bool, writeParallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
denylist := sync.Map{}

// this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level.
for i := 0; i < writeParallelism; i++ {
redisClients, err := createRedisClients()
if err != nil {
@@ -73,60 +68,42 @@ func main() {
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients
clientsSize := len(redisClients)

// each writer shard is going to make multiple writer coroutines, one for each redis destination,
// so we create one PublisherChannels for this shard and put each coroutine's intake channel in it.
// these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer.
redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize)
stopRedisPubsEntry := make([]chan bool, clientsSize)

for j := 0; j < clientsSize; j++ {
redisClient := redisClients[i]

redisPubs := make(chan *redispub.Publication, bufferSize)
redisPubsAggregationEntry[j] = redisPubs

stopRedisPub := make(chan bool)
stopRedisPubsEntry[j] = stopRedisPub

waitGroup.Add(1)

// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
go func(ordinal int, clientIndex int) {
redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex)
waitGroup.Done()
}(i, j)
log.Log.Info("Started up processing goroutines")

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})

}

// aggregate
aggregatedRedisPubs[i] = redisPubsAggregationEntry
stopRedisPubs[i] = stopRedisPubsEntry
// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
redisPubs := make(chan *redispub.Publication, bufferSize)
aggregatedRedisPubs[i] = redisPubs

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func(ordinal int) {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "i", ordinal)
waitGroup.Done()
}(i)
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})
}

readParallelism := config.ReadParallelism()
@@ -163,7 +140,6 @@ func main() {
MaxCatchUp: config.MaxCatchUp(),
Denylist: &denylist,
}
// pass all intake channels to the tailer, which will route messages accordingly
tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)

log.Log.Info("Oplog tailer completed")
@@ -201,10 +177,8 @@ func main() {
for _, stopOplogTail := range stopOplogTails {
stopOplogTail <- true
}
for _, stopRedisPubEntry := range stopRedisPubs {
for _, stopRedisPub := range stopRedisPubEntry {
stopRedisPub <- true
}
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
}

err = httpServer.Shutdown(context.Background())
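
The restored main.go wiring creates one buffered intake channel and one PublishStream goroutine per write shard, and each goroutine writes to every redis destination itself. A rough sketch under those assumptions, with message, client, and publishStream standing in for redispub.Publication, redis.UniversalClient, and redispub.PublishStream:

package main

import (
	"fmt"
	"sync"
)

// message stands in for *redispub.Publication; client stands in for redis.UniversalClient.
type message struct{ payload string }
type client struct{ name string }

// publishStream sketches redispub.PublishStream's role: drain one intake
// channel and write each message to every client it was given.
func publishStream(clients []client, in <-chan message, wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range in {
		for _, c := range clients {
			fmt.Printf("shard writer -> %s: %s\n", c.name, msg.payload)
		}
	}
}

func main() {
	const writeParallelism = 2
	clients := []client{{"redis"}, {"sentinel"}}

	var wg sync.WaitGroup
	intake := make([]chan message, writeParallelism)

	// Post-revert wiring: one buffered channel and one writer goroutine per
	// shard; each writer fans out to every redis destination itself.
	for i := 0; i < writeParallelism; i++ {
		intake[i] = make(chan message, 10000)
		wg.Add(1)
		go publishStream(clients, intake[i], &wg)
	}

	intake[0] <- message{payload: "insert into orders"}
	intake[1] <- message{payload: "update users"}

	for _, ch := range intake {
		close(ch)
	}
	wg.Wait()
}

The real code additionally registers a per-shard buffer_available gauge, computed as bufferSize - len(channel).
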
