Skip to content

Commit

Permalink
Merge pull request #48 from skrater/master
Browse files Browse the repository at this point in the history
Remove useless code
  • Loading branch information
skrater committed May 1, 2019
2 parents 1e5eeaa + bf9ff86 commit f628e92
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions amqp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type producer struct {
channel *amqplib.Channel
notifyConfirm chan amqplib.Confirmation
notifyChanClose chan *amqplib.Error
closeQueue chan bool
config ProducerConfig

internalQueue chan message
Expand Down Expand Up @@ -62,7 +61,6 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
config: config,
internalQueue: make(chan message, 2),
exchangeName: exchange,
closeQueue: make(chan bool),
}

err := producer.setupTopology()
Expand Down Expand Up @@ -127,10 +125,7 @@ func (p *producer) Close() {

p.wg.Wait()

p.closeQueue <- true

close(p.internalQueue)
close(p.closeQueue)

p.channel.Close()

Expand Down Expand Up @@ -333,32 +328,27 @@ func (p *producer) isClosed() bool {
}

func (p *producer) drainInternalQueue() {
for {
select {
case <-p.closeQueue:
return
case m := <-p.internalQueue:
retry := true

for retry {
// block until confirmation
err := p.publishMessage(m.msg, m.action)

if err != nil {
log.WithFields(log.Fields{
"action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")

time.Sleep(p.config.publishInterval)
} else {
p.wg.Done()
retry = false
}
for m := range p.internalQueue {
retry := true

for retry {
// block until confirmation
err := p.publishMessage(m.msg, m.action)

if err != nil {
log.WithFields(log.Fields{
"action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")

time.Sleep(p.config.publishInterval)
} else {
p.wg.Done()
retry = false
}
}
}
Expand Down

0 comments on commit f628e92

Please sign in to comment.