Skip to content

Commit

Permalink
Improvement and fixes, Added TestOrderedRedelivery
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Sep 12, 2024
1 parent 3b8092d commit 3347350
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 50 deletions.
210 changes: 180 additions & 30 deletions src/SharpPulsar.Test/ConsumerRedeliveryTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using DotNet.Testcontainers.Builders;
using SharpPulsar.Builder;
using SharpPulsar.Interfaces;
using SharpPulsar.Protocol.Proto;
using SharpPulsar.Test.Fixture;
using SharpPulsar.TestContainer;
using Xunit;
Expand Down Expand Up @@ -43,59 +47,183 @@ public ConsumerRedeliveryTest(ITestOutputHelper output, PulsarFixture fixture)
_system = fixture.System;
}

[Fact]
public async Task TestUnAckMessageRedeliveryWithReceive()
/// <summary>
/// It verifies that redelivered messages are sorted based on the ledger-ids.
/// <pre>
/// 1. client publishes 100 messages across 50 ledgers
/// 2. broker delivers 100 messages to consumer
/// 3. consumer ack every alternative message and doesn't ack 50 messages
/// 4. broker sorts replay messages based on ledger and redelivers messages ledger by ledger
/// </pre> </summary>
/// <exception cref="Exception"> </exception>
///
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task TestOrderedRedelivery(bool ackReceiptEnabled)
{
var topic = $"persistent://public/default/async-unack-redelivery-{Guid.NewGuid()}";
var topic = "persistent://public/default/redelivery-" + DateTimeHelper.CurrentUnixTimeMillis();

//broker.conf
//conf.setManagedLedgerMaxEntriesPerLedger(2);
//conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);

var pBuilder = new ProducerConfigBuilder<byte[]>();
pBuilder.Topic(topic);
var pBuilder = new ProducerConfigBuilder<byte[]>()
.Topic(topic)
.ProducerName("my-producer-name");
var producer = await _client.NewProducerAsync(pBuilder);

const int messageCount = 10;
var builder = new ConsumerConfigBuilder<byte[]>()
.Topic(topic)
.SubscriptionName("s1")
.IsAckReceiptEnabled(ackReceiptEnabled)
.SubscriptionType(CommandSubscribe.SubType.Shared);
var consumer1 = await _client.NewConsumerAsync(builder);

for (var i = 0; i < messageCount; i++)
const int totalMsgs = 100;

for (var i = 0; i < totalMsgs; i++)
{
var receipt = await producer.SendAsync(Encoding.UTF8.GetBytes("my-message-" + i));
_output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true }));
var message = "my-message-" + i;
var receipt = await producer.SendAsync(Encoding.UTF8.GetBytes(message));
_output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true }));
}

var builder = new ConsumerConfigBuilder<byte[]>();
builder.Topic(topic);
builder.SubscriptionName("sub-TestUnAckMessageRedeliveryWithReceive");
builder.AckTimeout(TimeSpan.FromMilliseconds(5000));
builder.ForceTopicCreation(true);
builder.AcknowledgmentGroupTime(TimeSpan.Zero);
builder.SubscriptionType(Protocol.Proto.CommandSubscribe.SubType.Shared);

