Skip to content

Commit

Permalink
Merge branch 'pulsar_v3.0.0' of https://github.com/eaba/SharpPulsar i…
Browse files Browse the repository at this point in the history
…nto pulsar_v3.0.0
  • Loading branch information
eaba committed Jul 3, 2023
2 parents 34697b0 + 62e9074 commit aa37dec
Show file tree
Hide file tree
Showing 24 changed files with 719 additions and 537 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## vNext

## [2.13.0] / 2023-05-20
- Improved `ClientCnx`
- `SocketClient`: changed to `SocketClientActor`
- Added `SendMessageActor`
- Removed `IObservable`, `NewThreadScheduler.Default.Schedule`

## [2.12.1] / 2023-05-10
- Fixed `GetStats*`

Expand Down Expand Up @@ -36,7 +42,8 @@
- First release

[vNext]: https://github.com/eaba/SharpPulsar/compare/2.13.0...HEAD
[2.13.0]: https://github.com/eaba/SharpPulsar/compare/2.12.0...2.13.0
[2.13.0]: https://github.com/eaba/SharpPulsar/compare/2.12.1...2.13.0
[2.12.1]: https://github.com/eaba/SharpPulsar/compare/2.12.0...2.12.1
[2.12.0]: https://github.com/eaba/SharpPulsar/compare/2.11.2...2.12.0
[2.11.2]: https://github.com/eaba/SharpPulsar/compare/2.11.1...2.11.2
[2.11.1]: https://github.com/eaba/SharpPulsar/compare/2.11.0...2.11.1
Expand Down
2 changes: 1 addition & 1 deletion build/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ protected override void OnBuildInitialized()

