Skip to content

Commit

Permalink
Merge pull request #49 from skrater/master
Browse files Browse the repository at this point in the history
customize publish timeout
  • Loading branch information
skrater committed May 14, 2019
2 parents f628e92 + 01988e7 commit f903bc0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 21 deletions.
61 changes: 46 additions & 15 deletions amqp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type message struct {

// producer holds a amqp connection and channel to publish messages to.
type producer struct {
m sync.Mutex
m sync.RWMutex
wg sync.WaitGroup
conn *connection
channel *amqplib.Channel
Expand All @@ -43,14 +43,16 @@ type producer struct {

// ProducerConfig to be used when creating a new producer.
type ProducerConfig struct {
publishInterval time.Duration
PublishInterval time.Duration
ConfirmTimeout time.Duration
}

// NewProducer returns a new AMQP Producer.
// Uses a default ProducerConfig with 2 second of publish interval.
func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
return NewProducerConfig(c, exchange, ProducerConfig{
publishInterval: 2 * time.Second,
PublishInterval: 2 * time.Second,
ConfirmTimeout: 10 * time.Second,
})
}

Expand All @@ -59,7 +61,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
producer := &producer{
conn: c.(*connection),
config: config,
internalQueue: make(chan message, 2),
internalQueue: make(chan message),
exchangeName: exchange,
}

Expand Down Expand Up @@ -95,6 +97,15 @@ func (p *producer) Publish(action string, data []byte) {
func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) {
p.wg.Add(1)

log.WithFields(log.Fields{
"action": queue,
"message_id": msg.MessageId,
"type": "goevents",
"sub_type": "producer",
"exchange": p.exchangeName,
"length": len(p.internalQueue),
}).Debug("Publishing message to internal queue.")

p.internalQueue <- message{
action: queue,
msg: msg,
Expand All @@ -119,6 +130,15 @@ func (p *producer) setClosed() {
p.closed = true
}

func (p *producer) notifyProducerClosed() {
p.m.RLock()
defer p.m.RUnlock()

for _, c := range p.closes {
c <- true
}
}

// Close the producer's internal queue.
func (p *producer) Close() {
p.setClosed()
Expand All @@ -129,9 +149,7 @@ func (p *producer) Close() {

p.channel.Close()

for _, c := range p.closes {
c <- true
}
p.notifyProducerClosed()
}

// changeChannel takes a new channel to the queue,
Expand Down Expand Up @@ -208,8 +226,8 @@ func (p *producer) setChannelReady(ready bool) {
}

func (p *producer) isChannelReady() bool {
p.m.Lock()
defer p.m.Unlock()
p.m.RLock()
defer p.m.RUnlock()

return p.channelReady
}
Expand Down Expand Up @@ -269,13 +287,16 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
return
}

log.WithFields(log.Fields{
logMessage := log.WithFields(log.Fields{
"action": queue,
"body": msg.Body,
"message_id": msg.MessageId,
"type": "goevents",
"sub_type": "producer",
"exchange": p.exchangeName,
})

logMessage.WithFields(log.Fields{
"body": msg.Body,
}).Debug("Publishing message to the exchange.")

defer func() {
Expand Down Expand Up @@ -304,6 +325,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
return
}

logMessage.Debug("Waiting message to be acked or timed out.")

select {
case confirm := <-p.notifyConfirm:
if confirm.Ack {
Expand All @@ -312,7 +335,7 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err

err = ErrNotAcked
return
case <-time.After(p.config.publishInterval):
case <-time.After(p.config.ConfirmTimeout):
err = ErrTimedout
return
}
Expand All @@ -321,8 +344,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
}

func (p *producer) isClosed() bool {
p.m.Lock()
defer p.m.Unlock()
p.m.RLock()
defer p.m.RUnlock()

return p.closed
}
Expand All @@ -345,7 +368,15 @@ func (p *producer) drainInternalQueue() {
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")

time.Sleep(p.config.publishInterval)
if err == ErrTimedout {
log.Warn("Closing producer channel due timeout wating msg confirmation")

// force close to run setupTopology
p.setChannelReady(false)
p.channel.Close()
}

time.Sleep(p.config.PublishInterval)
} else {
p.wg.Done()
retry = false
Expand Down
27 changes: 21 additions & 6 deletions examples/producer/amqp/producer.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package main

import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"fmt"
"time"

"github.com/eventials/goevents/amqp"
)
Expand Down Expand Up @@ -58,14 +58,29 @@ func main() {
}()

sigc := make(chan os.Signal, 1)

signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

fmt.Println("Waiting CTRL+C")

<-sigc
fmt.Println("Closing producerA")
producerA.Close()

fmt.Println("Closing producerB")
producerB.Close()
closed := make(chan bool)

go func() {
fmt.Println("Closing producerA")
producerA.Close()

fmt.Println("Closing producerB")
producerB.Close()

closed <- true
}()

select {
case <-closed:
fmt.Println("Successfully closed.")
case <-time.After(20 * time.Second):
fmt.Println("Close timeout.")
}
}

0 comments on commit f903bc0

Please sign in to comment.