Skip to content

Commit

Permalink
feat: 合并冲突--story=115998324
Browse files Browse the repository at this point in the history
  • Loading branch information
q15971095971 committed Sep 23, 2024
2 parents b4516ea + 336ac63 commit 7ca51ca
Show file tree
Hide file tree
Showing 582 changed files with 83,148 additions and 28,969 deletions.
58 changes: 44 additions & 14 deletions bcs-common/common/task/brokers/etcd/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,32 @@ func NewDelivery(ctx context.Context, client *clientv3.Client, key string, node
node: node,
}

if err := d.assign(key, node); err != nil {
if err := d.tryAssign(key, node); err != nil {
return nil, err
}

return d, nil
}

func (d *deliver) assign(key string, node string) error {
assignKey := fmt.Sprintf("%s/assign", key)
func (d *deliver) tryAssign(key string, node string) error {
ctx, cancel := context.WithTimeout(d.ctx, time.Second*5)
defer cancel()

grantResp, err := d.client.Grant(ctx, 30)
grantResp, err := d.client.Grant(ctx, 60)
if err != nil {
return err
}

keyExist := clientv3.Compare(clientv3.CreateRevision(key), ">", 0)
assignNotExist := clientv3.Compare(clientv3.CreateRevision(assignKey), "=", 0)
putReq := clientv3.OpPut(assignKey, node, clientv3.WithLease(grantResp.ID))
getReq := clientv3.OpGet(key)
runningKey := fmt.Sprintf("%s/%s", runningTaskPrefix, key)
pendingKey := fmt.Sprintf("%s/%s", pendingTaskPrefix, key)

keyExist := clientv3.Compare(clientv3.CreateRevision(pendingKey), ">", 0)
assignNotExist := clientv3.Compare(clientv3.CreateRevision(runningKey), "=", 0)

value := fmt.Sprintf("%s-%s", node, time.Now().Format(time.RFC3339))
putReq := clientv3.OpPut(runningKey, value, clientv3.WithLease(grantResp.ID))
getReq := clientv3.OpGet(pendingKey)

resp, err := d.client.Txn(ctx).If(keyExist, assignNotExist).Then(putReq, getReq).Commit()
if err != nil {
return err
Expand All @@ -98,11 +103,29 @@ func (d *deliver) assign(key string, node string) error {
}

aliveCtx, aliveCancel := context.WithCancel(d.ctx)
if _, err = d.client.KeepAlive(aliveCtx, grantResp.ID); err != nil {
keepRespCh, err := d.client.KeepAlive(aliveCtx, grantResp.ID)
if err != nil {
aliveCancel()
return err
}

go func() {
defer aliveCancel()

for {
select {
case <-d.ctx.Done():
return
case <-aliveCtx.Done():
return
case _, ok := <-keepRespCh:
if !ok {
return
}
}
}
}()

d.aliveCancel = aliveCancel
d.signature = signature
d.value = kv.Value
Expand All @@ -113,10 +136,17 @@ func (d *deliver) assign(key string, node string) error {
func (d *deliver) Ack() {
defer d.aliveCancel()

ctx, cancel := context.WithTimeout(d.ctx, time.Second*2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := d.client.Delete(ctx, d.key, clientv3.WithPrefix())
pendingKey := fmt.Sprintf("%s/%s", pendingTaskPrefix, d.key)
_, err := d.client.Delete(ctx, pendingKey)
if err != nil {
log.ERROR.Printf("ack task %s err: %s", d.key, err)
}

runningKey := fmt.Sprintf("%s/%s", runningTaskPrefix, d.key)
_, err = d.client.Delete(ctx, runningKey)
if err != nil {
log.ERROR.Printf("ack task %s err: %s", d.key, err)
}
Expand All @@ -126,11 +156,11 @@ func (d *deliver) Ack() {
func (d *deliver) Nack() {
defer d.aliveCancel()

assignKey := fmt.Sprintf("%s/assign", d.key)
ctx, cancel := context.WithTimeout(d.ctx, time.Second*2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := d.client.Delete(ctx, assignKey)
runningKey := fmt.Sprintf("%s/%s", runningTaskPrefix, d.key)
_, err := d.client.Delete(ctx, runningKey)
if err != nil {
log.ERROR.Printf("nack task %s err: %s", d.key, err)
}
Expand Down
Loading

0 comments on commit 7ca51ca

Please sign in to comment.