diff --git a/src/SharpPulsar/Client/Internal/SocketClientActor.cs b/src/SharpPulsar/Client/Internal/SocketClientActor.cs index 713f8f0b..dc095fdb 100644 --- a/src/SharpPulsar/Client/Internal/SocketClientActor.cs +++ b/src/SharpPulsar/Client/Internal/SocketClientActor.cs @@ -185,10 +185,10 @@ public SocketClientActor(IActorRef client, ClientConfigurationData conf, DnsEndP { try { - while (_start) - { - _logger.Info("Running on thread: " + Thread.CurrentThread.ManagedThreadId); + _logger.Info("Running on thread: " + Thread.CurrentThread.ManagedThreadId); + while (_start) + { var result = await _pipeReader.ReadAsync().ConfigureAwait(false); var buffer = result.Buffer; diff --git a/src/SharpPulsar/Consumer/ConsumerActor.cs b/src/SharpPulsar/Consumer/ConsumerActor.cs index 5b1016a0..308ccd67 100644 --- a/src/SharpPulsar/Consumer/ConsumerActor.cs +++ b/src/SharpPulsar/Consumer/ConsumerActor.cs @@ -949,6 +949,8 @@ private void Ready() Sender.Tell(new AskResponse(Unwrap(ex))); } }); + Receive(c => { }); + Receive(s => { }); } private async ValueTask Acknowledge(IAcknowledge ack) diff --git a/src/SharpPulsar/Consumer/MultiTopicsConsumer.cs b/src/SharpPulsar/Consumer/MultiTopicsConsumer.cs index 4db24f36..65603359 100644 --- a/src/SharpPulsar/Consumer/MultiTopicsConsumer.cs +++ b/src/SharpPulsar/Consumer/MultiTopicsConsumer.cs @@ -550,6 +550,8 @@ internal void Ready() Sender.Tell(new AskResponse(Unwrap(ex))); } }); + Receive(c => { }); + Receive(s => { }); } // Check topics are valid. diff --git a/src/SharpPulsar/Extension/ArrayDeque.cs b/src/SharpPulsar/Extension/ArrayDeque.cs index 8bf61a7d..f85dea11 100644 --- a/src/SharpPulsar/Extension/ArrayDeque.cs +++ b/src/SharpPulsar/Extension/ArrayDeque.cs @@ -342,7 +342,8 @@ public T RemoveFirst() { if (Count == 0) - throw new InvalidOperationException("There are no items to remove."); + //throw new InvalidOperationException("There are no items to remove."); + return default(T); _size--; @@ -358,7 +359,8 @@ public T RemoveLast() { if (Count == 0) - throw new InvalidOperationException("There are no items to remove."); + //throw new InvalidOperationException("There are no items to remove."); + return default(T); _size--; diff --git a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs index 4031b9bf..e7213d6e 100644 --- a/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs +++ b/src/SharpPulsar/Tracker/UnAckedMessageTracker.cs @@ -69,8 +69,8 @@ public UnAckedMessageTracker(IActorRef consumer, IActorRef unack, ConsumerConfig } else { - MessageIdPartitionMap = null; - TimePartitions = null; + MessageIdPartitionMap = new(); + TimePartitions = new(); } BecomeReady(); @@ -117,6 +117,8 @@ private void BecomeReady() { RedeliverMessages(); }); + Receive(c => { }); + Receive(s => { }); } internal virtual void RedeliverMessages() { @@ -124,7 +126,7 @@ internal virtual void RedeliverMessages() try { var headPartition = TimePartitions.RemoveFirst(); - if (headPartition.Count > 0) + if (headPartition?.Count > 0) { _log.Warning($"[{Consumer.Path.Name}] {headPartition.Count} messages will be re-delivered"); headPartition.ForEach(async messageId => @@ -171,6 +173,7 @@ internal async ValueTask AddChunkedMessageIdsAndRemoveFromSequenceMap(IMessageId Unack.Tell(new UnAckedChunckedMessageIdSequenceMapCmd(UnAckedCommand.Remove, new List { messageId })); } } + internal virtual void Clear() { try