Skip to content

Commit

Permalink
GetPartitionedTopicMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Jul 23, 2022
1 parent 7f23238 commit 80cc881
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 63 deletions.
129 changes: 73 additions & 56 deletions SharpPulsar/BinaryProtoLookupService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,32 +310,41 @@ private async ValueTask<AskResponse> NewLookup(TopicName topicName, bool authori
/// calls broker binaryProto-lookup api to get metadata of partitioned-topic.
///
/// </summary>
private async ValueTask GetPartitionedTopicMetadata(TopicName topicName, TimeSpan opTimeout)
private async ValueTask GetPartitionedTopicMetadata(TopicName topicName, TimeSpan timeout)
{
var request = Commands.NewPartitionMetadataRequest(topicName.ToString(), _requestId);
var time = timeout;
var request = Commands.NewPartitionMetadataRequest(topicName.ToString(), _requestId);
var payload = new Payload(request, _requestId, "NewPartitionMetadataRequest");
var askResponse = await _clientCnx.Ask<AskResponse>(payload/*, TimeSpan.FromSeconds(5)*/);
if (askResponse.Failed)
while (true)
{
var e = askResponse.Exception;
var nextDelay = Math.Min(_getPartitionedTopicMetadataBackOff.Next(), opTimeout.TotalMilliseconds);
var reply = _replyTo;
var isLookupThrottling = !PulsarClientException.IsRetriableError(e) || e is PulsarClientException.TooManyRequestsException || e is PulsarClientException.AuthenticationException;
if (nextDelay <= 0 || isLookupThrottling)
{
reply.Tell(new AskResponse(new PulsarClientException.InvalidConfigurationException(e)));
_log.Error(e.ToString());
_getPartitionedTopicMetadataBackOff = null;
}
else
if (askResponse.Failed)
{
_log.Warning($"[topic: {topicName}] Could not get connection while getPartitionedTopicMetadata -- Will try again in {nextDelay} ms: {e.Message}");
opTimeout.Subtract(TimeSpan.FromMilliseconds(nextDelay));
var id = await _generator.Ask<NewRequestIdResponse>(NewRequestId.Instance);
_requestId = id.Id;
await GetPartitionedTopicMetadata(topicName, opTimeout);
var e = askResponse.Exception;
var nextDelay = Math.Min(_getPartitionedTopicMetadataBackOff.Next(), time.TotalMilliseconds);
var reply = _replyTo;
var isLookupThrottling = !PulsarClientException.IsRetriableError(e) || e is PulsarClientException.TooManyRequestsException || e is PulsarClientException.AuthenticationException;
if (nextDelay <= 0 || isLookupThrottling)
{
reply.Tell(new AskResponse(new PulsarClientException.InvalidConfigurationException(e)));
_log.Error(e.ToString());
_getPartitionedTopicMetadataBackOff = null;
}
else
{
_log.Warning($"[topic: {topicName}] Could not get connection while getPartitionedTopicMetadata -- Will try again in {nextDelay} ms: {e.Message}");
time.Subtract(TimeSpan.FromMilliseconds(nextDelay));
var id = await _generator.Ask<NewRequestIdResponse>(NewRequestId.Instance);
_requestId = id.Id;
request = Commands.NewPartitionMetadataRequest(topicName.ToString(), _requestId);
payload = new Payload(request, _requestId, "NewPartitionMetadataRequest");
askResponse = await _clientCnx.Ask<AskResponse>(payload/*, TimeSpan.FromSeconds(5)*/);
continue;
//await GetPartitionedTopicMetadata(topicName, opTimeout);
}
}
}
break;
}

var data = askResponse.ConvertTo<LookupDataResult>();

Expand Down Expand Up @@ -397,51 +406,59 @@ public string ServiceUrl

private async ValueTask GetTopicsUnderNamespace(NamespaceName ns, Mode mode, TimeSpan opTimeout)
{
try
var request = Commands.NewGetTopicsOfNamespaceRequest(ns.ToString(), _requestId, mode);
var payload = new Payload(request, _requestId, "NewGetTopicsOfNamespaceRequest");
var askResponse = await _clientCnx.Ask<AskResponse>(payload, TimeSpan.FromSeconds(10));
while (true)
{
var request = Commands.NewGetTopicsOfNamespaceRequest(ns.ToString(), _requestId, mode);
var payload = new Payload(request, _requestId, "NewGetTopicsOfNamespaceRequest");
var askResponse = await _clientCnx.Ask<AskResponse>(payload, TimeSpan.FromSeconds(10));
var response = askResponse.ConvertTo<GetTopicsOfNamespaceResponse>();
if (_log.IsDebugEnabled)
{
_log.Debug($"[namespace: {ns}] Successfully got {response.Response.Topics.Count} topics list in request: {_requestId}");
}
var result = new List<string>();
//https://github.com/apache/pulsar/issues/12727
//var tpics = response.Response.Topics.Where(x=> !x.Contains("__transaction")).ToArray();
var tpics = response.Response.Topics;
foreach (var topic in tpics)

try
{
var filtered = TopicName.Get(topic).PartitionedTopicName;
if (!result.Contains(filtered))
var response = askResponse.ConvertTo<GetTopicsOfNamespaceResponse>();
if (_log.IsDebugEnabled)
{
result.Add(filtered);
_log.Debug($"[namespace: {ns}] Successfully got {response.Response.Topics.Count} topics list in request: {_requestId}");
}
var result = new List<string>();
//https://github.com/apache/pulsar/issues/12727
//var tpics = response.Response.Topics.Where(x=> !x.Contains("__transaction")).ToArray();
var tpics = response.Response.Topics;
foreach (var topic in tpics)
{
var filtered = TopicName.Get(topic).PartitionedTopicName;
if (!result.Contains(filtered))
{
result.Add(filtered);
}
}
_replyTo.Tell(new AskResponse(new GetTopicsUnderNamespaceResponse(result)));
return;
}
_replyTo.Tell(new AskResponse(new GetTopicsUnderNamespaceResponse(result)));
}
catch
{
var nextDelay = Math.Min(_getTopicsUnderNamespaceBackOff.Next(), opTimeout.TotalMilliseconds);
var reply = _replyTo;
if (nextDelay <= 0)
{
reply.Tell(new AskResponse(PulsarClientException.Unwrap(new Exception($"TimeoutException: Could not get topics of namespace {ns} within configured timeout"))));
}
else
catch
{
_log.Warning($"[namespace: {ns}] Could not get connection while getTopicsUnderNamespace -- Will try again in {nextDelay} ms");
opTimeout.Subtract(TimeSpan.FromMilliseconds(nextDelay));
await Task.Delay(TimeSpan.FromMilliseconds(nextDelay));

var reqId = await _generator.Ask<NewRequestIdResponse>(NewRequestId.Instance);
var nextDelay = Math.Min(_getTopicsUnderNamespaceBackOff.Next(), opTimeout.TotalMilliseconds);
var reply = _replyTo;
if (nextDelay <= 0)
{
reply.Tell(new AskResponse(PulsarClientException.Unwrap(new Exception($"TimeoutException: Could not get topics of namespace {ns} within configured timeout"))));
return;
}
else
{
_log.Warning($"[namespace: {ns}] Could not get connection while getTopicsUnderNamespace -- Will try again in {nextDelay} ms");
opTimeout.Subtract(TimeSpan.FromMilliseconds(nextDelay));
await Task.Delay(TimeSpan.FromMilliseconds(nextDelay));

_requestId = reqId.Id;
var reqId = await _generator.Ask<NewRequestIdResponse>(NewRequestId.Instance);

_log.Warning($"Retrying 'GetTopicsUnderNamespace' after {nextDelay} ms delay with requestid '{reqId.Id}'");
_requestId = reqId.Id;

await GetTopicsUnderNamespace(ns, mode, opTimeout);
_log.Warning($"Retrying 'GetTopicsUnderNamespace' after {nextDelay} ms delay with requestid '{reqId.Id}'");
request = Commands.NewGetTopicsOfNamespaceRequest(ns.ToString(), _requestId, mode);
payload = new Payload(request, _requestId, "NewGetTopicsOfNamespaceRequest");
askResponse = await _clientCnx.Ask<AskResponse>(payload, TimeSpan.FromSeconds(10));
//await GetTopicsUnderNamespace(ns, mode, opTimeout);
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion Tests/SharpPulsar.Test/SchemaUpgradeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public SchemaUpgradeTest(ITestOutputHelper output, PulsarFixture fixture)

_client = fixture.Client;
}
[Fact(Skip ="A")]
//[Fact(Skip ="A")]
[Fact]
public async Task SchemaProduceAndConsume()
{
var topic = $"persistent://public/default/upgradeable-{Guid.NewGuid()}";
Expand Down
11 changes: 6 additions & 5 deletions Tests/SharpPulsar.Test/SqlTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public SqlTests(ITestOutputHelper output, PulsarFixture fixture)
_output = output;
_client = fixture.Client;
}
[Fact(Skip ="Issue with sql-worker on github action")]
//[Fact]
//[Fact(Skip ="Issue with sql-worker on github action")]
[Fact]
public virtual async Task TestQuerySql()
{
var topic = $"query_topics_avro_{Guid.NewGuid()}";
Expand Down Expand Up @@ -70,13 +70,14 @@ public virtual async Task TestQuerySql()
Assert.True(receivedCount > 1);
}

[Fact(Skip = "Issue with sql-worker on github action")]
//[Fact]
//[Fact(Skip = "Issue with sql-worker on github action")]
[Fact]
public async Task TestAvro()
{
await PlainAvroProducer($"journal-{Guid.NewGuid()}");
}
[Fact(Skip = "Issue with sql-worker")]
//[Fact(Skip = "Issue with sql-worker")]
[Fact]
public async Task TestKeyValue()
{
await PlainKeyValueProducer($"keyvalue");
Expand Down
3 changes: 2 additions & 1 deletion Tests/SharpPulsar.Test/TestMessageEncryption.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public TestMessageEncryption(ITestOutputHelper output, PulsarFixture fixture)
_output = output;
_client = fixture.Client;
}
[Fact(Skip = "Encrpted Produce Consume") ]
//[Fact(Skip = "Encrpted Produce Consume") ]
[Fact]
public async Task TestEncrptedProduceConsume()
{
var messageCount = 10;
Expand Down

0 comments on commit 80cc881

Please sign in to comment.