Skip to content

Commit

Permalink
Merge branch 'devel' into hot-reload-config
Browse files Browse the repository at this point in the history
  • Loading branch information
resoluteCoder committed Sep 6, 2024
2 parents 6a53a6c + 07f101e commit 85a353d
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 53 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ linters-settings:
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/remotecommand"
- "github.com/quic-go/quic-go"
- "github.com/quic-go/quic-go/logging"

issues:
# Dont commit the following line.
Expand Down
13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ require (
github.com/vishvananda/netlink v1.3.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
golang.org/x/net v0.28.0
golang.org/x/sys v0.24.0
golang.org/x/net v0.29.0
golang.org/x/sys v0.25.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
Expand All @@ -35,6 +35,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down Expand Up @@ -72,12 +73,12 @@ require (
github.com/vishvananda/netns v0.0.4 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/appengine v1.6.8 // indirect
Expand Down
175 changes: 163 additions & 12 deletions go.sum

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions pkg/backends/null_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package backends

import "testing"

func TestNullBackendCfgGetAddr(t *testing.T) {
type fields struct {
Local bool
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "Positive",
fields: fields{
Local: true,
},
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &NullBackendCfg{
Local: tt.fields.Local,
}
if got := cfg.GetAddr(); got != tt.want {
t.Errorf("NullBackendCfg.GetAddr() = %v, want %v", got, tt.want)
}
})
}
}
25 changes: 25 additions & 0 deletions pkg/netceptor/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"time"

"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/logging"
"github.com/quic-go/quic-go/qlog"
)

