Receive Messages Serially
New functional option to add a serialHandler that reads N messages at
once and processes them one at a time - serially.

    receiver := NewReceiver(..., WithSerialHandler(h, 200))

where h is an instance of Handler.

Defining a SerialHandler this way disables the normal parallel
processing defined by WithHandlers().

It is highly recommended to specify RenewMessageLock if the number of
received messages is high (say > 10). Otherwise one has to be sure that
processing all the messages within the 60s peek-lock timeout is
achievable.
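
A minimal sketch of a serial handler, assuming the Handler interface
shape shown in receiver.go/tracing.go below; the type name, its body,
and the zero-valued Disposition are illustrative, not part of this
change:

    import "context"

    // exampleHandler is hypothetical; it assumes the azbus types
    // (ReceivedMessage, Disposition) are in scope.
    type exampleHandler struct{}

    func (h *exampleHandler) Open() error { return nil }
    func (h *exampleHandler) Close()      {}

    // Handle is invoked once per message, in order. The returned Disposition
    // tells the receiver how to settle the message (see azbus/disposition.go).
    func (h *exampleHandler) Handle(ctx context.Context, msg *ReceivedMessage) (Disposition, context.Context, error) {
        // ... process msg here ...
        var d Disposition // stand-in for a real settlement value
        return d, ctx, nil
    }

Pass &exampleHandler{} as h in the snippet above.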

AB#9378
eccles committed Jun 28, 2024
1 parent dd2783b commit 6e077a0
Showing 4 changed files with 228 additions and 22 deletions.
2 changes: 0 additions & 2 deletions azbus/disposition.go
@@ -51,8 +51,6 @@ func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *R
}
}

// NB: ALL disposition methods return nil so they can be used in return statements

// Abandon abandons message. This function is not used but is present for consistency.
func (r *Receiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) {
ctx = context.WithoutCancel(ctx)
203 changes: 188 additions & 15 deletions azbus/receiver.go
@@ -21,6 +21,13 @@ type Handler interface {
Close()
}

// BatchHandler processes ReceivedMessages.
type BatchHandler interface {
Handle(context.Context, []*ReceivedMessage) ([]Disposition, context.Context, error)
Open() error
Close()
}

const (
// RenewalTime is how often we want to renew the message PEEK lock
//
@@ -82,11 +89,14 @@ type Receiver struct {

Cfg ReceiverConfig

log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
serialHandler Handler
batchHandler BatchHandler
numberOfReceivedMessages int // for serial or batch handler only
}

type ReceiverOption func(*Receiver)
@@ -98,6 +108,22 @@ func WithHandlers(h ...Handler) ReceiverOption {
}
}

// WithBatchHandler sets a BatchHandler that receives up to n messages at once
// and processes them as a single batch, instead of the parallel processing
// configured by WithHandlers().
func WithBatchHandler(h BatchHandler, n int) ReceiverOption {
return func(r *Receiver) {
r.batchHandler = h
r.numberOfReceivedMessages = n
}
}

// WithSerialHandler sets a Handler that receives up to n messages at once and
// processes them one at a time - serially - instead of the parallel processing
// configured by WithHandlers().
func WithSerialHandler(h Handler, n int) ReceiverOption {
return func(r *Receiver) {
r.serialHandler = h
r.numberOfReceivedMessages = n
}
}

// WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
// than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
// renewal time is 50s.
@@ -164,8 +190,36 @@ func (r *Receiver) String() string {
return fmt.Sprintf("%s", r.Cfg.TopicOrQueueName)
}

// processMessage disposes of messages and emits 2 log messages detailing how long processing took.
func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) {
// processBatchMessages disposes of messages and emits 2 log messages detailing how long processing took.
func (r *Receiver) processBatchMessages(ctx context.Context, maxDuration time.Duration, messages []*ReceivedMessage, handler BatchHandler) {
count := len(messages)
now := time.Now()
dispositions, ctx, err := r.handleReceivedMessagesWithTracingContext(ctx, messages, handler)
for i, disp := range dispositions {
r.dispose(ctx, disp, err, messages[i])
}
duration := time.Since(now)

// Now that we have a tracing context we can use it for logging
log := r.log.FromContext(ctx)
defer log.Close()

log.Debugf("Processing messages %d took %s", len(messages), duration)

// This is safe because maxDuration is only defined if RenewMessageLock is false.
if !r.Cfg.RenewMessageLock && duration >= maxDuration {
log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration)
log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES")
log.Infof("WARNING: both can be found in the helm chart for each service.")
}
if errors.Is(err, ErrPeekLockTimeout) {
log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err)
log.Infof("WARNING: please enable SERVICEBUS_RENEW_LOCK which can be found in the helm chart")
}
}

// processSingleMessage disposes of the message and emits 2 log messages detailing how long processing took.
func (r *Receiver) processSingleMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) {
now := time.Now()

// the context won't have a trace span on it yet, so stick with the receiver logger instance
@@ -226,7 +280,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
}
}

func (r *Receiver) receiveMessages() error {
func (r *Receiver) receiveMessagesInParallel() error {

numberOfReceivedMessages := len(r.handlers)
r.log.Debugf(
@@ -259,7 +313,7 @@ func (r *Receiver) receiveMessages() error {
// we need a timeout if RenewMessageLock is disabled
renewCtx, renewCancel, maxDuration = rr.setTimeout(rctx, rr.log, msg)
}
rr.processMessage(renewCtx, ii+1, maxDuration, msg, rr.handlers[ii])
rr.processSingleMessage(renewCtx, ii+1, maxDuration, msg, rr.handlers[ii])
renewCancel()
wg.Done()
}
@@ -294,6 +348,98 @@
}
}

func (r *Receiver) receiveMessagesInSerial() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
var renewCtx context.Context
var renewCancel context.CancelFunc
var maxDuration time.Duration
// XXX: if the number of received messages is large (>10) then RenewMessageLock is required.
if r.Cfg.RenewMessageLock {
func() {
renewCtx, renewCancel = context.WithCancel(ctx)
defer renewCancel()
for i, msg := range messages {
go r.renewMessageLock(renewCtx, i+1, msg)
}
for i, msg := range messages {
r.processSingleMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
}
}()
} else {
for i, msg := range messages {
func() {
// we need a timeout per message if RenewMessageLock is disabled
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg)
defer renewCancel()
r.processSingleMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
}()
}
}
}
}

func (r *Receiver) receiveMessagesInBatch() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
var renewCtx context.Context
var renewCancel context.CancelFunc
var maxDuration time.Duration
// XXX: if the number of received messages is large (>10) then RenewMessageLock is required.
func() {
if r.Cfg.RenewMessageLock {
renewCtx, renewCancel = context.WithCancel(ctx)
defer renewCancel()
for i, msg := range messages {
go r.renewMessageLock(renewCtx, i+1, msg)
}
} else {
// we need a timeout if RenewMessageLock is disabled - derive it from the
// first message and apply it to the whole batch
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, messages[0])
defer renewCancel()
}
r.processBatchMessages(renewCtx, maxDuration, messages, r.batchHandler)
}()
}
}

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
r.log.Debugf("listen")
@@ -303,7 +449,13 @@ func (r *Receiver) Listen() error {
r.log.Infof("%s", azerr)
return azerr
}
return r.receiveMessages()
if r.batchHandler != nil {
return r.receiveMessagesInBatch()
}
if r.serialHandler != nil {
return r.receiveMessagesInSerial()
}
return r.receiveMessagesInParallel()
}

func (r *Receiver) Shutdown(ctx context.Context) error {
@@ -334,12 +486,25 @@ func (r *Receiver) open() error {
r.log.Infof("%s", azerr)
return azerr
}

r.receiver = receiver
for j := range len(r.handlers) {
err = r.handlers[j].Open()

switch {
case r.batchHandler != nil:
err = r.batchHandler.Open()
if err != nil {
return fmt.Errorf("failed to open batch handler: %w", err)
}
case r.serialHandler != nil:
err = r.serialHandler.Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
return fmt.Errorf("failed to open serial handler: %w", err)
}
default:
for j := range len(r.handlers) {
err = r.handlers[j].Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
}
}
}
return nil
@@ -356,11 +521,19 @@ func (r *Receiver) close_() {
azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
}
r.receiver = nil
for j := range len(r.handlers) {
r.handlers[j].Close()
}
r.handlers = []Handler{}
if r.serialHandler != nil {
r.serialHandler.Close()
r.serialHandler = nil
}
if r.batchHandler != nil {
r.batchHandler.Close()
r.batchHandler = nil
}
r.receiver = nil
}
}
}
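
For comparison with the serial path above, a minimal BatchHandler sketch
matching the interface added in receiver.go; the type name and body are
assumptions. Note that Handle must return one Disposition per message, in
order, because processBatchMessages settles messages[i] with
dispositions[i]:

    import "context"

    // exampleBatchHandler is hypothetical; it assumes the azbus types
    // (ReceivedMessage, Disposition) are in scope.
    type exampleBatchHandler struct{}

    func (h *exampleBatchHandler) Open() error { return nil }
    func (h *exampleBatchHandler) Close()      {}

    // Handle receives the whole batch at once; dispositions[i] settles messages[i].
    func (h *exampleBatchHandler) Handle(ctx context.Context, msgs []*ReceivedMessage) ([]Disposition, context.Context, error) {
        dispositions := make([]Disposition, len(msgs)) // zero values as stand-ins
        for i := range msgs {
            // ... process msgs[i] and set dispositions[i] accordingly ...
        }
        return dispositions, ctx, nil
    }

    // config elided, as in the commit message example
    receiver := NewReceiver(..., WithBatchHandler(&exampleBatchHandler{}, 200))
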
36 changes: 35 additions & 1 deletion azbus/tracing.go
@@ -8,7 +8,7 @@ import (

func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) {
// We don't have the tracing span info on the context yet, that is what this function will add
// we we log using the reciever logger
// we log using the receiver logger
r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties)

var opts = []opentracing.StartSpanOption{}
@@ -36,6 +36,40 @@ func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context,
return handler.Handle(ctx, message)
}

func (r *Receiver) handleReceivedMessagesWithTracingContext(ctx context.Context, messages []*ReceivedMessage, handler BatchHandler) ([]Disposition, context.Context, error) {

var opts = []opentracing.StartSpanOption{}
carrier := opentracing.TextMapCarrier{}
// This just gets all the message Application Properties into a string map. That map is then passed into the
// open tracing constructor which extracts any bits it is interested in to use to setup the spans etc.
// It will ignore anything it doesn't care about. So the filtering of the map is done for us and
// we don't need to pre-filter it.

// We don't have the tracing span info on the context yet, that is what this function will add
// we log using the receiver logger
// only use the first message for tracing... will fix later
msg := messages[0]
r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", msg.ApplicationProperties)

for k, v := range msg.ApplicationProperties {
// Tracing properties will be strings
value, ok := v.(string)
if ok {
carrier.Set(k, value)
}
}
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier)
if err != nil {
r.log.Infof("handleReceivedMessageWithTracingContext(): Unable to extract span context: %v", err)
} else {
opts = append(opts, opentracing.ChildOf(spanCtx))
}
span := opentracing.StartSpan("handle message", opts...)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return handler.Handle(ctx, messages)
}

func (s *Sender) updateSendingMesssageForSpan(ctx context.Context, message *OutMessage, span opentracing.Span) {
log := s.log.FromContext(ctx)
defer log.Close()
9 changes: 5 additions & 4 deletions taskfiles/Taskfile_codeqa.yml
@@ -27,17 +27,18 @@ tasks:
desc: Quality assurance of code
summary: "format sources (go fmt)"
cmds:
- gofmt -l -s -w .
- |
go fix ./...
goimports -w .
gofmt -l -s -w .
lint:
desc: Quality assurance of code
cmds:
- |
golangci-lint --version
go vet ./...
goimports {{.VERBOSE}} -w .
golangci-lint --version
golangci-lint {{.VERBOSE}} run --timeout 10m ./...
gofmt -l -s -w .
unit-tests:
desc: "run unit tests"