Target RunChangelog => _ => _
.Requires(() => IsLocalBuild)
.OnlyWhenDynamic(() => GitRepository.Branch.Equals("main", StringComparison.OrdinalIgnoreCase))
//.OnlyWhenDynamic(() => GitRepository.Branch.Equals("main", StringComparison.OrdinalIgnoreCase))
.Executes(() =>
{
FinalizeChangelog(ChangelogFile, GitVersion.MajorMinorPatch, GitRepository);
Expand Down
8 changes: 4 additions & 4 deletions src/SharpPulsar/Batch/BatchMessageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override bool Add(Message<T> msg, SendCallback<T> callback)
// some properties are common amongst the different messages in the batch, hence we just pick it up from
// the first message
_messageMetadata.SequenceId = (ulong)msg.SequenceId;
_lowestSequenceId = new Commands().InitBatchMessageMetadata(_messageMetadata);
_lowestSequenceId = Commands.InitBatchMessageMetadata(_messageMetadata);

_firstCallback = callback;
_batchedMessageMetadataAndPayload = new List<byte>(Math.Min(MaxBatchSize, Container.MaxMessageSize));
Expand Down Expand Up @@ -122,7 +122,7 @@ private byte[] CompressedBatchMetadataAndPayload
{
var msg = _messages[i];
var msgMetadata = msg.Metadata.OriginalMetadata;
Serializer.SerializeWithLengthPrefix(stream, new Commands().SingleMessageMetadat(msgMetadata, (int)msg.Data.Length, msg.SequenceId), PrefixStyle.Fixed32BigEndian);
Serializer.SerializeWithLengthPrefix(stream, Commands.SingleMessageMetadat(msgMetadata, (int)msg.Data.Length, msg.SequenceId), PrefixStyle.Fixed32BigEndian);
messageWriter.Write(msg.Data.ToArray());
}
var batchedMessageMetadataAndPayload = stream.ToArray();
Expand Down Expand Up @@ -226,11 +226,11 @@ private ReadOnlySequence<byte> SendMessage(long producerId, long sequenceId, int
{
if (messageId is MessageId)
{
return new Commands().NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
}
else
{
return new Commands().NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, -1, -1, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, -1, -1, msgMetadata, compressedPayload);
}
}
public override bool HasSameSchema(Message<T> msg)
Expand Down
8 changes: 4 additions & 4 deletions src/SharpPulsar/Batch/BatchMessageKeyBasedContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ private ReadOnlySequence<byte> SendMessage(long producerId, long sequenceId, int
{
if (messageId is MessageId)
{
return new Commands().NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
}
else
{
return new Commands().NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, -1, -1, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, -1, -1, msgMetadata, compressedPayload);
}
}
public override IList<ProducerActor<T>.OpSendMsg<T>> CreateOpSendMsgs()
Expand Down Expand Up @@ -242,7 +242,7 @@ public virtual byte[] CompressedBatchMetadataAndPayload
foreach (var msg in Messages)
{
var msgMetadata = msg.Metadata.OriginalMetadata;
Serializer.SerializeWithLengthPrefix(stream, new Commands().SingleMessageMetadat(msgMetadata, (int)msg.Data.Length, msg.SequenceId), PrefixStyle.Fixed32BigEndian);
Serializer.SerializeWithLengthPrefix(stream, Commands.SingleMessageMetadat(msgMetadata, (int)msg.Data.Length, msg.SequenceId), PrefixStyle.Fixed32BigEndian);
messageWriter.Write(msg.Data.ToArray());
}
var batchedMessageMetadataAndPayload = stream.ToArray();
Expand All @@ -266,7 +266,7 @@ public virtual void AddMsg(Message<T> msg, SendCallback<T> callback)
{
if (Messages.Count == 0)
{
SequenceId = new Commands().InitBatchMessageMetadata(MessageMetadata);
SequenceId = Commands.InitBatchMessageMetadata(MessageMetadata);
BatchedMessageMetadataAndPayload.AddRange(msg.Data.ToArray());
FirstCallback = callback;
}
Expand Down
9 changes: 4 additions & 5 deletions src/SharpPulsar/BinaryProtoLookupService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class BinaryProtoLookupService : ReceiveActor, IWithUnboundedStash
private TopicName _topicName;
private Backoff _getTopicsUnderNamespaceBackOff;
private Backoff _getPartitionedTopicMetadataBackOff;
private readonly Commands _commands = new Commands();
private GetTopicsUnderNamespace _getTopicsUnderNamespace;
public BinaryProtoLookupService(IActorRef connectionPool, IActorRef idGenerator, string serviceUrl, string listenerName, bool useTls, int maxLookupRedirects, TimeSpan operationTimeout, TimeSpan timeCnx)
{
Expand Down Expand Up @@ -319,7 +318,7 @@ private async ValueTask GetCnxAndRequestId(DnsEndPoint dnsEndPoint)
/// <returns> broker-socket-address that serves given topic </returns>
private async ValueTask<AskResponse> NewLookup(TopicName topicName, bool authoritative = false)
{
var request = _commands.NewLookup(topicName.ToString(), _listenerName, authoritative, _requestId);
var request = Commands.NewLookup(topicName.ToString(), _listenerName, authoritative, _requestId);
var payload = new Payload(request, _requestId, "NewLookup");
return await _clientCnx.Ask<AskResponse>(payload);
}
Expand Down Expand Up @@ -352,7 +351,7 @@ private void GetPartitionedTopicMetadata()
}
private async ValueTask PartitionedTopicMetadata(TopicName topicName, TimeSpan opTimeout)
{
var request = _commands.NewPartitionMetadataRequest(topicName.ToString(), _requestId);
var request = Commands.NewPartitionMetadataRequest(topicName.ToString(), _requestId);
var payload = new Payload(request, _requestId, "NewPartitionMetadataRequest");
var askResponse = await _clientCnx.Ask<AskResponse>(payload, _timeCnx);
if (askResponse.Failed)
Expand Down Expand Up @@ -397,7 +396,7 @@ private async ValueTask PartitionedTopicMetadata(TopicName topicName, TimeSpan o
}
private async ValueTask GetSchema(TopicName topicName, byte[] version)
{
var request = _commands.NewGetSchema(_requestId, topicName.ToString(), BytesSchemaVersion.Of(version));
var request = Commands.NewGetSchema(_requestId, topicName.ToString(), BytesSchemaVersion.Of(version));
var payload = new Payload(request, _requestId, "SendGetRawSchema");
var askResponse = await _clientCnx.Ask<AskResponse>(payload);

Expand Down Expand Up @@ -460,7 +459,7 @@ private async ValueTask TopicsUnderNamespace(NamespaceName ns, Mode mode, string
{
try
{
var request = _commands.NewGetTopicsOfNamespaceRequest(ns.ToString(), _requestId, mode, topicsPattern, topicsHash);
var request = Commands.NewGetTopicsOfNamespaceRequest(ns.ToString(), _requestId, mode, topicsPattern, topicsHash);
var payload = new Payload(request, _requestId, "NewGetTopicsOfNamespaceRequest");
var askResponse = await _clientCnx.Ask<AskResponse>(payload, _timeCnx);
var response = askResponse.ConvertTo<GetTopicsOfNamespaceResponse>();
Expand Down
Loading

0 comments on commit aa37dec

Please sign in to comment.