
OplogToRedis: Run writer routines in parallel for each redis client. #75

Merged · 2 commits · May 3, 2024

Conversation

@alex-goodisman (Contributor) commented Apr 30, 2024

In main, iterate over the Redis clients and create a separate channel and goroutine for each of the clientsCount * ordinalCount entries. Make some changes to aggregation to accommodate this.

In the tailer, take the [][]chan and, for each incoming publication, first index by the shard ordinal (a hash of the db name), then iterate through the inner chans (one per redis client) and publish to all of them.

At present, the publisher stream still accepts an array, but each publisher now only receives one.
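The fan-out described above can be sketched roughly as follows. This is a simplified illustration, not the actual oplogtoredis code: `Publication`, `ordinalForDb`, and `publish` are hypothetical stand-ins, and the real shard-ordinal hash may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Publication is a simplified stand-in for redispub.Publication.
type Publication struct {
	Db  string
	Msg string
}

// ordinalForDb hashes the database name to pick a shard ordinal
// (hypothetical; the real hash function may differ).
func ordinalForDb(db string, ordinalCount int) int {
	h := fnv.New32a()
	h.Write([]byte(db))
	return int(h.Sum32() % uint32(ordinalCount))
}

// publish routes one publication: first index the 2D slice by shard
// ordinal, then fan out to every Redis client's channel for that ordinal.
func publish(out [][]chan<- *Publication, pub *Publication) {
	for _, ch := range out[ordinalForDb(pub.Db, len(out))] {
		ch <- pub
	}
}

func main() {
	// One ordinal, two Redis clients; buffered channels so the demo
	// needs no receiver goroutines.
	a := make(chan *Publication, 1)
	b := make(chan *Publication, 1)
	out := [][]chan<- *Publication{{a, b}}

	publish(out, &Publication{Db: "mydb", Msg: "hello"})
	fmt.Println((<-a).Msg, (<-b).Msg)
}
```

With a single ordinal every publication lands in slot 0 and is copied to both client channels; with more ordinals, the db-name hash selects which inner slice receives it.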

@@ -116,7 +116,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
-func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) {
+func (tailer *Tailer) Tail(out [][]chan<- *redispub.Publication, stop <-chan bool) {
Member:

this 2d array of channels is getting complicated-enough that the semantics aren't obvious. could we document it, or use type aliases, or something similar that would make it easier to understand?
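One way the type-alias suggestion could look (a sketch with hypothetical alias names, not the code that was actually merged): name each level of the nesting so the signature documents itself.

```go
package main

import "fmt"

// Publication is a simplified stand-in for redispub.Publication.
type Publication struct{ Db string }

// ClientChannels holds one send-channel per Redis client, all serving
// the same shard ordinal (hypothetical alias name).
type ClientChannels = []chan<- *Publication

// OrdinalChannels is indexed first by shard ordinal, then by Redis
// client (hypothetical alias name).
type OrdinalChannels = []ClientChannels

// Tail's signature then reads as "channels grouped by ordinal" rather
// than an opaque [][]chan.
func Tail(out OrdinalChannels, stop <-chan bool) {
	fmt.Printf("tailing into %d ordinals\n", len(out))
}

func main() {
	out := make(OrdinalChannels, 4)
	Tail(out, make(chan bool))
}
```

Because these are aliases (`=`) rather than defined types, existing code that passes a raw `[][]chan<- *redispub.Publication` keeps compiling unchanged.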

main.go Outdated

waitGroup.Add(1)

// We crate two goroutines:
Member:

create

Contributor (author):

lol that was there, i'll do a spelling pass

redisPubsAggregationEntry := make([]chan<- *redispub.Publication, clientsSize)
stopRedisPubsEntry := make([]chan bool, clientsSize)

for j := 0; j < clientsSize; j++ {
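The snippet above sits inside a nested loop over ordinals and clients. Roughly, with hypothetical names (`buildChannels` and the simplified goroutine body are illustrative, not the actual main.go):

```go
package main

import "fmt"

// Publication is a simplified stand-in for redispub.Publication.
type Publication struct{ Db string }

// buildChannels sketches the nested construction: one inner slice per
// shard ordinal, one channel per Redis client within it, so
// ordinalCount * clientsSize channels (and publisher goroutines) total.
func buildChannels(ordinalCount, clientsSize int) [][]chan<- *Publication {
	aggregated := make([][]chan<- *Publication, ordinalCount)
	for i := 0; i < ordinalCount; i++ {
		entry := make([]chan<- *Publication, clientsSize)
		stopEntry := make([]chan bool, clientsSize)
		for j := 0; j < clientsSize; j++ {
			pubs := make(chan *Publication)
			stopEntry[j] = make(chan bool)
			entry[j] = pubs
			// In the real program, each (ordinal, client) pair gets a
			// goroutine that drains pubs into one Redis client.
			go func(in <-chan *Publication, stop <-chan bool) {
				for {
					select {
					case <-in:
					case <-stop:
						return
					}
				}
			}(pubs, stopEntry[j])
		}
		aggregated[i] = entry
	}
	return aggregated
}

func main() {
	chans := buildChannels(2, 3)
	fmt.Println(len(chans), len(chans[0]))
}
```

This is the shape the review comments are reacting to: the double loop is correct but dense, which is what motivated the type aliases and comments discussed below.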
Member:

general comment, this has grown somewhat hard to follow because of the nested arrays and loops, etc. it would be nice to refactor it to be a bit more abstract when we're back on firmer ground.

Contributor (author):

I think type aliases will help with this somewhat, but will also add some comments.

@torywheelwright (Member) left a review:

Thanks for the understandability changes; they helped a lot

@aranair (Member) commented May 2, 2024

(small reminder to bump version before merging)

@alex-goodisman alex-goodisman marked this pull request as ready for review May 2, 2024 19:11
@alex-goodisman alex-goodisman merged commit 136216a into master May 3, 2024
11 checks passed
@delete-merged-branch delete-merged-branch bot deleted the alex.sentinel-parallel branch May 3, 2024 19:51
alex-goodisman added a commit that referenced this pull request May 3, 2024
Revert #75
Also bump to 3.5 for all the parallelism & orchestration changes
alex-goodisman added a commit that referenced this pull request May 13, 2024
Redo #75
[Fixed](66e79df) an array-index-out-of-bounds issue.