Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Jan 31, 2024
1 parent 36e4d98 commit 687ad12
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
6 changes: 3 additions & 3 deletions src/SharpPulsar/Client/Internal/SocketClientActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ public SocketClientActor(IActorRef client, ClientConfigurationData conf, DnsEndP
{
try
{
while (_start)
{
_logger.Info("Running on thread: " + Thread.CurrentThread.ManagedThreadId);
_logger.Info("Running on thread: " + Thread.CurrentThread.ManagedThreadId);
while (_start)
{
var result = await _pipeReader.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
Expand Down
2 changes: 2 additions & 0 deletions src/SharpPulsar/Consumer/ConsumerActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ private void Ready()
Sender.Tell(new AskResponse(Unwrap(ex)));
}
});
Receive<bool>(c => { });
Receive<string>(s => { });
}

private async ValueTask Acknowledge(IAcknowledge ack)
Expand Down
2 changes: 2 additions & 0 deletions src/SharpPulsar/Consumer/MultiTopicsConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@ internal void Ready()
Sender.Tell(new AskResponse(Unwrap(ex)));
}
});
Receive<bool>(c => { });
Receive<string>(s => { });
}

// Check topics are valid.
Expand Down
6 changes: 4 additions & 2 deletions src/SharpPulsar/Extension/ArrayDeque.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public T RemoveFirst()
{

if (Count == 0)
throw new InvalidOperationException("There are no items to remove.");
//throw new InvalidOperationException("There are no items to remove.");
return default(T);

_size--;

Expand All @@ -358,7 +359,8 @@ public T RemoveLast()
{

if (Count == 0)
throw new InvalidOperationException("There are no items to remove.");
//throw new InvalidOperationException("There are no items to remove.");
return default(T);

_size--;

Expand Down
9 changes: 6 additions & 3 deletions src/SharpPulsar/Tracker/UnAckedMessageTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public UnAckedMessageTracker(IActorRef consumer, IActorRef unack, ConsumerConfig
}
else
{
MessageIdPartitionMap = null;
TimePartitions = null;
MessageIdPartitionMap = new();
TimePartitions = new();
}
BecomeReady();

Expand Down Expand Up @@ -117,14 +117,16 @@ private void BecomeReady()
{
RedeliverMessages();
});
Receive<bool>(c => { });
Receive<string>(s => { });
}
internal virtual void RedeliverMessages()
{
MessageIds.Clear();
try
{
var headPartition = TimePartitions.RemoveFirst();
if (headPartition.Count > 0)
if (headPartition?.Count > 0)
{
_log.Warning($"[{Consumer.Path.Name}] {headPartition.Count} messages will be re-delivered");
headPartition.ForEach(async messageId =>
Expand Down Expand Up @@ -171,6 +173,7 @@ internal async ValueTask AddChunkedMessageIdsAndRemoveFromSequenceMap(IMessageId
Unack.Tell(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Remove, new List<IMessageId> { messageId }));
}
}

internal virtual void Clear()
{
try
Expand Down

0 comments on commit 687ad12

Please sign in to comment.