From 7243ce694c8e844b130b1afc0e1fb3da598c2281 Mon Sep 17 00:00:00 2001 From: Paul Hewlett Date: Thu, 18 Apr 2024 09:10:11 +0100 Subject: [PATCH] Receive Messages Serially New functional option to add a serialHandler that reads N messages at once amd emits to a new type of handler that takes a list of messages and simply returns an error. receiver := NewReceiver(, WithSerialHandler(h, 200)) where h is an instance of: type SerialHandler interface { Handle(context.Context, []*ReceivedMessage) error Open() error Close() } Defining a SerialHandler this way disables the normal parallel processing defined by WithHandlers() The user must dispose of the messages in the handler using the Dispose() method: r.Dispose(CompleteDisposition, err, msg) Currently spans and RenewMessageLock are ineffective for a Serialhandler. AB#9378 --- azbus/disposition.go | 22 +++++----- azbus/receiver.go | 100 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 92 insertions(+), 30 deletions(-) diff --git a/azbus/disposition.go b/azbus/disposition.go index 7f24e15..82c34ee 100644 --- a/azbus/disposition.go +++ b/azbus/disposition.go @@ -21,31 +21,31 @@ const ( ) func (d Disposition) String() string { - switch { - case d == DeadletterDisposition: + switch d { + case DeadletterDisposition: return "DeadLetter" - case d == AbandonDisposition: + case AbandonDisposition: return "Abandon" - case d == RescheduleDisposition: + case RescheduleDisposition: return "Reschedule" - case d == CompleteDisposition: + case CompleteDisposition: return "Complete" } return fmt.Sprintf("Unknown%d", d) } -func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) { - switch { - case d == DeadletterDisposition: +func (r *Receiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) { + switch d { + case DeadletterDisposition: r.deadLetter(ctx, err, msg) return - case d == AbandonDisposition: + case AbandonDisposition: r.abandon(ctx, err, msg) return - case d == RescheduleDisposition: + case RescheduleDisposition: r.reschedule(ctx, err, msg) return - case d == CompleteDisposition: + case CompleteDisposition: r.complete(ctx, err, msg) return } diff --git a/azbus/receiver.go b/azbus/receiver.go index 1c23649..2b7654e 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -21,6 +21,13 @@ type Handler interface { Close() } +// This handler must explicitly handles disposition... +type SerialHandler interface { + Handle(context.Context, []*ReceivedMessage) error + Open() error + Close() +} + const ( // RenewalTime is the how often we want to renew the message PEEK lock // @@ -82,11 +89,13 @@ type Receiver struct { Cfg ReceiverConfig - log Logger - mtx sync.Mutex - receiver *azservicebus.Receiver - options *azservicebus.ReceiverOptions - handlers []Handler + log Logger + mtx sync.Mutex + receiver *azservicebus.Receiver + options *azservicebus.ReceiverOptions + handlers []Handler + serialHandler SerialHandler + numberOfReceivedMessages int // for serial Handler only } type ReceiverOption func(*Receiver) @@ -98,6 +107,14 @@ func WithHandlers(h ...Handler) ReceiverOption { } } +// WithSerialHandler +func WithSerialHandler(h SerialHandler, n int) ReceiverOption { + return func(r *Receiver) { + r.serialHandler = h + r.numberOfReceivedMessages = n + } +} + // WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less // than the peek lock timeout. For example: the default peek lock timeout is 60s and the default // renewal time is 50s. @@ -172,7 +189,7 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti r.log.Debugf("Processing message %d", count) disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler) - r.dispose(ctx, disp, err, msg) + r.Dispose(ctx, disp, err, msg) duration := time.Since(now) @@ -226,7 +243,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive } } -func (r *Receiver) receiveMessages() error { +func (r *Receiver) receiveMessagesInParallel() error { numberOfReceivedMessages := len(r.handlers) r.log.Debugf( @@ -294,6 +311,37 @@ func (r *Receiver) receiveMessages() error { } } +func (r *Receiver) receiveMessagesInSerial() error { + + r.log.Debugf( + "NumberOfReceivedMessages %d, RenewMessageLock: %v", + r.numberOfReceivedMessages, + r.Cfg.RenewMessageLock, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + var err error + var messages []*ReceivedMessage + messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil) + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + total := len(messages) + r.log.Debugf("total messages %d", total) + // we should wrap this in a span ... + // XXXX: add spans using something like processMessage/r.handleReceivedMessageWithTracingContext + // XXXX: add logic for RenewMessageLock here perhaps + err = r.serialHandler.Handle(ctx, messages) + if err != nil { + return err + } + } +} + // The following 2 methods satisfy the startup.Listener interface. func (r *Receiver) Listen() error { r.log.Debugf("listen") @@ -303,7 +351,10 @@ func (r *Receiver) Listen() error { r.log.Infof("%s", azerr) return azerr } - return r.receiveMessages() + if r.serialHandler != nil { + return r.receiveMessagesInSerial() + } + return r.receiveMessagesInParallel() } func (r *Receiver) Shutdown(ctx context.Context) error { @@ -311,13 +362,7 @@ func (r *Receiver) Shutdown(ctx context.Context) error { return nil } -func (r *Receiver) open() error { - var err error - - if r.receiver != nil { - return nil - } - +func (r *Receiver) openReceiver() error { client, err := r.azClient.azClient() if err != nil { return err @@ -336,10 +381,27 @@ func (r *Receiver) open() error { } r.receiver = receiver - for j := range len(r.handlers) { - err = r.handlers[j].Open() - if err != nil { - return fmt.Errorf("failed to open handler: %w", err) + return nil +} + +func (r *Receiver) open() error { + var err error + + if r.receiver != nil { + return nil + } + + err = r.openReceiver() + if err != nil { + return err + } + + if r.serialHandler == nil { + for j := range len(r.handlers) { + err = r.handlers[j].Open() + if err != nil { + return fmt.Errorf("failed to open handler: %w", err) + } } } return nil