From 4886399f89afdbe975c13bd8e749388569720328 Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Tue, 18 Jun 2024 11:08:30 +0100 Subject: [PATCH] Receive Messages Serially New functional option to add a serialHandler that reads N messages at once and process them one at a time - serially. receiver := NewReceiver(, WithSerialHandler(h, 200)) where h is an instance of Handler. Defining a SerialHandler this way disables the normal parallel processing defined by WithHandlers(). It is highly recommended to specify RenewMessageLock if the number of received messages is high (say > 10). One has to be sure that processing the number of messages within 60s is achievable. AB#9378 --- azbus/disposition.go | 2 - azbus/receiver.go | 197 +++++++++++++++++++++++++++++++--- azbus/tracing.go | 36 ++++++- taskfiles/Taskfile_codeqa.yml | 9 +- 4 files changed, 225 insertions(+), 19 deletions(-) diff --git a/azbus/disposition.go b/azbus/disposition.go index 7f24e15..bef4265 100644 --- a/azbus/disposition.go +++ b/azbus/disposition.go @@ -51,8 +51,6 @@ func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *R } } -// NB: ALL disposition methods return nil so they can be used in return statements - // Abandon abandons message. This function is not used but is present for consistency. func (r *Receiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) { ctx = context.WithoutCancel(ctx) diff --git a/azbus/receiver.go b/azbus/receiver.go index 1c23649..1bbc6cd 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -21,6 +21,13 @@ type Handler interface { Close() } +// BatchHandler processes ReceivedMessages. 
+type BatchHandler interface {
+	Handle(context.Context, []*ReceivedMessage) ([]Disposition, context.Context, error)
+	Open() error
+	Close()
+}
+
 const (
 	// RenewalTime is the how often we want to renew the message PEEK lock
 	//
@@ -82,11 +89,14 @@ type Receiver struct {
 
 	Cfg ReceiverConfig
 
-	log      Logger
-	mtx      sync.Mutex
-	receiver *azservicebus.Receiver
-	options  *azservicebus.ReceiverOptions
-	handlers []Handler
+	log                      Logger
+	mtx                      sync.Mutex
+	receiver                 *azservicebus.Receiver
+	options                  *azservicebus.ReceiverOptions
+	handlers                 []Handler
+	serialHandler            Handler
+	batchHandler             BatchHandler
+	numberOfReceivedMessages int // for serial or batch handler only
 }
 
 type ReceiverOption func(*Receiver)
@@ -98,6 +108,22 @@ func WithHandlers(h ...Handler) ReceiverOption {
 	}
 }
 
+// WithBatchHandler registers a handler that receives up to n messages at once and processes them as a single batch.
+func WithBatchHandler(h BatchHandler, n int) ReceiverOption {
+	return func(r *Receiver) {
+		r.batchHandler = h
+		r.numberOfReceivedMessages = n
+	}
+}
+
+// WithSerialHandler registers a handler that receives up to n messages at once and processes them one at a time.
+func WithSerialHandler(h Handler, n int) ReceiverOption {
+	return func(r *Receiver) {
+		r.serialHandler = h
+		r.numberOfReceivedMessages = n
+	}
+}
+
 // WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
 // than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
 // renewal time is 50s.
@@ -165,6 +191,34 @@ func (r *Receiver) String() string {
 }
 
