Skip to content

Commit

Permalink
OplogToRedis: Write Parallelism per-Redis (PR Redo) (#78)
Browse files Browse the repository at this point in the history
Redo #75
In main, count through each redis client and make a separate chan and coroutine for clientsCount * ordinalCount different entries. Make some changes to aggregation in order to accommodate this.

in the tailer, get the [][]chan and for each incoming publication, first index by the shard ordinal (hash of db name), then iterate through the remaining chans (should be one for each redis client) and publish to all of them. 

at present, the publisher stream still accepts an array, but now it's only getting one (each).
[fixed](66e79df) array index out of bounds issue
  • Loading branch information
alex-goodisman committed May 13, 2024
1 parent f7a2b89 commit 9cf1006
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 42 deletions.
21 changes: 18 additions & 3 deletions lib/oplog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,15 @@ func init() {
prometheus.MustRegister(metricMaxOplogEntryByMinute)
}

// PublisherChannels represents a collection of intake channels for a set of Redis Publishers.
// When multiple redis URLs are specified via OTR_REDIS_URL, each one produces a redis client,
// publisher coroutine, and intake channel. Since we want every message to go to all redis
// destinations, the tailer should send each message to all channels in the array.
//
// The element type is a send-only channel (chan<-), so holders of a PublisherChannels
// value can only send publications into the channels, not receive from them.
type PublisherChannels []chan<- *redispub.Publication

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
func (tailer *Tailer) Tail(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
childStopC := make(chan bool)
wasStopped := false

Expand All @@ -141,7 +147,10 @@ func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool,
}
}

func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool, readOrdinal, readParallelism int) {
// this accepts an array of PublisherChannels instances whose size is equal to the degree of write-parallelism.
// Each incoming message will be routed to one of the PublisherChannels instances based on its parallelism key
// (hash of the database name), then sent to every channel within that PublisherChannels instance.
func (tailer *Tailer) tailOnce(out []PublisherChannels, stop <-chan bool, readOrdinal, readParallelism int) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
Expand Down Expand Up @@ -228,9 +237,15 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
sendMetricsData()
}

// determine which shard this message should route to
// inIdx and outIdx may be different if there are different #s of read and write routines
outIdx := assignToShard(pub.ParallelismKey, len(out))
out[outIdx] <- pub
// get the set of publisher channels for that shard
pubChans := out[outIdx]
// send the message to each channel on that shard
for _, pubChan := range pubChans {
pubChan <- pub
}
} else {
log.Log.Error("Nil Redis publication")
}
Expand Down
104 changes: 65 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ func main() {
}

writeParallelism := config.WriteParallelism()
// each array of redis clients holds one client for each destination (regular redis, sentinel)
// the aggregated array holds one such array for every write-parallelism shard
aggregatedRedisClients := make([][]redis.UniversalClient, writeParallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, writeParallelism)
stopRedisPubs := make([]chan bool, writeParallelism)
// make one PublisherChannels for each parallel writer
aggregatedRedisPubs := make([]oplog.PublisherChannels, writeParallelism)
// one stopper channel corresponds to each writer, so it uses the same 2D array structure.
stopRedisPubs := make([][]chan bool, writeParallelism)

bufferSize := 10000
waitGroup := sync.WaitGroup{}
Expand All @@ -58,6 +62,7 @@ func main() {
panic("Error loading persistent denylist: " + err.Error())
}

// this loop starts one writer shard on each pass. Repeat it a number of times equal to the write parallelism level.
for i := 0; i < writeParallelism; i++ {
redisClients, err := createRedisClients()
if err != nil {
Expand All @@ -76,42 +81,60 @@ func main() {
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients
clientsSize := len(redisClients)

// each writer shard is going to make multiple writer coroutines, one for each redis destination,
// so we create one PublisherChannels for this shard and put each coroutine's intake channel in it.
// these will all be aggregated in the aggregatedRedisPubs 2D array and passed to the tailer.
redisPubsAggregationEntry := make(oplog.PublisherChannels, clientsSize)
stopRedisPubsEntry := make([]chan bool, clientsSize)

for j := 0; j < clientsSize; j++ {
redisClient := redisClients[j]

redisPubs := make(chan *redispub.Publication, bufferSize)
redisPubsAggregationEntry[j] = redisPubs

stopRedisPub := make(chan bool)
stopRedisPubsEntry[j] = stopRedisPub

waitGroup.Add(1)

// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
go func(ordinal int, clientIndex int) {
redispub.PublishStream([]redis.UniversalClient{redisClient}, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "ordinal", ordinal, "clientIndex", clientIndex)
waitGroup.Done()
}(i, j)
log.Log.Info("Started up processing goroutines")

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i), "clientIndex": strconv.Itoa(j)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})

// We crate two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
redisPubs := make(chan *redispub.Publication, bufferSize)
aggregatedRedisPubs[i] = redisPubs

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func(ordinal int) {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "i", ordinal)
waitGroup.Done()
}(i)
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
}, func() float64 {
return float64(bufferSize - len(redisPubs))
})
}

// aggregate
aggregatedRedisPubs[i] = redisPubsAggregationEntry
stopRedisPubs[i] = stopRedisPubsEntry
}

readParallelism := config.ReadParallelism()
Expand Down Expand Up @@ -148,6 +171,7 @@ func main() {
MaxCatchUp: config.MaxCatchUp(),
Denylist: denylist,
}
// pass all intake channels to the tailer, which will route messages accordingly
tailer.Tail(aggregatedRedisPubs, stopOplogTail, i, readParallelism)

log.Log.Info("Oplog tailer completed")
Expand Down Expand Up @@ -185,8 +209,10 @@ func main() {
for _, stopOplogTail := range stopOplogTails {
stopOplogTail <- true
}
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
for _, stopRedisPubEntry := range stopRedisPubs {
for _, stopRedisPub := range stopRedisPubEntry {
stopRedisPub <- true
}
}

err = httpServer.Shutdown(context.Background())
Expand Down

0 comments on commit 9cf1006

Please sign in to comment.