// MaxIdleTimeoutForQuicConnections for quic connections. The default is 30 which we have replicated here.
Expand Down Expand Up @@ -90,6 +92,7 @@ func (s *Netceptor) listen(ctx context.Context, service string, tlscfg *tls.Conf
s.Logger.Debug("%s added service %s to listener registry", s.nodeID, service)
s.listenerRegistry[service] = pc
cfg := &quic.Config{
Tracer: s.tracer,
HandshakeIdleTimeout: 15 * time.Second,
MaxIdleTimeout: MaxIdleTimeoutForQuicConnections,
Allow0RTT: true,
Expand Down Expand Up @@ -134,6 +137,27 @@ func (s *Netceptor) listen(ctx context.Context, service string, tlscfg *tls.Conf
return li, nil
}

func (s *Netceptor) tracer(ctx context.Context, p logging.Perspective, connID quic.ConnectionID) *logging.ConnectionTracer {
qlogPath := os.Getenv("QLOGDIR")
if qlogPath != "" {
role := "server"
if p == logging.PerspectiveClient {
role = "client"
}
filename := fmt.Sprintf("log_%x_%s.qlog", connID, role)
f, err := os.Create(qlogPath + filename)
if err != nil {
s.Logger.Debug("failed to create qlog file at path: %s", qlogPath)

return nil
}

return qlog.NewConnectionTracer(f, p, connID)
} else {
return nil
}
}

// Listen returns a stream listener compatible with Go's net.Listener.
// If service is blank, generates and uses an ephemeral service name.
func (s *Netceptor) Listen(service string, tlscfg *tls.Config) (*Listener, error) {
Expand Down Expand Up @@ -296,6 +320,7 @@ func (s *Netceptor) DialContext(ctx context.Context, node string, service string
}
rAddr := s.NewAddr(node, service)
cfg := &quic.Config{
Tracer: s.tracer,
HandshakeIdleTimeout: 15 * time.Second,
MaxIdleTimeout: MaxIdleTimeoutForQuicConnections,
Allow0RTT: true,
Expand Down
25 changes: 25 additions & 0 deletions pkg/netceptor/netceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/ansible/receptor/tests/utils"
"github.com/prep/socketpair"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/logging"
)

type logWriter struct {
Expand Down Expand Up @@ -869,3 +871,26 @@ func TestTooShortSetMaxConnectionIdleTime(t *testing.T) {
t.Fatal("this should have failed out, as we're passing in an invalid time object that violates the logic in SetMaxConnectionIdleTime")
}
}

func TestTracerReturnsNewConnectionTracer(t *testing.T) {
t.Parallel()
s := New(context.Background(), "node1")
p := logging.PerspectiveClient
os.Setenv("QLOGDIR", "/tmp/")
trace := s.tracer(s.context, p, quic.ConnectionID{})
if trace == nil {
t.Fatalf("tracer should return a newConnectionTracer when QLOGDIR environment variable is defined but got %v", trace)
}
os.Unsetenv("QLOGDIR")
}

func TestTracerDoesNotReturnsNewConnectionTracer(t *testing.T) {
t.Parallel()
s := New(context.Background(), "node1")
p := logging.PerspectiveClient
os.Unsetenv("QLOGDIR")
trace := s.tracer(s.context, p, quic.ConnectionID{})
if trace != nil {
t.Fatalf("tracer should return nil when QLOGDIR environment variable is not defined but got %v", trace)
}
}
70 changes: 35 additions & 35 deletions pkg/utils/job_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,92 @@ import (
// A single JobContext can only run one job at a time. If JobContext.NewJob() is called while a job
// is already running, that job will be cancelled and waited on prior to starting the new job.
type JobContext struct {
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
running bool
runningLock *sync.Mutex
Ctx context.Context
JcCancel context.CancelFunc
Wg *sync.WaitGroup
JcRunning bool
RunningLock *sync.Mutex
}

// NewJob starts a new job with a defined number of workers. If a prior job is running, it is cancelled.
func (mw *JobContext) NewJob(ctx context.Context, workers int, returnIfRunning bool) bool {
if mw.runningLock == nil {
mw.runningLock = &sync.Mutex{}
if mw.RunningLock == nil {
mw.RunningLock = &sync.Mutex{}
}

mw.runningLock.Lock()
for mw.running {
mw.RunningLock.Lock()
for mw.JcRunning {
if returnIfRunning {
mw.runningLock.Unlock()
mw.RunningLock.Unlock()

return false
}
mw.cancel()
mw.runningLock.Unlock()
mw.JcCancel()
mw.RunningLock.Unlock()
mw.Wait()
mw.runningLock.Lock()
mw.RunningLock.Lock()
}

mw.running = true
mw.ctx, mw.cancel = context.WithCancel(ctx)
mw.wg = &sync.WaitGroup{}
mw.wg.Add(workers)
mw.runningLock.Unlock()
mw.JcRunning = true
mw.Ctx, mw.JcCancel = context.WithCancel(ctx)
mw.Wg = &sync.WaitGroup{}
mw.Wg.Add(workers)
mw.RunningLock.Unlock()
go func() {
mw.wg.Wait()
mw.runningLock.Lock()
mw.running = false
mw.cancel()
mw.runningLock.Unlock()
mw.Wg.Wait()
mw.RunningLock.Lock()
mw.JcRunning = false
mw.JcCancel()
mw.RunningLock.Unlock()
}()

return true
}

// WorkerDone signals that a worker is finished, like sync.WaitGroup.Done().
func (mw *JobContext) WorkerDone() {
mw.wg.Done()
mw.Wg.Done()
}

// Wait waits for the current job to complete, like sync.WaitGroup.Wait().
// If no job has been started, always just returns.
func (mw *JobContext) Wait() {
if mw.wg != nil {
mw.wg.Wait()
if mw.Wg != nil {
mw.Wg.Wait()
}
}

// Done implements Context.Done().
func (mw *JobContext) Done() <-chan struct{} {
return mw.ctx.Done()
return mw.Ctx.Done()
}

// Err implements Context.Err().
func (mw *JobContext) Err() error {
return mw.ctx.Err()
return mw.Ctx.Err()
}

// Deadline implements Context.Deadline().
func (mw *JobContext) Deadline() (time time.Time, ok bool) {
return mw.ctx.Deadline()
return mw.Ctx.Deadline()
}

// Value implements Context.Value().
func (mw *JobContext) Value(key interface{}) interface{} {
return mw.ctx.Value(key)
return mw.Ctx.Value(key)
}

// Cancel cancels the JobContext's context. If no job has been started, this does nothing.
func (mw *JobContext) Cancel() {
if mw.cancel != nil {
mw.cancel()
if mw.JcCancel != nil {
mw.JcCancel()
}
}

// Running returns true if a job is currently running.
func (mw *JobContext) Running() bool {
mw.runningLock.Lock()
defer mw.runningLock.Unlock()
mw.RunningLock.Lock()
defer mw.RunningLock.Unlock()

return mw.running
return mw.JcRunning
}
Loading

0 comments on commit 85a353d

Please sign in to comment.