Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
KusoKaihatsuSha committed May 13, 2022
1 parent 8653c2c commit 8d3c941
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package main

import (
"context"
"strings"
"sync"
"strings"
"time"
"time"
)

type CtxKey string

// Tasker work like workers puller
// Tasker work like workers pool
type Tasker struct {
Hands chan struct{} //same worker
Things chan Thing
Wg *sync.WaitGroup
Branch BranchContext
M *sync.RWMutex
Hands chan struct{} //same worker
Things chan Thing
Wg *sync.WaitGroup
Branch BranchContext
M *sync.RWMutex
}

// BranchContext help work with context inside Tasker
Expand Down Expand Up @@ -61,21 +61,21 @@ func AddCtx(o *Tasker, key string, val any) {
func GetCtx[T any](o *Tasker, key string, message *Message) T {
var N T
var notfound = make(chan struct{})
timeout := false
timeout := false
go func() {
for {
if o.Branch.Context.Value(CtxKey(strings.TrimSpace(message.UUID+key))) != nil || timeout {
if o.Branch.Context.Value(CtxKey(strings.TrimSpace(message.UUID+key))) != nil || timeout {
notfound <- struct{}{}
defer close(notfound)
return
}
}
}()
select {
case <-time.After(10*time.Minute):
timeout = true
case <-notfound:
}
select {
case <-time.After(10 * time.Minute):
timeout = true
case <-notfound:
}
o.M.RLock()
m := o.Branch.Context.Value(CtxKey(strings.TrimSpace(message.UUID + key)))
o.M.RUnlock()
Expand All @@ -101,7 +101,7 @@ func (o *Tasker) Init(cpuCapability, taskCapability int) *Tasker {
}

// Add(any, func(*Tasker, Thing, *Message), *Message) *Thing
// add tasks to puller
// add tasks
func (o *Tasker) Add(val any, fn func(*Tasker, Thing, *Message), mm *Message) *Thing {
select {
case <-o.Branch.Context.Done():
Expand Down

0 comments on commit 8d3c941

Please sign in to comment.