first draft
alex-goodisman committed Apr 24, 2024
1 parent 9b4148d commit 432af6a
Showing 5 changed files with 113 additions and 63 deletions.
11 changes: 10 additions & 1 deletion lib/config/main.go
@@ -4,8 +4,9 @@
package config

import (
-"time"
+"strings"
+"time"

"github.com/kelseyhightower/envconfig"
)

@@ -21,6 +22,7 @@ type oplogtoredisConfiguration struct {
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
+WriteParallelism int `default:"1" split_words:"true"`
}

var globalConfig *oplogtoredisConfiguration
@@ -131,6 +133,13 @@ func OplogV2ExtractSubfieldChanges() bool {
return globalConfig.OplogV2ExtractSubfieldChanges
}

+// WriteParallelism controls how many parallel write loops will be run (sharded based on a hash
+// of the database name). Each parallel loop has its own Redis connection and internal buffer.
+// The healthz endpoint will report failure if any one of them dies.
+func WriteParallelism() int {
+return globalConfig.WriteParallelism
+}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
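For context, here is a minimal, self-contained sketch of how the new setting is consumed (not part of this commit; the struct name is hypothetical). With split_words:"true", envconfig derives the environment variable name WRITE_PARALLELISM from the field name WriteParallelism, and falls back to the default of 1 when the variable is unset:

package main

import (
    "fmt"

    "github.com/kelseyhightower/envconfig"
)

type exampleConfig struct {
    WriteParallelism int `default:"1" split_words:"true"`
}

func main() {
    var c exampleConfig
    // envconfig.Process reads WRITE_PARALLELISM if set, and applies the default otherwise.
    if err := envconfig.Process("", &c); err != nil {
        panic(err)
    }
    fmt.Println(c.WriteParallelism)
}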
16 changes: 15 additions & 1 deletion lib/oplog/processor.go
@@ -1,6 +1,9 @@
package oplog

import (
+"bytes"
+"crypto/sha256"
+"encoding/binary"
"encoding/json"
"strings"

@@ -78,6 +81,16 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
return nil, errors.Wrap(err, "marshalling outgoing message")
}

+hash := sha256.Sum256([]byte(op.Database))
+intSlice := hash[len(hash)-8:]
+
+var hashInt uint64
+
+err = binary.Read(bytes.NewReader(intSlice), binary.LittleEndian, &hashInt)
+if err != nil {
+panic(errors.Wrap(err, "decoding database hash as uint64"))
+}

// We need to publish on both the full-collection channel and the
// single-document channel
return &redispub.Publication{
@@ -92,7 +105,8 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
Msg: msgJSON,
OplogTimestamp: op.Timestamp,

-TxIdx: op.TxIdx,
+TxIdx:          op.TxIdx,
+ParallelismKey: int(hashInt),
}, nil
}
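The block above derives a stable shard key: it hashes the database name with SHA-256, takes the last eight bytes of the digest, and decodes them as a little-endian uint64. Since binary.LittleEndian.Uint64 performs the same decoding without the error path, an equivalent standalone sketch (a hypothetical helper, not from this commit) looks like:

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// parallelismKey mirrors the derivation in processOplogEntry: entries from
// the same database always produce the same key, so they are always handled
// by the same write loop.
func parallelismKey(database string) int {
    hash := sha256.Sum256([]byte(database))
    return int(binary.LittleEndian.Uint64(hash[len(hash)-8:]))
}

func main() {
    fmt.Println(parallelismKey("accounts") == parallelismKey("accounts")) // true: stable per database
}

Note that the uint64-to-int conversion can yield a negative key; see the note on the routing side in tail.go below.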

13 changes: 7 additions & 6 deletions lib/oplog/tail.go
@@ -25,10 +25,10 @@ import (
// Tailer persistently tails the oplog of a Mongo cluster, handling
// reconnection and resumption of where it left off.
type Tailer struct {
-MongoClient *mongo.Client
+MongoClient  *mongo.Client
RedisClients []redis.UniversalClient
-RedisPrefix string
-MaxCatchUp time.Duration
+RedisPrefix  string
+MaxCatchUp   time.Duration
}

// Raw oplog entry from Mongo
@@ -107,7 +107,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
-func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
+func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) {
childStopC := make(chan bool)
wasStopped := false

@@ -131,7 +131,7 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
}
}

-func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) {
+func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool) {
session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
@@ -203,7 +203,8 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) {

for _, pub := range pubs {
if pub != nil {
-out <- pub
+outIdx := pub.ParallelismKey % len(out)
+out[outIdx] <- pub
} else {
log.Log.Error("Nil Redis publication")
}
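Each publication is routed to exactly one of the out channels by reducing ParallelismKey modulo the channel count. One caution: Go's % operator keeps the sign of the dividend, and the key is a truncated hash that may be negative, so a defensive version of this routing (a sketch, not what this commit does) would normalize the index first:

// shardIndex is a hypothetical helper: it maps a possibly negative key onto
// a valid channel index in [0, n).
func shardIndex(key, n int) int {
    idx := key % n
    if idx < 0 {
        idx += n
    }
    return idx
}

// usage: out[shardIndex(pub.ParallelismKey, len(out))] <- pub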
4 changes: 4 additions & 0 deletions lib/redispub/publication.go
@@ -20,4 +20,8 @@ type Publication struct {

// TxIdx is the index of the operation within a transaction. Used to supplement OplogTimestamp in a transaction.
TxIdx uint
+
+// ParallelismKey is a number representing which parallel write loop will process this message.
+// It is a hash of the database name, assuming that a single database is the unit of ordering guarantee.
+ParallelismKey int
}
132 changes: 77 additions & 55 deletions main.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
+"fmt"
stdlog "log"
"net/http"
"net/http/pprof"
@@ -53,74 +54,91 @@ func main() {
}()
log.Log.Info("Initialized connection to Mongo")

-redisClients, err := createRedisClients()
-if err != nil {
-panic("Error initializing Redis client: " + err.Error())
-}
-defer func() {
-for _, redisClient := range redisClients {
-redisCloseErr := redisClient.Close()
-if redisCloseErr != nil {
-log.Log.Errorw("Error closing Redis client",
-"error", redisCloseErr)
-}
-}
-}()
-log.Log.Info("Initialized connection to Redis")
+parallelism := config.WriteParallelism()
+aggregatedRedisClients := make([][]redis.UniversalClient, parallelism)
+aggregatedRedisPubs := make([]chan<- *redispub.Publication, parallelism)
+stopRedisPubs := make([]chan bool, parallelism)

-// We crate two goroutines:
-//
-// The oplog.Tail goroutine reads messages from the oplog, and generates the
-// messages that we need to write to redis. It then writes them to a
-// buffered channel.
-//
-// The redispub.PublishStream goroutine reads messages from the buffered channel
-// and sends them to Redis.
-//
-// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
bufferSize := 10000
-redisPubs := make(chan *redispub.Publication, bufferSize)
+waitGroup := sync.WaitGroup{}

+for i := 0; i < config.WriteParallelism(); i++ {
+redisClients, err := createRedisClients()
+if err != nil {
+panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
+}
+defer func() {
+for _, redisClient := range redisClients {
+redisCloseErr := redisClient.Close()
+if redisCloseErr != nil {
+log.Log.Errorw("Error closing Redis client",
+"error", redisCloseErr,
+"i", i)
+}
+}
+}()
+log.Log.Infow("Initialized connection to Redis", "i", i)
+
+aggregatedRedisClients[i] = redisClients
+
+// We create two goroutines:
+//
+// The oplog.Tail goroutine reads messages from the oplog, and generates the
+// messages that we need to write to redis. It then writes them to a
+// buffered channel.
+//
+// The redispub.PublishStream goroutine reads messages from the buffered channel
+// and sends them to Redis.
+//
+// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
+redisPubs := make(chan *redispub.Publication, bufferSize)
+aggregatedRedisPubs[i] = redisPubs
+
+stopRedisPub := make(chan bool)
+waitGroup.Add(1)
+go func() {
+redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
+FlushInterval: config.TimestampFlushInterval(),
+DedupeExpiration: config.RedisDedupeExpiration(),
+MetadataPrefix: config.RedisMetadataPrefix(),
+}, stopRedisPub)
+log.Log.Infow("Redis publisher completed", "i", i)
+waitGroup.Done()
+}()
+log.Log.Info("Started up processing goroutines")
+stopRedisPubs[i] = stopRedisPub
+}

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
-}, func () float64 {
-return float64(bufferSize - len(redisPubs))
+}, func() float64 {
+total := 0
+for _, redisPubs := range aggregatedRedisPubs {
+total += (bufferSize - len(redisPubs))
+}
+return float64(total)
})

-waitGroup := sync.WaitGroup{}
-
stopOplogTail := make(chan bool)
waitGroup.Add(1)
go func() {
tailer := oplog.Tailer{
MongoClient: mongoSession,
-RedisClients: redisClients,
-RedisPrefix: config.RedisMetadataPrefix(),
-MaxCatchUp: config.MaxCatchUp(),
+RedisClients: aggregatedRedisClients[0], // the tailer goroutine needs a Redis client for determining the start timestamp;
+// it doesn't really matter which one, since this isn't a meaningful amount of load, so just take the first one
+RedisPrefix: config.RedisMetadataPrefix(),
+MaxCatchUp: config.MaxCatchUp(),
}
-tailer.Tail(redisPubs, stopOplogTail)
+tailer.Tail(aggregatedRedisPubs, stopOplogTail)

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}()

-stopRedisPub := make(chan bool)
-waitGroup.Add(1)
-go func() {
-redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
-FlushInterval: config.TimestampFlushInterval(),
-DedupeExpiration: config.RedisDedupeExpiration(),
-MetadataPrefix: config.RedisMetadataPrefix(),
-}, stopRedisPub)
-log.Log.Info("Redis publisher completed")
-waitGroup.Done()
-}()
-log.Log.Info("Started up processing goroutines")

// Start one more goroutine for the HTTP server
-httpServer := makeHTTPServer(redisClients, mongoSession)
+httpServer := makeHTTPServer(aggregatedRedisClients, mongoSession)
go func() {
httpErr := httpServer.ListenAndServe()
if httpErr != nil {
Expand All @@ -147,7 +165,9 @@ func main() {
signal.Reset()

stopOplogTail <- true
-stopRedisPub <- true
+for _, stopRedisPub := range stopRedisPubs {
+stopRedisPub <- true
+}

err = httpServer.Shutdown(context.Background())
if err != nil {
@@ -226,17 +246,19 @@ func createRedisClients() ([]redis.UniversalClient, error) {
return ret, nil
}

-func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http.Server {
+func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Client) *http.Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
redisOK := true
-for _, redis := range clients {
-redisErr := redis.Ping(context.Background()).Err()
-redisOK = (redisOK && (redisErr == nil))
-if !redisOK {
-log.Log.Errorw("Error connecting to Redis during healthz check",
-"error", redisErr)
+for _, clients := range aggregatedClients {
+for _, redis := range clients {
+redisErr := redis.Ping(context.Background()).Err()
+redisOK = (redisOK && (redisErr == nil))
+if !redisOK {
+log.Log.Errorw("Error connecting to Redis during healthz check",
+"error", redisErr)
+}
+}
}
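Taken together, main.go now fans publications out to one buffered channel and one PublishStream goroutine per shard, while the healthz handler pings every Redis client in every shard. A compact, runnable sketch of the fan-out shape (simplified and hypothetical; the real code wires in Redis clients, PublishStream, and stop channels):

package main

import (
    "fmt"
    "sync"
)

type publication struct {
    ParallelismKey int
    Msg            string
}

func main() {
    const parallelism = 4
    outs := make([]chan *publication, parallelism)
    var wg sync.WaitGroup

    // One writer loop per shard, each draining its own buffered channel.
    for i := 0; i < parallelism; i++ {
        outs[i] = make(chan *publication, 10000)
        wg.Add(1)
        go func(shard int, in <-chan *publication) {
            defer wg.Done()
            for pub := range in {
                fmt.Printf("shard %d: %s\n", shard, pub.Msg)
            }
        }(i, outs[i])
    }

    // The tailer side picks the shard from the key, so all publications for
    // one database stay ordered on a single channel.
    pub := &publication{ParallelismKey: 7, Msg: "example"}
    outs[pub.ParallelismKey%parallelism] <- pub

    for _, ch := range outs {
        close(ch)
    }
    wg.Wait()
}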