var consumedCount = 0;
var messageIds = new HashSet<IMessageId>();
for (var i = 0; i < totalMsgs; i++)
{
var message = (Message<byte[]>)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if (message != null && (consumedCount % 2) == 0)
{
consumer1.Acknowledge(message);
}
else
{
messageIds.Add(message.MessageId);
}
var receivedMessage = Encoding.UTF8.GetString(message.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");

consumedCount += 1;
}
Assert.Equal(totalMsgs, consumedCount);

// redeliver all unack messages
await consumer1.RedeliverUnacknowledgedMessagesAsync(messageIds);
_output.WriteLine($"MessageIds: [{messageIds.Count}]");
//await Task.Delay(1000);
MessageId lastMsgId = null;
var count = 1;
for (var i = 0; i < totalMsgs / 2; i++)
{
var message = (Message<byte[]>)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if (message != null)
{
var msgId = (MessageId)message.MessageId;
if (lastMsgId != null)
{
Assert.True(lastMsgId.LedgerId <= msgId.LedgerId, "lastMsgId: " + lastMsgId + " -- msgId: " + msgId);
}

lastMsgId = msgId;
_output.WriteLine($"{count++} MessageId: [{lastMsgId}]");
}

}

// close consumer so, this consumer's unack messages will be redelivered to new consumer
consumer1.Close();

/* var consumer2 = await _client.NewConsumerAsync(builder);
await Task.Delay(TimeSpan.FromSeconds(10));
count = 0;
lastMsgId = null;
for (var i = 0; i < totalMsgs / 2; i++)
{
var message = (Message<byte[]>)await consumer2.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if (message != null)
{
var msgId = (MessageId)message.MessageId;
if (lastMsgId != null)
{
Assert.True(lastMsgId.LedgerId <= msgId.LedgerId);
}
lastMsgId = msgId;
_output.WriteLine($"{count++} RedeliverUnacknowledgedMessage MessageId: [{lastMsgId}]");
}
}*/
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task TestUnAckMessageRedeliveryWithReceive(bool ackReceiptEnabled)
{
var topic = $"persistent://public/default/async-unack-redelivery-{Guid.NewGuid()}";

var builder = new ConsumerConfigBuilder<byte[]>()
.Topic(topic)
.SubscriptionName("sub-TestUnAckMessageRedeliveryWithReceive")
.AckTimeout(TimeSpan.FromMilliseconds(3000))
.IsAckReceiptEnabled(ackReceiptEnabled)
.EnableBatchIndexAcknowledgment(ackReceiptEnabled);

var consumer = await _client.NewConsumerAsync(builder);

var pBuilder = new ProducerConfigBuilder<byte[]>()
.Topic(topic)
.EnableBatching(true)
.BatchingMaxMessages(5)
.BatchingMaxPublishDelay(TimeSpan.FromSeconds(1));
var producer = await _client.NewProducerAsync(pBuilder);

const int messages = 10;

for (var i = 0; i < messages; i++)
{
await producer.SendAsync(Encoding.UTF8.GetBytes("my-message-" + i));
//_output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true }));
}
var messageIds = new HashSet<IMessageId>();
var messageReceived = 0;
await Task.Delay(TimeSpan.FromMilliseconds(1000));
for (var i = 0; i < messageCount - 2; ++i)
for (var i = 0; i < messages; ++i)
{
var m = (Message<byte[]>)await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000));

_output.WriteLine($"BrokerEntryMetadata[timestamp:{m.BrokerEntryMetadata.BrokerTimestamp} index: {m.BrokerEntryMetadata?.Index.ToString()}");
var receivedMessage = Encoding.UTF8.GetString(m.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");
messageReceived++;
if (m != null)
{
_output.WriteLine($"BrokerEntryMetadata[timestamp:{m.BrokerEntryMetadata.BrokerTimestamp} index: {m.BrokerEntryMetadata?.Index.ToString()}");
var receivedMessage = Encoding.UTF8.GetString(m.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");
messageReceived++;
messageIds.Add(m.MessageId);
}
}

Assert.True(messageReceived > 0);
await Task.Delay(TimeSpan.FromSeconds(1));
for (var i = 0; i < messageCount - 5; i++)
Assert.Equal(10, messageReceived);
// redeliver all unack messages
await consumer.RedeliverUnacknowledgedMessagesAsync(messageIds);
//Assert.True(messageReceived > 0);
await Task.Delay(TimeSpan.FromSeconds(5));
for (var i = 0; i < messages; i++)
{
var m = (Message<byte[]>)await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if(m != null)
if (m != null)
{
var receivedMessage = Encoding.UTF8.GetString(m.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");
_output.WriteLine($"{messageReceived} Received message: [{receivedMessage}]");
await consumer.AcknowledgeAsync(m);
messageReceived++;
}

}
Assert.Equal(20, messageReceived);

await producer.CloseAsync();
await consumer.CloseAsync();
Assert.True(messageReceived > 5);
//Assert.True(messageReceived > 5);
}
public async Task InitializeAsync()
{
Expand All @@ -114,6 +242,28 @@ public async Task DisposeAsync()
{
await _client.ShutdownAsync();
}
private static object[][] AckReceiptEnabled()
{
return
[
[true],
[false]
];
}


private object[][] batchedMessageAck()
{
// When batch index ack is disabled (by default), only after all single messages were sent would the pending
// ACK be added into the ACK tracker.
return
[
[3, 5, CommandAck.AckType.Individual],
[5, 5, CommandAck.AckType.Individual],
[3, 5, CommandAck.AckType.Cumulative],
[5, 5, CommandAck.AckType.Cumulative]
];
}

}

Expand Down
20 changes: 10 additions & 10 deletions src/SharpPulsar/Client/ClientCnx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal sealed class ClientCnx : ReceiveActor, IWithUnboundedStash, IWithTimers
private State _state;
private readonly IActorRef _self;
private IActorRef _sendMessage;
private IActorRef _sender;
//private IActorRef _sender;

private readonly Dictionary<long, (ReadOnlySequence<byte> Message, IActorRef Requester)> _pendingRequests = new Dictionary<long, (ReadOnlySequence<byte> Message, IActorRef Requester)>();
// LookupRequests that waiting in client side.
Expand Down Expand Up @@ -151,7 +151,7 @@ private void Receives()

Receive<Payload>(p =>
{
_sender = Sender;
//_sender = Sender;
switch (p.Command)
{
case "NewLookup":
Expand Down Expand Up @@ -239,7 +239,7 @@ private void Receives()
});
Receive<SendRequestWithId>(r =>
{
_sender = Sender;
//_sender = Sender;
SendRequestWithId(r.Message, r.RequestId, r.NeedsResponse);
});
Receive<RemoteEndpointProtocolVersion>(r =>
Expand Down Expand Up @@ -673,7 +673,7 @@ private void HandleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEndO
// caller of this method needs to be protected under pendingLookupRequestSemaphore
private void AddPendingLookupRequests(long requestId, ReadOnlySequence<byte> message)
{
_pendingRequests.Add(requestId, (message, _sender));
_pendingRequests.Add(requestId, (message, Sender));
}

private bool RemovePendingLookupRequest(long requestId, out IActorRef actor)
Expand Down Expand Up @@ -749,7 +749,7 @@ private void HandleError(CommandError error)
}
else
{
_sender?.Tell(response);
Sender?.Tell(response);
_log.Warning($"Received unknown request id from server: {error.RequestId}");
}
}
Expand Down Expand Up @@ -796,7 +796,7 @@ private void NewLookup(ReadOnlySequence<byte> request, long requestId)
}
catch (Exception ex)
{
_sender.Tell(PulsarClientException.Unwrap(ex));
Sender.Tell(PulsarClientException.Unwrap(ex));
}
}

