Skip to content

Commit

Permalink
Receive Messages Serially
Browse files Browse the repository at this point in the history
New functional option to add a serialHandler that reads N messages at
once amd emits to a new type of handler that takes a list of messages
and simply returns an error.

    receiver := NewReceiver(, WithSerialHandler(h, 200))

where h is an instance of:

type SerialHandler interface {
        Handle(context.Context, []*ReceivedMessage) error
        Open() error
        Close()
}

Defining a SerialHandler this way disables the normal parallel
processing defined by WithHandlers()

The user must dispose of the messages in the handler using the
Dispose() method:

    r.Dispose(CompleteDisposition, err, msg)

Currently spans and RenewMessageLock are ineffective for a
Serialhandler.

AB#9378
  • Loading branch information
eccles committed Apr 18, 2024
1 parent 3a14087 commit 7243ce6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 30 deletions.
22 changes: 11 additions & 11 deletions azbus/disposition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ const (
)

func (d Disposition) String() string {
switch {
case d == DeadletterDisposition:
switch d {
case DeadletterDisposition:
return "DeadLetter"
case d == AbandonDisposition:
case AbandonDisposition:
return "Abandon"
case d == RescheduleDisposition:
case RescheduleDisposition:
return "Reschedule"
case d == CompleteDisposition:
case CompleteDisposition:
return "Complete"
}
return fmt.Sprintf("Unknown%d", d)
}

func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) {
switch {
case d == DeadletterDisposition:
func (r *Receiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) {
switch d {
case DeadletterDisposition:
r.deadLetter(ctx, err, msg)
return
case d == AbandonDisposition:
case AbandonDisposition:
r.abandon(ctx, err, msg)
return
case d == RescheduleDisposition:
case RescheduleDisposition:
r.reschedule(ctx, err, msg)
return
case d == CompleteDisposition:
case CompleteDisposition:
r.complete(ctx, err, msg)
return
}
Expand Down
100 changes: 81 additions & 19 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Handler interface {
Close()
}

// This handler must explicitly handles disposition...
type SerialHandler interface {
Handle(context.Context, []*ReceivedMessage) error
Open() error
Close()
}

const (
// RenewalTime is the how often we want to renew the message PEEK lock
//
Expand Down Expand Up @@ -82,11 +89,13 @@ type Receiver struct {

Cfg ReceiverConfig

log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
serialHandler SerialHandler
numberOfReceivedMessages int // for serial Handler only
}

type ReceiverOption func(*Receiver)
Expand All @@ -98,6 +107,14 @@ func WithHandlers(h ...Handler) ReceiverOption {
}
}

// WithSerialHandler
func WithSerialHandler(h SerialHandler, n int) ReceiverOption {
return func(r *Receiver) {
r.serialHandler = h
r.numberOfReceivedMessages = n
}
}

// WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
// than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
// renewal time is 50s.
Expand Down Expand Up @@ -172,7 +189,7 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti

r.log.Debugf("Processing message %d", count)
disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler)
r.dispose(ctx, disp, err, msg)
r.Dispose(ctx, disp, err, msg)

duration := time.Since(now)

Expand Down Expand Up @@ -226,7 +243,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
}
}

func (r *Receiver) receiveMessages() error {
func (r *Receiver) receiveMessagesInParallel() error {

numberOfReceivedMessages := len(r.handlers)
r.log.Debugf(
Expand Down Expand Up @@ -294,6 +311,37 @@ func (r *Receiver) receiveMessages() error {
}
}

func (r *Receiver) receiveMessagesInSerial() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
// we should wrap this in a span ...
// XXXX: add spans using something like processMessage/r.handleReceivedMessageWithTracingContext
// XXXX: add logic for RenewMessageLock here perhaps
err = r.serialHandler.Handle(ctx, messages)
if err != nil {
return err
}
}
}

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
r.log.Debugf("listen")
Expand All @@ -303,21 +351,18 @@ func (r *Receiver) Listen() error {
r.log.Infof("%s", azerr)
return azerr
}
return r.receiveMessages()
if r.serialHandler != nil {
return r.receiveMessagesInSerial()
}
return r.receiveMessagesInParallel()
}

func (r *Receiver) Shutdown(ctx context.Context) error {
r.close_()
return nil
}

func (r *Receiver) open() error {
var err error

if r.receiver != nil {
return nil
}

func (r *Receiver) openReceiver() error {
client, err := r.azClient.azClient()
if err != nil {
return err
Expand All @@ -336,10 +381,27 @@ func (r *Receiver) open() error {
}

r.receiver = receiver
for j := range len(r.handlers) {
err = r.handlers[j].Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
return nil
}

func (r *Receiver) open() error {
var err error

if r.receiver != nil {
return nil
}

err = r.openReceiver()
if err != nil {
return err
}

if r.serialHandler == nil {
for j := range len(r.handlers) {
err = r.handlers[j].Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
}
}
}
return nil
Expand Down

0 comments on commit 7243ce6

Please sign in to comment.