diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go
index 54a9aa70..f5376332 100644
--- a/lib/oplog/tail.go
+++ b/lib/oplog/tail.go
@@ -115,9 +115,15 @@ func init() {
 	prometheus.MustRegister(metricMaxOplogEntryByMinute)
 }
 
+// PublisherChannels represents a collection of intake channels for a set of Redis publishers.
+// When multiple redis URLs are specified via OTR_REDIS_URL, each one produces a redis client,
+// a publisher goroutine, and an intake channel. Since every message must go to all redis
+// destinations, the tailer sends each message to all channels in the slice.
+type PublisherChannels []chan<- *redispub.Publication
+
 // Tail begins tailing the oplog. It doesn't return unless it receives a message
 // on the stop channel, in which case it wraps up its work and then returns.
-func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
+func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
 	childStopC := make(chan bool)
 	wasStopped := false
 
@@ -141,7 +147,10 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool,
 	}
 }
 
-func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
+// tailOnce accepts a slice of PublisherChannels whose length equals the degree of write parallelism.
+// Each incoming message is routed to one PublisherChannels entry based on its parallelism key
+// (a hash of the database name), then sent to every channel within that entry.
+func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
 	session, err := tailer.MongoClient.StartSession()
 	if err != nil {
 		log.Log.Errorw("Failed to start Mongo session", "error", err)
@@ -228,9 +237,15 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
 				sendMetricsData()
 			}
 
+			// determine which shard this message should route to
 			// inIdx and outIdx may be different if there are different #s of read and write routines
 			outIdx := assignToShard(pub.ParallelismKey, len(out))
-			out[outIdx] <- pub
+			// get the set of publisher channels for that shard
+			pubChans := out[outIdx]
+			// send the message to each channel on that shard
+			for _, pubChan := range pubChans {
+				pubChan <- pub
+			}
 		} else {
 			log.Log.Error("Nil Redis publication")
 		}
diff --git a/main.go b/main.go
index 80d8ddc8..7bc5aca6 100644
--- a/main.go
+++ b/main.go
@@ -42,14 +42,19 @@ func main() {
 	}
 	writeParallelism := config.WriteParallelism()
 
+	// each inner slice of redis clients holds one client per destination (regular redis, sentinel);
+	// the aggregated slice holds one such inner slice for every write-parallelism shard
 	aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
-	aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
-	stopRedisPubs := make([]chan bool, writeParallelism)
+	// make one PublisherChannels for each parallel writer
+	aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism)
+	// one stop channel corresponds to each writer, so this uses the same 2D structure
+	stopRedisPubs := make([][]chan bool, writeParallelism)
 
 	bufferSize := 10000
 	waitGroup := sync.WaitGroup{}
 	denylist := sync.Map{}
 
+	// This loop starts one writer shard per iteration, running writeParallelism times in total.
 	for i := 0; i < writeParallelism; i++ {
 		redisClients, err := createRedisClients()
 		if err != nil {
@@ -68,42 +73,60 @@ func main() {
 		log.Log.Infow("Initialized connection to Redis", "i", i)
 
 		aggregatedRedisClients[i] = redisClients
+		clientsSize := len(redisClients)
+
+		// each writer shard spawns multiple writer goroutines, one for each redis destination,
+		// so we create one PublisherChannels for this shard and put each goroutine's intake channel in it.
+		// these are all aggregated in the aggregatedRedisPubs 2D slice and passed to the tailer.
+		redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize)
+		stopRedisPubsEntry := make([]chan bool, clientsSize)
+
+		for j := 0; j < clientsSize; j++ {
+			redisClient := redisClients[j]
+
+			redisPubs := make(chan *redispub.Publication, bufferSize)
+			redisPubsAggregationEntry[j] = redisPubs
+
+			stopRedisPub := make(chan bool)
+			stopRedisPubsEntry[j] = stopRedisPub
+
+			waitGroup.Add(1)
+
+			// We create two goroutines:
+			//
+			// The oplog.Tail goroutine reads messages from the oplog, and generates the
+			// messages that we need to write to redis. It then writes them to a
+			// buffered channel.
+			//
+			// The redispub.PublishStream goroutine reads messages from the buffered channel
+			// and sends them to Redis.
+			//
+			// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
+			go func(ordinal int, clientIndex int) {
+				redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{
+					FlushInterval:    config.TimestampFlushInterval(),
+					DedupeExpiration: config.RedisDedupeExpiration(),
+					MetadataPrefix:   config.RedisMetadataPrefix(),
+				}, stopRedisPub, ordinal)
+				log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex)
+				waitGroup.Done()
+			}(i, j)
+			log.Log.Info("Started up processing goroutines")
+
+			promauto.NewGaugeFunc(prometheus.GaugeOpts{
+				Namespace:   "otr",
+				Name:        "buffer_available",
+				Help:        "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
+				ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)},
+			}, func() float64 {
+				return float64(bufferSize - len(redisPubs))
+			})
 
-		// We crate two goroutines:
-		//
-		// The oplog.Tail goroutine reads messages from the oplog, and generates the
-		// messages that we need to write to redis. It then writes them to a
-		// buffered channel.
-		//
-		// The redispub.PublishStream goroutine reads messages from the buffered channel
-		// and sends them to Redis.
-		//
-		// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
-		redisPubs := make(chan *redispub.Publication, bufferSize)
-		aggregatedRedisPubs[i] = redisPubs
-
-		stopRedisPub := make(chan bool)
-		waitGroup.Add(1)
-		go func(ordinal int) {
-			redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
-				FlushInterval:    config.TimestampFlushInterval(),
-				DedupeExpiration: config.RedisDedupeExpiration(),
-				MetadataPrefix:   config.RedisMetadataPrefix(),
-			}, stopRedisPub, ordinal)
-			log.Log.Infow("Redis publisher completed", "i", ordinal)
-			waitGroup.Done()
-		}(i)
-		log.Log.Info("Started up processing goroutines")
-		stopRedisPubs[i] = stopRedisPub
-
-		promauto.NewGaugeFunc(prometheus.GaugeOpts{
-			Namespace:   "otr",
-			Name:        "buffer_available",
-			Help:        "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
-			ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
-		}, func() float64 {
-			return float64(bufferSize - len(redisPubs))
-		})
+		}
+
+		// aggregate this shard's channels into the top-level slices
+		aggregatedRedisPubs[i] = redisPubsAggregationEntry
+		stopRedisPubs[i] = stopRedisPubsEntry
 	}
 
 	readParallelism := config.ReadParallelism()
@@ -140,6 +163,7 @@ func main() {
 			MaxCatchUp:  config.MaxCatchUp(),
 			Denylist:    &denylist,
 		}
+		// pass all intake channels to the tailer, which routes each message to the right shard
 		tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)
 
 		log.Log.Info("Oplog tailer completed")
@@ -177,8 +201,10 @@ func main() {
 	for _, stopOplogTail := range stopOplogTails {
 		stopOplogTail <- true
 	}
-	for _, stopRedisPub := range stopRedisPubs {
-		stopRedisPub <- true
+	for _, stopRedisPubEntry := range stopRedisPubs {
+		for _, stopRedisPub := range stopRedisPubEntry {
+			stopRedisPub <- true
+		}
 	}
 
 	err = httpServer.Shutdown(context.Background())
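
Reviewer note: below is a minimal, self-contained sketch (not part of the diff) of the routing contract this change introduces. The outer index picks a write-parallelism shard from the message's parallelism key; the inner loop fans the message out to every redis destination in that shard. `Publication` is trimmed to the fields the sketch needs, and `assignToShard` is a hypothetical modulo stand-in for the real function in lib/oplog.

```go
package main

import (
	"fmt"
	"sync"
)

// Publication stands in for redispub.Publication, trimmed to the fields
// this sketch needs.
type Publication struct {
	ParallelismKey int
	Payload        string
}

// PublisherChannels mirrors the type added in lib/oplog/tail.go.
type PublisherChannels []chan<- *Publication

// assignToShard is a hypothetical stand-in for the function of the same
// name in lib/oplog, assumed to reduce a key to a valid shard index.
func assignToShard(key, shardCount int) int {
	return ((key % shardCount) + shardCount) % shardCount
}

// route picks one shard by the message's parallelism key, then fans the
// message out to every destination channel within that shard.
func route(out []PublisherChannels, pub *Publication) {
	for _, ch := range out[assignToShard(pub.ParallelismKey, len(out))] {
		ch <- pub
	}
}

func main() {
	const writeParallelism, destinations = 2, 3

	out := make([]PublisherChannels, writeParallelism)
	raw := make([][]chan *Publication, writeParallelism) // closable refs
	var wg sync.WaitGroup

	for i := range out {
		out[i] = make(PublisherChannels, destinations)
		raw[i] = make([]chan *Publication, destinations)
		for j := range raw[i] {
			c := make(chan *Publication, 1)
			raw[i][j], out[i][j] = c, c
			wg.Add(1)
			go func(shard, dest int, c <-chan *Publication) {
				defer wg.Done()
				for pub := range c {
					fmt.Printf("shard %d dest %d got %q\n", shard, dest, pub.Payload)
				}
			}(i, j, c)
		}
	}

	// one message lands on exactly one shard, but on every destination
	// channel within that shard
	route(out, &Publication{ParallelismKey: 7, Payload: "insert into foo"})

	for _, shard := range raw {
		for _, c := range shard {
			close(c)
		}
	}
	wg.Wait()
}
```

Run as-is, this prints three lines, all from the same shard: one delivery per destination channel, mirroring the fan-out loop added to tailOnce.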
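
A companion sketch of the shutdown path from the last hunk, under the same caveats: each (shard, destination) publisher owns a dedicated stop channel, so main must signal every channel in the 2D stopRedisPubs structure before waiting. `publishLoop` is a hypothetical stand-in for `redispub.PublishStream`.

```go
package main

import (
	"fmt"
	"sync"
)

// publishLoop is a hypothetical stand-in for redispub.PublishStream: it
// drains its intake channel until its dedicated stop channel fires.
func publishLoop(in <-chan string, stop <-chan bool, shard, dest int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case msg := <-in:
			fmt.Printf("shard %d dest %d published %q\n", shard, dest, msg)
		case <-stop:
			return
		}
	}
}

func main() {
	const writeParallelism, destinations = 2, 2

	stopRedisPubs := make([][]chan bool, writeParallelism)
	var wg sync.WaitGroup

	for i := range stopRedisPubs {
		stopRedisPubs[i] = make([]chan bool, destinations)
		for j := range stopRedisPubs[i] {
			stop := make(chan bool)
			stopRedisPubs[i][j] = stop
			in := make(chan string, 1)
			wg.Add(1)
			go publishLoop(in, stop, i, j, &wg)
		}
	}

	// mirror of the new shutdown loop: signal every stop channel in the
	// 2D structure, one per (shard, destination) publisher
	for _, stopRedisPubEntry := range stopRedisPubs {
		for _, stopRedisPub := range stopRedisPubEntry {
			stopRedisPub <- true
		}
	}
	wg.Wait()
}
```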