From fffddf1a6d4e1fafd792d7e4b93e000434fb2a2b Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Tue, 14 May 2024 11:34:37 +0300 Subject: [PATCH] Fix connReaper goroutine may leak --- go.mod | 2 + go.sum | 2 + reaper/reaper_test.go | 101 ++++++++++++++++++++++++++++++++++++++++++ socket.go | 17 ++++++- 4 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 reaper/reaper_test.go diff --git a/go.mod b/go.mod index 6ec42da..4a1fcca 100644 --- a/go.mod +++ b/go.mod @@ -7,3 +7,5 @@ require ( golang.org/x/sync v0.3.0 golang.org/x/text v0.13.0 ) + +require go.uber.org/goleak v1.3.0 // indirect diff --git a/go.sum b/go.sum index ab43c90..235848a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U= github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= diff --git a/reaper/reaper_test.go b/reaper/reaper_test.go new file mode 100644 index 0000000..b4a6d82 --- /dev/null +++ b/reaper/reaper_test.go @@ -0,0 +1,101 @@ +// Copyright 2024 The go-zeromq Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package reaper + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/go-zeromq/zmq4" + "go.uber.org/goleak" +) + +// TestReaper does multiple rapid Dial/Close to check that connection reaper goroutines are not leaking. +// It is in its own package because goleak also detects goroutines started by values created during init(). 
+func TestReaper(t *testing.T) { + defer goleak.VerifyNone(t) + + mu := &sync.Mutex{} + errs := []error{} + + ctx, cancel := context.WithCancel(context.Background()) + rep := zmq4.NewRep(ctx) + ep := "ipc://@test.rep.socket" + err := rep.Listen(ep) + if err != nil { + t.Fatal(err) + } + + maxClients := 100 + maxMsgs := 100 + wgClients := &sync.WaitGroup{} + wgServer := &sync.WaitGroup{} + client := func() { + defer wgClients.Done() + for n := 0; n < maxMsgs; n++ { + func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + req := zmq4.NewReq(ctx) + err := req.Dial(ep) + if err != nil { + mu.Lock() + defer mu.Unlock() + errs = append(errs, err) + return + } + + err = req.Close() + if err != nil { + mu.Lock() + defer mu.Unlock() + errs = append(errs, err) + } + }() + } + } + server := func() { + defer wgServer.Done() + pong := zmq4.NewMsgString("pong") + for { + msg, err := rep.Recv() + if errors.Is(err, context.Canceled) { + break + } + if err != nil { + break + } + if string(msg.Frames[0]) != "ping" { + mu.Lock() + defer mu.Unlock() + errs = append(errs, errors.New("unexpected message")) + return + } + err = rep.Send(pong) + if err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() // not deferred: the loop continues, and a defer would hold mu until server returns + } + } + } + + wgServer.Add(1) + go server() + wgClients.Add(maxClients) + for n := 0; n < maxClients; n++ { + go client() + } + wgClients.Wait() + cancel() + wgServer.Wait() + rep.Close() + for _, err := range errs { + t.Fatal(err) + } +} diff --git a/socket.go b/socket.go index 7a41c71..ae1e451 100644 --- a/socket.go +++ b/socket.go @@ -120,8 +120,12 @@ func (sck *socket) topics() []string { // Close closes the open Socket func (sck *socket) Close() error { + // The Lock around Signal ensures the connReaper is running + // and is in sck.reaperCond.Wait() + sck.reaperCond.L.Lock() sck.cancel() sck.reaperCond.Signal() + sck.reaperCond.L.Unlock() if sck.listener != nil { defer sck.listener.Close() @@ -193,7 +197,11 @@ func (sck 
*socket) Listen(endpoint string) error { sck.listener = l go sck.accept() - go sck.connReaper() + if !sck.reaperStarted { + sck.reaperCond.L.Lock() + go sck.connReaper() + sck.reaperStarted = true + } return nil } @@ -268,6 +276,7 @@ connect: } if !sck.reaperStarted { + sck.reaperCond.L.Lock() go sck.connReaper() sck.reaperStarted = true } @@ -372,7 +381,11 @@ func (sck *socket) Timeout() time.Duration { } func (sck *socket) connReaper() { - sck.reaperCond.L.Lock() + // We are not locking here sck.reaperCond.L.Lock() + // as it should be locked prior starting connReaper as goroutine + // That would ensure that sck.reaperCond.Signal() + // would be delivered only when reaper goroutine is really started + // and is in sck.reaperCond.Wait() defer sck.reaperCond.L.Unlock() for {