Skip to content

Commit f1a30a3

Browse files
authored
Merge pull request #101 from akolpakov-somehash/bugfix/inventory-consumer-processes-only-once
fix:consumer processes deliveries only once
2 parents e708a3c + a034ad1 commit f1a30a3

File tree

1 file changed

+60
-58
lines changed

1 file changed

+60
-58
lines changed

internal/pkg/rabbitmq/consumer.go

Lines changed: 60 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ package rabbitmq
33
import (
44
"context"
55
"fmt"
6+
"reflect"
7+
"runtime"
8+
"strings"
9+
"time"
10+
611
"github.com/ahmetb/go-linq/v3"
712
"github.com/iancoleman/strcase"
813
jsoniter "github.com/json-iterator/go"
@@ -11,10 +16,6 @@ import (
1116
"github.com/streadway/amqp"
1217
"go.opentelemetry.io/otel/attribute"
1318
"go.opentelemetry.io/otel/trace"
14-
"reflect"
15-
"runtime"
16-
"strings"
17-
"time"
1819
)
1920

2021
//go:generate mockery --name IConsumer
@@ -104,60 +105,61 @@ func (c Consumer[T]) ConsumeMessage(msg interface{}, dependencies T) error {
104105
}
105106

106107
go func() {
107-
108-
select {
109-
case <-c.ctx.Done():
110-
defer func(ch *amqp.Channel) {
111-
err := ch.Close()
112-
if err != nil {
113-
c.log.Errorf("failed to close channel closed for for queue: %s", q.Name)
114-
}
115-
}(ch)
116-
c.log.Infof("channel closed for for queue: %s", q.Name)
117-
return
118-
119-
case delivery, ok := <-deliveries:
120-
{
121-
if !ok {
122-
c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name)
123-
return
124-
}
125-
126-
// Extract headers
127-
c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers)
128-
129-
err := c.handler(q.Name, delivery, dependencies)
130-
if err != nil {
131-
c.log.Error(err.Error())
132-
}
133-
134-
consumedMessages = append(consumedMessages, snakeTypeName)
135-
136-
_, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName)
137-
138-
h, err := jsoniter.Marshal(delivery.Headers)
139-
140-
if err != nil {
141-
c.log.Errorf("Error in marshalling headers in consumer: %v", string(h))
142-
}
143-
144-
span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId))
145-
span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId))
146-
span.SetAttributes(attribute.Key("queue").String(q.Name))
147-
span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange))
148-
span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey))
149-
span.SetAttributes(attribute.Key("ack").Bool(true))
150-
span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String()))
151-
span.SetAttributes(attribute.Key("body").String(string(delivery.Body)))
152-
span.SetAttributes(attribute.Key("headers").String(string(h)))
153-
154-
// Cannot use defer inside a for loop
155-
time.Sleep(1 * time.Millisecond)
156-
span.End()
157-
158-
err = delivery.Ack(false)
159-
if err != nil {
160-
c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body))
108+
for {
109+
select {
110+
case <-c.ctx.Done():
111+
defer func(ch *amqp.Channel) {
112+
err := ch.Close()
113+
if err != nil {
114+
c.log.Errorf("failed to close channel closed for for queue: %s", q.Name)
115+
}
116+
}(ch)
117+
c.log.Infof("channel closed for for queue: %s", q.Name)
118+
return
119+
120+
case delivery, ok := <-deliveries:
121+
{
122+
if !ok {
123+
c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name)
124+
return
125+
}
126+
127+
// Extract headers
128+
c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers)
129+
130+
err := c.handler(q.Name, delivery, dependencies)
131+
if err != nil {
132+
c.log.Error(err.Error())
133+
}
134+
135+
consumedMessages = append(consumedMessages, snakeTypeName)
136+
137+
_, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName)
138+
139+
h, err := jsoniter.Marshal(delivery.Headers)
140+
141+
if err != nil {
142+
c.log.Errorf("Error in marshalling headers in consumer: %v", string(h))
143+
}
144+
145+
span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId))
146+
span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId))
147+
span.SetAttributes(attribute.Key("queue").String(q.Name))
148+
span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange))
149+
span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey))
150+
span.SetAttributes(attribute.Key("ack").Bool(true))
151+
span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String()))
152+
span.SetAttributes(attribute.Key("body").String(string(delivery.Body)))
153+
span.SetAttributes(attribute.Key("headers").String(string(h)))
154+
155+
// Cannot use defer inside a for loop
156+
time.Sleep(1 * time.Millisecond)
157+
span.End()
158+
159+
err = delivery.Ack(false)
160+
if err != nil {
161+
c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body))
162+
}
161163
}
162164
}
163165
}

0 commit comments

Comments
 (0)