diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go index f5376332..54a9aa70 100644 --- a/lib/oplog/tail.go +++ b/lib/oplog/tail.go @@ -115,15 +115,9 @@ func init() { prometheus.MustRegister(metricMaxOplogEntryByMinute) } -// PublisherChannels represents a collection of intake channels for a set of Redis Publishers. -// When multiple redis URLs are specified via OTR_REDIS_URL, each one produce a redis client, -// publisher coroutine, and intake channel. Since we want every message to go to all redis -// destinations, the tailer should send each message to all channels in the array. -type PublisherChannels []chan<- *redispub.Publication - // Tail begins tailing the oplog. It doesn't return unless it receives a message // on the stop channel, in which case it wraps up its work and then returns. -func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) { +func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) { childStopC := make(chan bool) wasStopped := false @@ -147,10 +141,7 @@ func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdina } } -// this accepts an array of PublisherChannels instances whose size is equal to the degree of write-parallelism. -// Each incoming message will be routed to one of the PublisherChannels instances based on its parallelism key -// (hash of the database name), then sent to every channel within that PublisherChannels instance. -func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) { +func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) { session, err := tailer.MongoClient.StartSession() if err != nil { log.Log.Errorw("Failed to start Mongo session", "error", err) @@ -237,15 +228,9 @@ func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOr sendMetricsData() } - // determine which shard this message should route to // inIdx and outIdx may be different if there are different #s of read and write routines outIdx := assignToShard(pub.ParallelismKey, len(out)) - // get the set of publisher channels for that shard - pubChans := out[outIdx] - // send the message to each channel on that shard - for _, pubChan := range pubChans { - pubChan <- pub - } + out[outIdx] <- pub } else { log.Log.Error("Nil Redis publication") } diff --git a/main.go b/main.go index 7bc5aca6..80d8ddc8 100644 --- a/main.go +++ b/main.go @@ -42,19 +42,14 @@ func main() { } writeParallelism := config.WriteParallelism() - // each array of redis clients holds one client for each destination (regular redis, sentinel) - // the aggregated array holds one such array for every write-parallelism shard aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism) - // make one PublisherChannels for each parallel writer - aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism) - // one stopper channel corresponds to each writer, so it uses the same 2D array structure. - stopRedisPubs := make([][]chan bool, writeParallelism) + aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism) + stopRedisPubs := make([]chan bool, writeParallelism) bufferSize := 10000 waitGroup := sync.WaitGroup{} denylist := sync.Map{} - // this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level. for i := 0; i < writeParallelism; i++ { redisClients, err := createRedisClients() if err != nil { @@ -73,60 +68,42 @@ func main() { log.Log.Infow("Initialized connection to Redis", "i", i) aggregatedRedisClients[i] = redisClients - clientsSize := len(redisClients) - - // each writer shard is going to make multiple writer coroutines, one for each redis destination, - // so we create one PublisherChannels for this shard and put each coroutine's intake channel in it. - // these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer. - redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize) - stopRedisPubsEntry := make([]chan bool, clientsSize) - - for j := 0; j < clientsSize; j++ { - redisClient := redisClients[i] - - redisPubs := make(chan *redispub.Publication, bufferSize) - redisPubsAggregationEntry[j] = redisPubs - - stopRedisPub := make(chan bool) - stopRedisPubsEntry[j] = stopRedisPub - - waitGroup.Add(1) - - // We create two goroutines: - // - // The oplog.Tail goroutine reads messages from the oplog, and generates the - // messages that we need to write to redis. It then writes them to a - // buffered channel. - // - // The redispub.PublishStream goroutine reads messages from the buffered channel - // and sends them to Redis. - // - // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) - go func(ordinal int, clientIndex int) { - redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{ - FlushInterval: config.TimestampFlushInterval(), - DedupeExpiration: config.RedisDedupeExpiration(), - MetadataPrefix: config.RedisMetadataPrefix(), - }, stopRedisPub, ordinal) - log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex) - waitGroup.Done() - }(i, j) - log.Log.Info("Started up processing goroutines") - - promauto.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "otr", - Name: "buffer_available", - Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", - ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)}, - }, func() float64 { - return float64(bufferSize - len(redisPubs)) - }) - } - - // aggregate - aggregatedRedisPubs[i] = redisPubsAggregationEntry - stopRedisPubs[i] = stopRedisPubsEntry + // We crate two goroutines: + // + // The oplog.Tail goroutine reads messages from the oplog, and generates the + // messages that we need to write to redis. It then writes them to a + // buffered channel. + // + // The redispub.PublishStream goroutine reads messages from the buffered channel + // and sends them to Redis. + // + // TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2) + redisPubs := make(chan *redispub.Publication, bufferSize) + aggregatedRedisPubs[i] = redisPubs + + stopRedisPub := make(chan bool) + waitGroup.Add(1) + go func(ordinal int) { + redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{ + FlushInterval: config.TimestampFlushInterval(), + DedupeExpiration: config.RedisDedupeExpiration(), + MetadataPrefix: config.RedisMetadataPrefix(), + }, stopRedisPub, ordinal) + log.Log.Infow("Redis publisher completed", "i", ordinal) + waitGroup.Done() + }(i) + log.Log.Info("Started up processing goroutines") + stopRedisPubs[i] = stopRedisPub + + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "otr", + Name: "buffer_available", + Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.", + ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)}, + }, func() float64 { + return float64(bufferSize - len(redisPubs)) + }) } readParallelism := config.ReadParallelism() @@ -163,7 +140,6 @@ func main() { MaxCatchUp: config.MaxCatchUp(), Denylist: &denylist, } - // pass all intake channels to the tailer, which will route messages accordingly tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism) log.Log.Info("Oplog tailer completed") @@ -201,10 +177,8 @@ func main() { for _, stopOplogTail := range stopOplogTails { stopOplogTail <- true } - for _, stopRedisPubEntry := range stopRedisPubs { - for _, stopRedisPub := range stopRedisPubEntry { - stopRedisPub <- true - } + for _, stopRedisPub := range stopRedisPubs { + stopRedisPub <- true } err = httpServer.Shutdown(context.Background())