-// processMessage disposes of messages and emits 2 log messages detailing how long processing took.
+// processMessages disposes of messages and emits 2 log messages detailing how long processing took.
+func (r *Receiver) processMessages(ctx context.Context, maxDuration time.Duration, messages []*ReceivedMessage, handler BatchHandler) { + count := len(messages) + now := time.Now() + dispositions, ctx, err := r.handleReceivedMessagesWithTracingContext(ctx, messages, handler) + for i, disp := range dispositions { + r.dispose(ctx, disp, err, messages[i]) + } + duration := time.Since(now) + + // Now we do have a tracing context we can use it for logging + log := r.log.FromContext(ctx) + defer log.Close() + + log.Debugf("Processing messages %d took %s", len(messages), duration) + + // This is safe because maxDuration is only defined if RenewMessageLock is false. + if !r.Cfg.RenewMessageLock && duration >= maxDuration { + log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration) + log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES") + log.Infof("WARNING: both can be found in the helm chart for each service.") + } + if errors.Is(err, ErrPeekLockTimeout) { + log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err) + log.Infof("WARNING: please enable SERVICEBUS_RENEW_LOCK which can be found in the helm chart") + } +} + +// processMessage disposes of message and emits 2 log messages detailing how long processing took. 
func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) { now := time.Now() @@ -226,7 +280,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive } } -func (r *Receiver) receiveMessages() error { +func (r *Receiver) receiveMessagesInParallel() error { numberOfReceivedMessages := len(r.handlers) r.log.Debugf( @@ -294,6 +348,98 @@ func (r *Receiver) receiveMessages() error { } } +func (r *Receiver) receiveMessagesInSerial() error { + + r.log.Debugf( + "NumberOfReceivedMessages %d, RenewMessageLock: %v", + r.numberOfReceivedMessages, + r.Cfg.RenewMessageLock, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + var err error + var messages []*ReceivedMessage + messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil) + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + total := len(messages) + r.log.Debugf("total messages %d", total) + var renewCtx context.Context + var renewCancel context.CancelFunc + var maxDuration time.Duration + // XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required. 
+ if r.Cfg.RenewMessageLock { + func() { + renewCtx, renewCancel = context.WithCancel(ctx) + defer renewCancel() + for i, msg := range messages { + go r.renewMessageLock(renewCtx, i+1, msg) + } + for i, msg := range messages { + r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler) + } + }() + } else { + for i, msg := range messages { + func() { + // we need a timeout per message if RenewMessageLock is disabled + renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg) + defer renewCancel() + r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler) + }() + } + } + } +} + +func (r *Receiver) receiveMessagesInBatch() error { + + r.log.Debugf( + "NumberOfReceivedMessages %d, RenewMessageLock: %v", + r.numberOfReceivedMessages, + r.Cfg.RenewMessageLock, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + var err error + var messages []*ReceivedMessage + messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil) + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + total := len(messages) + r.log.Debugf("total messages %d", total) + var renewCtx context.Context + var renewCancel context.CancelFunc + var maxDuration time.Duration + // XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required. + func() { + if r.Cfg.RenewMessageLock { + renewCtx, renewCancel = context.WithCancel(ctx) + defer renewCancel() + for i, msg := range messages { + go r.renewMessageLock(renewCtx, i+1, msg) + } + } else { + // we need a timeout per message if RenewMessageLock is disabled - use first message + // for all messages + renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, messages[0]) + defer renewCancel() + } + r.processMessages(renewCtx, maxDuration, messages, r.batchHandler) + }() + } +} + // The following 2 methods satisfy the startup.Listener interface. 
func (r *Receiver) Listen() error { r.log.Debugf("listen") @@ -303,7 +449,13 @@ func (r *Receiver) Listen() error { r.log.Infof("%s", azerr) return azerr } - return r.receiveMessages() + if r.batchHandler != nil { + return r.receiveMessagesInBatch() + } + if r.serialHandler != nil { + return r.receiveMessagesInSerial() + } + return r.receiveMessagesInParallel() } func (r *Receiver) Shutdown(ctx context.Context) error { @@ -334,12 +486,25 @@ func (r *Receiver) open() error { r.log.Infof("%s", azerr) return azerr } - r.receiver = receiver - for j := range len(r.handlers) { - err = r.handlers[j].Open() + + switch { + case r.batchHandler != nil: + err = r.batchHandler.Open() + if err != nil { + return fmt.Errorf("failed to open batch handler: %w", err) + } + case r.serialHandler != nil: + err = r.serialHandler.Open() if err != nil { - return fmt.Errorf("failed to open handler: %w", err) + return fmt.Errorf("failed to open serial handler: %w", err) + } + default: + for j := range len(r.handlers) { + err = r.handlers[j].Open() + if err != nil { + return fmt.Errorf("failed to open handler: %w", err) + } } } return nil @@ -356,11 +521,19 @@ func (r *Receiver) close_() { azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err)) r.log.Infof("%s", azerr) } - r.receiver = nil for j := range len(r.handlers) { r.handlers[j].Close() } r.handlers = []Handler{} + if r.serialHandler != nil { + r.serialHandler.Close() + r.serialHandler = nil + } + if r.batchHandler != nil { + r.batchHandler.Close() + r.batchHandler = nil + } + r.receiver = nil } } } diff --git a/azbus/tracing.go b/azbus/tracing.go index 87c572c..cfe69b3 100644 --- a/azbus/tracing.go +++ b/azbus/tracing.go @@ -8,7 +8,7 @@ import ( func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) { // We don't have the tracing span info on the context yet, that is what this function will add - // we we log 
using the reciever logger
+	// we log using the receiver logger
 	r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties)
 
 	var opts = []opentracing.StartSpanOption{}
@@ -36,6 +36,40 @@ func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context,
 	return handler.Handle(ctx, message)
 }
 
+func (r *Receiver) handleReceivedMessagesWithTracingContext(ctx context.Context, messages []*ReceivedMessage, handler BatchHandler) ([]Disposition, context.Context, error) {
+
+	var opts = []opentracing.StartSpanOption{}
+	carrier := opentracing.TextMapCarrier{}
+	// This just gets all the message Application Properties into a string map. That map is then passed into the
+	// open tracing constructor which extracts any bits it is interested in to use to setup the spans etc.
+	// It will ignore anything it doesn't care about. So the filtering of the map is done for us and
+	// we don't need to pre-filter it.
+
+	// We don't have the tracing span info on the context yet, that is what this function will add
+	// we log using the receiver logger
+	// TODO: only the first message is used for the tracing context; extend to cover all messages
+	msg := messages[0]
+	r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", msg.ApplicationProperties)
+
+	for k, v := range msg.ApplicationProperties {
+		// Tracing properties will be strings
+		value, ok := v.(string)
+		if ok {
+			carrier.Set(k, value)
+		}
+	}
+	spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier)
+	if err != nil {
+		r.log.Infof("handleReceivedMessagesWithTracingContext(): Unable to extract span context: %v", err)
+	} else {
+		opts = append(opts, opentracing.ChildOf(spanCtx))
+	}
+	span := opentracing.StartSpan("handle message", opts...)
+ defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return handler.Handle(ctx, messages) +} + func (s *Sender) updateSendingMesssageForSpan(ctx context.Context, message *OutMessage, span opentracing.Span) { log := s.log.FromContext(ctx) defer log.Close() diff --git a/taskfiles/Taskfile_codeqa.yml b/taskfiles/Taskfile_codeqa.yml index 89eb4b2..d4ed14a 100644 --- a/taskfiles/Taskfile_codeqa.yml +++ b/taskfiles/Taskfile_codeqa.yml @@ -27,17 +27,18 @@ tasks: desc: Quality assurance of code summary: "format sources (go fmt)" cmds: - - gofmt -l -s -w . + - | + go fix ./... + goimports -w . + gofmt -l -s -w . lint: desc: Quality assurance of code cmds: - | - golangci-lint --version go vet ./... - goimports {{.VERBOSE}} -w . + golangci-lint --version golangci-lint {{.VERBOSE}} run --timeout 10m ./... - gofmt -l -s -w . unit-tests: desc: "run unit tests"