Expand Down Expand Up @@ -868,14 +868,14 @@ private bool SendRequestAndHandleTimeout(ReadOnlySequence<byte> requestMessage,
try
{
_sendMessage.Tell(new SendMessage(requestMessage));
_pendingRequests.Add(requestId, (requestMessage, _sender));
_pendingRequests.Add(requestId, (requestMessage, Sender));

_requestTimeoutQueue.Enqueue(new RequestTime(DateTimeHelper.CurrentUnixTimeMillis(), requestId, requestType));
return true;
}
catch (Exception ex)
{
_sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
}
return false;
}
Expand All @@ -885,11 +885,11 @@ private void SendRequest(ReadOnlySequence<byte> requestMessage, long requestId)
{
_sendMessage.Tell(new SendMessage(requestMessage));
if (requestId >= 0)
_pendingRequests.Add(requestId, (requestMessage, _sender));
_pendingRequests.Add(requestId, (requestMessage, Sender));
}
catch (Exception ex)
{
_sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex)));
}

}
Expand Down
9 changes: 9 additions & 0 deletions src/SharpPulsar/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ public async ValueTask RedeliverUnacknowledgedMessagesAsync()
throw ask.Exception;
}

public void RedeliverUnacknowledgedMessages(ISet<IMessageId> messageIds)
=> RedeliverUnacknowledgedMessagesAsync(messageIds).ConfigureAwait(false);
public async ValueTask RedeliverUnacknowledgedMessagesAsync(ISet<IMessageId> messageIds)
{
var ask = await _consumerActor.Ask<AskResponse>(new RedeliverUnacknowledgedMessageIds(messageIds))
.ConfigureAwait(false);
if (ask.Failed)
throw ask.Exception;
}
public void Resume()
{
_consumerActor.Tell(Messages.Consumer.Resume.Instance);
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar/Consumer/ConsumerActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ private void Ready()
_replyTo.Tell(new AskResponse(Unwrap(ex)));
}
});
Receive<RedeliverUnacknowledgedMessages>(m =>
Receive<RedeliverUnacknowledgedMessages>(_ =>
{
RedeliverUnacknowledgedMessages();
Sender.Tell(new AskResponse());
Expand Down
6 changes: 1 addition & 5 deletions src/SharpPulsar/Exceptions/PulsarClientException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,15 +1049,11 @@ public static PulsarClientException Unwrap(Exception t)
{
return (PulsarClientException)t;
}
else if (t is Exception)
{
throw (RuntimeException)t;
}

// Unwrap the exception to keep the same exception type but a stack trace that includes the application calling
// site
Exception cause = t.InnerException;
string msg = cause.Message;
string msg = cause != null? cause.Message: t.Message;
PulsarClientException newException;
if (cause is TimeoutException)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@

using System;
using System.ComponentModel.DataAnnotations;
using Org.BouncyCastle.Crypto.Modes.Gcm;
using SharpPulsar.Admin.v2;
using SharpPulsar.Exceptions;

namespace SharpPulsar.Messages.Consumer
Expand Down

0 comments on commit 3347350

Please sign in to comment.