Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OplogToRedis: Run writer routines in parallel for each redis client. #75

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,15 @@ func init() {
prometheus.MustRegister(metricMaxOplogEntryByMinute)
}

// PublisherChannels is the collection of intake channels for a set of Redis publishers.
// When multiple Redis URLs are specified via OTR_REDIS_URL, each one produces a Redis client,
// a publisher goroutine, and an intake channel. Since every message must go to all Redis
// destinations, the tailer should send each message to every channel in the slice.
// The channels are send-only from the holder's point of view.
type PublisherChannels []chan<- *redispub.Publication

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
childStopC := make(chan bool)
wasStopped := false

Expand All @@ -141,7 +147,10 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool,
}
}

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
// tailOnce accepts a slice of PublisherChannels whose length equals the degree of write parallelism.
// Each incoming message is routed to one PublisherChannels entry based on its parallelism key
// (hash of the database name), then sent to every channel within that entry.
func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
Expand Down Expand Up @@ -228,9 +237,15 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
sendMetricsData()
}

// determine which shard this message should route to
// inIdx and outIdx may be different if there are different #s of read and write routines
outIdx := assignToShard(pub.ParallelismKey, len(out))
out[outIdx] <- pub
// get the set of publisher channels for that shard
pubChans := out[outIdx]
// send the message to each channel on that shard
for _, pubChan := range pubChans {
pubChan <- pub
}
} else {
log.Log.Error("Nil Redis publication")
}
Expand Down
104 changes: 65 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ func main() {
}

writeParallelism := config.WriteParallelism()
// each array of redis clients holds one client for each destination (regular redis, sentinel)
// the aggregated array holds one such array for every write-parallelism shard
aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
stopRedisPubs := make([]chan bool, writeParallelism)
// make one PublisherChannels for each parallel writer
aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism)
// one stopper channel corresponds to each writer, so it uses the same 2D array structure.
stopRedisPubs := make([][]chan bool, writeParallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
denylist := sync.Map{}

// this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level.
for i := 0; i < writeParallelism; i++ {
redisClients, err := createRedisClients()
if err != nil {
Expand All @@ -68,42 +73,60 @@ func main() {
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients
clientsSize := len(redisClients)

// each writer shard is going to make multiple writer coroutines, one for each redis destination,
// so we create one PublisherChannels for this shard and put each coroutine's intake channel in it.
// these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer.
redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize)
stopRedisPubsEntry := make([]chan bool, clientsSize)

for j := 0; j < clientsSize; j++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general comment, this has grown somewhat hard to follow because of the nested arrays and loops, etc. it would be nice to refactor it to be a bit more abstract when we're back on firmer ground.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think type aliases will help with this somewhat, but will also add some comments.

redisClient := redisClients[i]

redisPubs := make(chan *redispub.Publication, bufferSize)
redisPubsAggregationEntry[j] = redisPubs

stopRedisPub := make(chan bool)
stopRedisPubsEntry[j] = stopRedisPub

waitGroup.Add(1)

// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
go func(ordinal int, clientIndex int) {
redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex)
waitGroup.Done()
}(i, j)
log.Log.Info("Started up processing goroutines")

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})

// We crate two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
redisPubs := make(chan *redispub.Publication, bufferSize)
aggregatedRedisPubs[i] = redisPubs

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func(ordinal int) {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "i", ordinal)
waitGroup.Done()
}(i)
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})
}

// aggregate
aggregatedRedisPubs[i] = redisPubsAggregationEntry
stopRedisPubs[i] = stopRedisPubsEntry
}

readParallelism := config.ReadParallelism()
Expand Down Expand Up @@ -140,6 +163,7 @@ func main() {
MaxCatchUp: config.MaxCatchUp(),
Denylist: &denylist,
}
// pass all intake channels to the tailer, which will route messages accordingly
tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)

log.Log.Info("Oplog tailer completed")
Expand Down Expand Up @@ -177,8 +201,10 @@ func main() {
for _, stopOplogTail := range stopOplogTails {
stopOplogTail <- true
}
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
for _, stopRedisPubEntry := range stopRedisPubs {
for _, stopRedisPub := range stopRedisPubEntry {
stopRedisPub <- true
}
}

err = httpServer.Shutdown(context.Background())
Expand Down
Loading