diff --git a/SharpPulsar/Auth/AuthenticationDataOAuth2.cs b/SharpPulsar/Auth/AuthenticationDataOAuth2.cs
deleted file mode 100644
index 5f3823601..000000000
--- a/SharpPulsar/Auth/AuthenticationDataOAuth2.cs
+++ /dev/null
@@ -1,112 +0,0 @@
-
-using System;
-using System.IO;
-using System.Net.Http;
-using System.Text;
-using IdentityModel.Client;
-using SharpPulsar.Exceptions;
-using SharpPulsar.Interfaces;
-using System.Threading.Tasks;
-
-///
-/// Licensed to the Apache Software Foundation (ASF) under one
-/// or more contributor license agreements. See the NOTICE file
-/// distributed with this work for additional information
-/// regarding copyright ownership. The ASF licenses this file
-/// to you under the Apache License, Version 2.0 (the
-/// "License"); you may not use this file except in compliance
-/// with the License. You may obtain a copy of the License at
-///
-/// http://www.apache.org/licenses/LICENSE-2.0
-///
-/// Unless required by applicable law or agreed to in writing,
-/// software distributed under the License is distributed on an
-/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-/// KIND, either express or implied. See the License for the
-/// specific language governing permissions and limitations
-/// under the License.
-///
-
-namespace SharpPulsar.Auth
-{
-
- public class AuthenticationDataOAuth2 : IAuthenticationDataProvider
- {
- private readonly string _clientId;
- private readonly string _clientSecret;
- private readonly HttpClient _client;
- private readonly DiscoveryDocumentResponse _disco;
- public AuthenticationDataOAuth2(string clientid, string secret, string authority)
- {
- _clientId = clientid;
- _clientSecret = secret;
- _client = new HttpClient();
- var discoTask = _client.GetDiscoveryDocumentAsync(authority);
- _disco = Task.Run(async () => await discoTask ).Result;
- if (_disco.IsError) throw new Exception(_disco.Error);
- }
-
-
- public bool HasDataFromCommand()
- {
- return true;
- }
-
- public string CommandData => Token;
-
- private string Token
- {
- get
- {
- try
- {
- var response = _client.RequestClientCredentialsTokenAsync(new ClientCredentialsTokenRequest
- {
- Address = _disco.TokenEndpoint,
-
- ClientId = _clientId,
- ClientSecret = _clientSecret,
- }).GetAwaiter().GetResult();
-
- if (response.IsError) throw new Exception(response.Error);
- return response.AccessToken;
- }
- catch (Exception t)
- {
- throw new IOException("failed to get client token", t);
- }
- }
- }
-
- public AuthData Authenticate(AuthData data)
- {
- if (data != null)
- {
- var result = _client.IntrospectTokenAsync(new TokenIntrospectionRequest
- {
- Address = _disco.IntrospectionEndpoint,
-
- ClientId = _clientId,
- ClientSecret = _clientSecret,
- Token = Encoding.UTF8.GetString(data.Bytes)
- }).GetAwaiter().GetResult();
-
- if (result.IsError)
- {
- throw new PulsarClientException(result.Error);
- }
-
- if (result.IsActive)
- {
- var bytes = Encoding.UTF8.GetBytes((HasDataFromCommand() ? CommandData : ""));
- return new AuthData(bytes);
- }
-
- throw new PulsarClientException("token is not active");
- }
- var bytesAuth = Encoding.UTF8.GetBytes((HasDataFromCommand() ? CommandData : ""));
- return new AuthData(bytesAuth);
- }
- }
-
-}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/AuthenticationFactory.cs b/SharpPulsar/Auth/AuthenticationFactory.cs
index 056398c29..6586a4279 100644
--- a/SharpPulsar/Auth/AuthenticationFactory.cs
+++ b/SharpPulsar/Auth/AuthenticationFactory.cs
@@ -43,10 +43,6 @@ public static IAuthentication Token(string token)
return DefaultImplementation.NewAuthenticationToken(token);
}
- public static IAuthentication Sts(string client, string secret, string authority)
- {
- return DefaultImplementation.NewAuthenticationSts(client, secret, authority);
- }
///
/// Create an authentication provider for token based authentication.
///
diff --git a/SharpPulsar/Auth/AuthenticationOAuth2.cs b/SharpPulsar/Auth/AuthenticationOAuth2.cs
deleted file mode 100644
index 270265449..000000000
--- a/SharpPulsar/Auth/AuthenticationOAuth2.cs
+++ /dev/null
@@ -1,88 +0,0 @@
-
-
-using SharpPulsar.Interfaces;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-
-///
-/// Licensed to the Apache Software Foundation (ASF) under one
-/// or more contributor license agreements. See the NOTICE file
-/// distributed with this work for additional information
-/// regarding copyright ownership. The ASF licenses this file
-/// to you under the Apache License, Version 2.0 (the
-/// "License"); you may not use this file except in compliance
-/// with the License. You may obtain a copy of the License at
-///
-/// http://www.apache.org/licenses/LICENSE-2.0
-///
-/// Unless required by applicable law or agreed to in writing,
-/// software distributed under the License is distributed on an
-/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-/// KIND, either express or implied. See the License for the
-/// specific language governing permissions and limitations
-/// under the License.
-///
-
-namespace SharpPulsar.Auth
-{
-
- ///
- /// STS Token based authentication provider.
- ///
- public class AuthenticationOAuth2 : IAuthentication, IEncodedAuthenticationParameterSupport
- {
-
- private string _clientId;
- private string _clientSecret;
- private string _authority;
- public AuthenticationOAuth2(string clientid, string secret, string authority)
- {
- _clientId = clientid;
- _clientSecret = secret;
- _authority = authority;
- }
-
- public void Close()
- {
- // noop
- }
-
- public string AuthMethodName => "token";
-
- public IAuthenticationDataProvider GetAuthData()
- {
- return new AuthenticationDataOAuth2(_clientId, _clientSecret, _authority);
- }
-
- public void Configure(string encodedAuthParamString)
- {
- // Interpret the whole param string as the token. If the string contains the notation `token:xxxxx` then strip
- // the prefix
- var authP = encodedAuthParamString.Split(",");
- _clientId = authP[0];
- _clientSecret = authP[1];
- _authority = authP[2];
- }
-
- public void Configure(IDictionary authParams)
- {
- // noop
- }
-
- public void Start()
- {
- // noop
- }
-
- public ValueTask DisposeAsync()
- {
- Dispose();
- return new ValueTask(Task.CompletedTask);
- }
-
- public void Dispose()
- {
- }
- }
-
-}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/AuthenticationDataOAuth2.cs b/SharpPulsar/Auth/OAuth2/AuthenticationDataOAuth2.cs
new file mode 100644
index 000000000..e8810fca2
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/AuthenticationDataOAuth2.cs
@@ -0,0 +1,70 @@
+using System;
+using System.Collections.Generic;
+using SharpPulsar.Interfaces;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+ ///
+ /// Provide OAuth 2.0 authentication data.
+ ///
+ [Serializable]
+ internal class AuthenticationDataOAuth2 : IAuthenticationDataProvider
+ {
+ public const string HttpHeaderName = "Authorization";
+
+ private readonly string _accessToken;
+ private readonly ISet> _headers = new HashSet>() ;
+
+ public AuthenticationDataOAuth2(string accessToken)
+ {
+ _accessToken = accessToken;
+ _headers.Add(new KeyValuePair(HttpHeaderName, "Bearer " + accessToken));
+ }
+
+ public virtual bool HasDataForHttp()
+ {
+ return true;
+ }
+
+ public virtual ISet> HttpHeaders
+ {
+ get
+ {
+ return _headers;
+ }
+ }
+
+ public virtual bool HasDataFromCommand()
+ {
+ return true;
+ }
+
+ public virtual string CommandData
+ {
+ get
+ {
+ return _accessToken;
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/AuthenticationFactoryOAuth2.cs b/SharpPulsar/Auth/OAuth2/AuthenticationFactoryOAuth2.cs
new file mode 100644
index 000000000..18983cdd5
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/AuthenticationFactoryOAuth2.cs
@@ -0,0 +1,61 @@
+using System;
+using SharpPulsar.Interfaces;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+ ///
+ /// Factory class that allows to create instances
+ /// for OAuth 2.0 authentication methods.
+ ///
+ public sealed class AuthenticationFactoryOAuth2
+ {
+
+ ///
+ /// Authenticate with client credentials.
+ ///
+ /// the issuer URL
+ /// the credentials URL
+ /// An optional field. The audience identifier used by some Identity Providers, like Auth0.
+ /// an Authentication object
+ public static IAuthentication ClientCredentials(Uri issuerUrl, Uri credentialsUrl, string audience)
+ {
+ return ClientCredentials(issuerUrl, credentialsUrl, audience, null);
+ }
+
+ ///
+ /// Authenticate with client credentials.
+ ///
+ /// the issuer URL
+ /// the credentials URL
+ /// An optional field. The audience identifier used by some Identity Providers, like Auth0.
+ /// An optional field. The value of the scope parameter is expressed as a list of space-delimited,
+ /// case-sensitive strings. The strings are defined by the authorization server.
+ /// If the value contains multiple space-delimited strings, their order does not matter,
+ /// and each string adds an additional access range to the requested scope.
+ /// From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+ /// an Authentication object
+ public static IAuthentication ClientCredentials(Uri issuerUrl, Uri credentialsUrl, string audience, string scope)
+ {
+ var flow = new ClientCredentialsFlow(issuerUrl, audience, credentialsUrl.LocalPath, scope);
+ return new AuthenticationOAuth2(flow, DateTime.Now);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/AuthenticationOAuth2.cs b/SharpPulsar/Auth/OAuth2/AuthenticationOAuth2.cs
new file mode 100644
index 000000000..b7a957d2b
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/AuthenticationOAuth2.cs
@@ -0,0 +1,160 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using NodaTime;
+using SharpPulsar.Auth.OAuth2.Protocol;
+using SharpPulsar.Extension;
+using SharpPulsar.Interfaces;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+ ///
+ /// Pulsar client authentication provider based on OAuth 2.0.
+ ///
+ [Serializable]
+ public class AuthenticationOAuth2 : IAuthentication, IEncodedAuthenticationParameterSupport
+ {
+
+ public const string ConfigParamType = "type";
+ public const string TypeClientCredentials = "client_credentials";
+ public const string AuthMethodNameConflict = "token";
+ public const double ExpiryAdjustment = 0.9;
+ private const long SerialVersionUID = 1L;
+
+ internal readonly DateTime Date;
+ internal IFlow Flow;
+ [NonSerialized]
+ internal CachedToken cachedToken;
+
+ public AuthenticationOAuth2()
+ {
+ Date = DateTime.Now;
+ }
+
+ internal AuthenticationOAuth2(IFlow flow, DateTime date)
+ {
+ Flow = flow;
+ Date = date;
+ }
+
+ public virtual string AuthMethodName
+ {
+ get
+ {
+ return AuthMethodNameConflict;
+ }
+ }
+
+ public virtual void Configure(string encodedAuthParamString)
+ {
+ if (string.IsNullOrWhiteSpace(encodedAuthParamString))
+ {
+ throw new System.ArgumentException("No authentication parameters were provided");
+ }
+ IDictionary prams;
+ try
+ {
+ prams = AuthenticationUtil.ConfigureFromJsonString(encodedAuthParamString);
+ }
+ catch (IOException E)
+ {
+ throw new System.ArgumentException("Malformed authentication parameters", E);
+ }
+
+ string Type = prams.GetOrDefault(ConfigParamType, TypeClientCredentials);
+ switch (Type)
+ {
+ case TypeClientCredentials:
+ this.Flow = ClientCredentialsFlow.FromParameters(prams);
+ break;
+ default:
+ throw new System.ArgumentException("Unsupported authentication type: " + Type);
+ }
+ }
+
+ [Obsolete]
+ public virtual void Configure(IDictionary AuthParams)
+ {
+ throw new NotImplementedException("Deprecated; use EncodedAuthenticationParameterSupport");
+ }
+
+ public virtual void Start()
+ {
+ Flow.Initialize();
+ }
+
+ public virtual IAuthenticationDataProvider AuthData
+ {
+ get
+ {
+ lock (this)
+ {
+ if (this.cachedToken == null || this.cachedToken.Expired)
+ {
+ TokenResult Tr = this.Flow.Authenticate();
+ this.cachedToken = new CachedToken(this, Tr);
+ }
+ return this.cachedToken.AuthData;
+ }
+ }
+ }
+
+ public virtual void Dispose()
+ {
+ try
+ {
+ Flow.Close();
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ }
+
+ internal class CachedToken
+ {
+ private readonly AuthenticationOAuth2 _outerInstance;
+
+ internal readonly TokenResult Latest;
+ internal readonly DateTime ExpiresAt;
+ internal readonly AuthenticationDataOAuth2 AuthData;
+
+ public CachedToken(AuthenticationOAuth2 outerInstance, TokenResult latest)
+ {
+ _outerInstance = outerInstance;
+ Latest = latest;
+ var adjustedExpiresIn = (int)(latest.ExpiresIn * ExpiryAdjustment);
+ ExpiresAt = _outerInstance.Date.AddSeconds(adjustedExpiresIn);
+ AuthData = new AuthenticationDataOAuth2(latest.AccessToken);
+ }
+
+ public virtual bool Expired
+ {
+ get
+ {
+ return _outerInstance.Date == ExpiresAt;
+ }
+ }
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/ClientCredentialsFlow.cs b/SharpPulsar/Auth/OAuth2/ClientCredentialsFlow.cs
new file mode 100644
index 000000000..13a973f48
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/ClientCredentialsFlow.cs
@@ -0,0 +1,153 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Net.Http;
+using System.Text;
+using System.Text.Json;
+using SharpPulsar.Auth.OAuth2.Protocol;
+using SharpPulsar.Exceptions;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+
+ ///
+ /// Implementation of OAuth 2.0 Client Credentials flow.
+ ///
+ /// OAuth 2.0 RFC 6749, section 4.4"/>
+ ///
+ [Serializable]
+ internal class ClientCredentialsFlow : FlowBase
+ {
+ public const string ConfigParamIssuerUrl = "issuerUrl";
+ public const string ConfigParamAudience = "audience";
+ public const string ConfigParamKeyFile = "privateKey";
+ public const string ConfigParamScope = "scope";
+
+ private const long SerialVersionUID = 1L;
+
+ private readonly string _audience;
+ private readonly string _privateKey;
+ private readonly string _scope;
+
+ [NonSerialized]
+ private IClientCredentialsExchanger _exchanger;
+
+ private bool initialized = false;
+
+ public ClientCredentialsFlow(Uri issuerUrl, string audience, string privateKey, string scope) : base(issuerUrl)
+ {
+ _audience = audience;
+ _privateKey = privateKey;
+ _scope = scope;
+ }
+
+ public override void Initialize()
+ {
+ base.Initialize();
+ Debug.Assert(Metadata != null);
+
+ Uri tokenUrl = Metadata.TokenEndpoint;
+ _exchanger = new TokenClient(tokenUrl);
+ initialized = true;
+ }
+
+ public override TokenResult Authenticate()
+ {
+ // read the private key from storage
+ KeyFile keyFile;
+ try
+ {
+ keyFile = LoadPrivateKey(_privateKey);
+ }
+ catch (IOException e)
+ {
+ throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.Message);
+ }
+
+ // request an access token using client credentials
+ var req = new ClientCredentialsExchangeRequest
+ {
+ ClientId = keyFile.ClientId,
+ ClientSecret = keyFile.ClientSecret,
+ Audience = _audience,
+ Scope = _scope
+
+ };
+ TokenResult tr;
+ if (!initialized)
+ {
+ Initialize();
+ }
+ try
+ {
+ tr = _exchanger.ExchangeClientCredentials(req).Result;
+ }
+ catch (Exception e) when (e is TokenExchangeException || e is IOException)
+ {
+ throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " + e.Message);
+ }
+
+ return tr;
+ }
+
+ public override void Close()
+ {
+ //_exchanger.Close();
+ }
+
+ ///
+ /// Constructs a from configuration parameters.
+ ///
+ /// @return
+ public static ClientCredentialsFlow FromParameters(IDictionary prams)
+ {
+ var issuerUrl = ParseParameterUrl(prams, ConfigParamIssuerUrl);
+ var privateKeyUrl = ParseParameterString(prams, ConfigParamKeyFile);
+ // These are optional parameters, so we only perform a get
+ var scope = prams[ConfigParamScope];
+ var audience = prams[ConfigParamAudience];
+ return new ClientCredentialsFlow(issuerUrl, audience, privateKeyUrl,scope);
+ }
+
+ ///
+ /// Loads the private key from the given URL.
+ ///
+ /// @return
+ ///
+ ///
+ private static KeyFile LoadPrivateKey(string privateKeyURL)
+ {
+ try
+ {
+ var uri = new Uri(privateKeyURL);
+ var fs = new FileStream(privateKeyURL, FileMode.Open, FileAccess.Read);
+ var temp = JsonSerializer.DeserializeAsync(fs).Result;
+ return temp;
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Invalid privateKey format", e);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/FlowBase.cs b/SharpPulsar/Auth/OAuth2/FlowBase.cs
new file mode 100644
index 000000000..8c943467a
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/FlowBase.cs
@@ -0,0 +1,97 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using SharpPulsar.Auth.OAuth2.Protocol;
+using SharpPulsar.Exceptions;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+
+
+ ///
+ /// An abstract OAuth 2.0 authorization flow.
+ ///
+ ///
+
+ [Serializable]
+ internal abstract class FlowBase : IFlow
+ {
+ public abstract void Close();
+ public abstract TokenResult Authenticate();
+
+ protected internal readonly Uri IssuerUrl;
+
+ [NonSerialized]
+ protected internal Protocol.Metadata Metadata;
+
+ protected internal FlowBase(Uri issuerUrl)
+ {
+ this.IssuerUrl = issuerUrl;
+ }
+ public virtual void Initialize()
+ {
+ try
+ {
+ var resolve = CreateMetadataResolver();
+ var result = resolve.Resolve().Result;
+ Metadata = result;
+ }
+ catch (IOException E)
+ {
+ //log.error("Unable to retrieve OAuth 2.0 server metadata", E);
+ throw new PulsarClientException.AuthenticationException("Unable to retrieve OAuth 2.0 server metadata");
+ }
+ }
+
+ protected internal virtual MetadataResolver CreateMetadataResolver()
+ {
+ return DefaultMetadataResolver.FromIssuerUrl(IssuerUrl);
+ }
+
+ internal static string ParseParameterString(IDictionary prams, string name)
+ {
+ var s = prams[name];
+ if (string.IsNullOrEmpty(s))
+ {
+ throw new System.ArgumentException("Required configuration parameter: " + name);
+ }
+ return s;
+ }
+
+ internal static Uri ParseParameterUrl(IDictionary prams, string name)
+ {
+ var s = prams[name];
+ if (string.IsNullOrEmpty(s))
+ {
+ throw new System.ArgumentException("Required configuration parameter: " + name);
+ }
+ try
+ {
+ return new Uri(s);
+ }
+ catch (Exception)
+ {
+ throw new System.ArgumentException("Malformed configuration parameter: " + name);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/IFlow.cs b/SharpPulsar/Auth/OAuth2/IFlow.cs
new file mode 100644
index 000000000..93235c506
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/IFlow.cs
@@ -0,0 +1,48 @@
+using SharpPulsar.Auth.OAuth2.Protocol;
+using SharpPulsar.Exceptions;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+
+ ///
+ /// An OAuth 2.0 authorization flow.
+ ///
+ internal interface IFlow //: AutoCloseable
+ {
+
+ ///
+ /// Initializes the authorization flow.
+ /// if the flow could not be initialized.
+ ///
+ void Initialize();
+
+ ///
+ /// Acquires an access token from the OAuth 2.0 authorization server.
+ /// a token result including an access token and optionally a refresh token.
+ /// if authentication failed. () throws org.apache.pulsar.client.api.PulsarClientException;
+ TokenResult Authenticate();
+
+ ///
+ /// Closes the authorization flow.
+ ///
+ void Close();
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/KeyFile.cs b/SharpPulsar/Auth/OAuth2/KeyFile.cs
new file mode 100644
index 000000000..f57a1d372
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/KeyFile.cs
@@ -0,0 +1,55 @@
+using System.Text.Json;
+using System.Text.Json.Serialization;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2
+{
+ ///
+ /// A JSON object representing a credentials file.
+ ///
+ public class KeyFile
+ {
+ [JsonPropertyName("type")]
+ public string Type { get; set; }
+
+ [JsonPropertyName("client_id")]
+ public string ClientId { get; set; }
+
+
+ [JsonPropertyName("client_secret")]
+ public string ClientSecret { get; set; }
+
+ [JsonPropertyName("client_email")]
+ public string ClientEmail { get; set; }
+
+ [JsonPropertyName("issuer_url")]
+ public string IssuerUrl { get; set; }
+
+ public virtual string ToJson()
+ {
+ var options = new JsonSerializerOptions { WriteIndented = true };
+ return JsonSerializer.Serialize(this, options);
+ }
+ public static KeyFile FromJson(string value)
+ {
+ return JsonSerializer.Deserialize(value);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/Protocol/IClientCredentialsExchanger.cs b/SharpPulsar/Auth/OAuth2/Protocol/IClientCredentialsExchanger.cs
new file mode 100644
index 000000000..562434b8e
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/Protocol/IClientCredentialsExchanger.cs
@@ -0,0 +1,37 @@
+using System.Threading.Tasks;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// An interface for exchanging client credentials for an access token.
+ ///
+ public interface IClientCredentialsExchanger// : AutoCloseable
+ {
+ ///
+ /// Requests an exchange of client credentials for an access token.
+ /// the request details.
+ /// an access token.
+ /// if the OAuth server returned a detailed error.
+ /// if a general IO error occurred.
+ Task ExchangeClientCredentials(ClientCredentialsExchangeRequest req);
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs
new file mode 100644
index 000000000..e50004ca9
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs
@@ -0,0 +1,41 @@
+using System.Text.Json.Serialization;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ ///
+ /// A token request based on the exchange of client credentials.
+ ///
+ /// OAuth 2.0 RFC 6749, section 4.4"/>
+ public class ClientCredentialsExchangeRequest
+ {
+ [JsonPropertyName("client_id")]
+ public string ClientId { get; set; }
+
+ [JsonPropertyName("client_secret")]
+ public string ClientSecret { get; set; }
+
+ [JsonPropertyName("audience")]
+ public string Audience { get; set; }
+
+ [JsonPropertyName("scope")]
+ public string Scope { get; set; }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs b/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs
new file mode 100644
index 000000000..4528765f8
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs
@@ -0,0 +1,116 @@
+using System;
+using System.IO;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Security.Policy;
+using System.Text.Json;
+using System.Threading.Tasks;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ ///
+ public class DefaultMetadataResolver : MetadataResolver
+ {
+
+ protected internal const int DefaultConnectTimeoutInSeconds = 10;
+ protected internal const int DefaultReadTimeoutInSeconds = 30;
+
+ private readonly Uri _metadataUrl;
+ private TimeSpan _connectTimeout;
+ private TimeSpan _readTimeout;
+
+ public DefaultMetadataResolver(Uri metadataUrl)
+ {
+ _metadataUrl = metadataUrl;
+ // set a default timeout to ensure that this doesn't block
+ _connectTimeout = TimeSpan.FromSeconds(DefaultConnectTimeoutInSeconds);
+ _readTimeout = TimeSpan.FromSeconds(DefaultReadTimeoutInSeconds);
+ }
+
+ public virtual DefaultMetadataResolver WithConnectTimeout(TimeSpan connectTimeout)
+ {
+ _connectTimeout = connectTimeout;
+ return this;
+ }
+
+ public virtual DefaultMetadataResolver WithReadTimeout(TimeSpan readTimeout)
+ {
+ _readTimeout = readTimeout;
+ return this;
+ }
+
+ ///
+ /// Resolves the authorization metadata.
+ /// metadata
+ /// if the metadata could not be resolved.
+ public async Task Resolve()
+ {
+ try
+ {
+ var mediaType = new MediaTypeWithQualityHeaderValue("application/json");
+ var client = new HttpClient();
+ client.DefaultRequestHeaders.Accept.Add(mediaType);
+ client.Timeout = TimeSpan.FromSeconds(DefaultConnectTimeoutInSeconds);
+ var c = _metadataUrl;
+ var metadataDataUrl = GetWellKnownMetadataUrl(_metadataUrl);
+ var response = await client.GetStreamAsync(metadataDataUrl);
+ return await JsonSerializer.DeserializeAsync(response);
+
+ }
+ catch (IOException E)
+ {
+ throw new IOException("Cannot obtain authorization metadata from " + _metadataUrl.ToString(), E);
+ }
+ }
+
+ ///
+ /// Gets a well-known metadata URL for the given OAuth issuer URL.
+ /// The authorization server's issuer identifier
+ /// a resolver
+ public static DefaultMetadataResolver FromIssuerUrl(Uri issuerUrl)
+ {
+ return new DefaultMetadataResolver(GetWellKnownMetadataUrl(issuerUrl));
+ }
+
+ ///
+ /// Gets a well-known metadata URL for the given OAuth issuer URL.
+ /// "
+ /// OAuth Discovery: Obtaining Authorization Server Metadata/>
+ /// The authorization server's issuer identifier
+ /// a URL
+ public static Uri GetWellKnownMetadataUrl(Uri issuerUrl)
+ {
+ try
+ {
+ return new Uri(issuerUrl.AbsoluteUri + ".well-known/openid-configuration");
+
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs b/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs
new file mode 100644
index 000000000..d8114defe
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs
@@ -0,0 +1,53 @@
+using System;
+using System.Security.Policy;
+using System.Text.Json.Serialization;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// Represents OAuth 2.0 Server Metadata.
+ ///
+ public class Metadata
+ {
+
+ [JsonPropertyName("issuer")]
+ public Uri Issuer { get; set; }
+
+ [JsonPropertyName("authorization_endpoint")]
+ public Uri AuthorizationEndpoint { get; set; }
+
+ [JsonPropertyName("token_endpoint")]
+ public Uri TokenEndpoint { get; set; }
+
+ [JsonPropertyName("userinfo_endpoint")]
+ public Uri UserInfoEndpoint { get; set; }
+
+ [JsonPropertyName("revocation_endpoint")]
+ public Uri RevocationEndpoint { get; set; }
+
+ [JsonPropertyName("jwks_uri")]
+ public Uri JwksUri { get; set; }
+
+ [JsonPropertyName("device_authorization_endpoint")]
+ public Uri DeviceAuthorizationEndpoint { get; set; }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs b/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs
new file mode 100644
index 000000000..1373f9851
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs
@@ -0,0 +1,31 @@
+using System.Threading.Tasks;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ ///
+ public interface MetadataResolver
+ {
+ Task Resolve();
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs
new file mode 100644
index 000000000..62ac32461
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs
@@ -0,0 +1,160 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+using SharpPulsar.Extension;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// A client for an OAuth 2.0 token endpoint.
+ ///
+ public class TokenClient : IClientCredentialsExchanger
+ {
+
+ protected internal const int DefaultConnectTimeoutInSeconds = 10;
+ protected internal const int DefaultReadTimeoutInSeconds = 30;
+
+ private readonly Uri _tokenUrl;
+ private readonly HttpClient _httpClient;
+
+ public TokenClient(Uri tokenUrl) : this(tokenUrl, null)
+ {
+ }
+
+ internal TokenClient(Uri tokenUrl, HttpClient httpClient)
+ {
+ if (httpClient == null)
+ {
+ _httpClient = new HttpClient()
+ {
+ Timeout = TimeSpan.FromSeconds(DefaultConnectTimeoutInSeconds * 1000)
+ };
+ httpClient.DefaultRequestHeaders.Add("User-Agent", "SharpPulsar");
+ }
+ else
+ {
+ _httpClient = httpClient;
+ }
+ _tokenUrl = tokenUrl;
+ }
+
+ public void Close()
+ {
+ _httpClient.Dispose();
+ }
+
+ ///
+ /// Constructing http request parameters.
+ /// object with relevant request parameters
+ /// Generate the final request body from a map.
+ internal virtual string BuildClientCredentialsBody(ClientCredentialsExchangeRequest req)
+ {
+ IDictionary bodyMap = new SortedDictionary();
+ bodyMap["grant_type"] = "client_credentials";
+ bodyMap["client_id"] = req.ClientId;
+ bodyMap["client_secret"] = req.ClientSecret;
+ // Only set audience and scope if they are non-empty.
+ if (!string.IsNullOrWhiteSpace(req.Audience))
+ {
+ bodyMap["audience"] = req.Audience;
+ }
+ if (!string.IsNullOrWhiteSpace(req.Scope))
+ {
+ bodyMap["scope"] = req.Scope;
+ }
+ var map = bodyMap.SetOfKeyValuePairs().Select(e =>
+ {
+ try
+ {
+ return Encoding.UTF8.GetString(Encoding.Default.GetBytes(e.Key)) + '=' + Encoding.UTF8.GetString(Encoding.Default.GetBytes(e.Value));
+ }
+ catch (Exception es)
+ {
+ throw es;
+ }
+ });//.Collect(Collectors.joining("&"));
+ return string.Join("&", map);
+ }
+
+
+ ///
+ /// Performs a token exchange using client credentials.
+ /// the client credentials request details.
+ /// a token result
+ ///
+ public virtual async Task ExchangeClientCredentials(ClientCredentialsExchangeRequest req)
+ {
+ var body = BuildClientCredentialsBody(req);
+
+ try
+ {
+ var mediaType = new MediaTypeWithQualityHeaderValue("application/json");
+ var res = new HttpRequestMessage(HttpMethod.Post, _tokenUrl);
+ res.Headers.Accept.Add(mediaType);
+ res.Headers.Add("Content-Type", "application/x-www-form-urlencoded");
+ res.Content = new StringContent(JsonSerializer.Serialize(body));
+
+ var response = await _httpClient.SendAsync(res);
+ var resultContent = await response.Content.ReadAsStreamAsync();
+
+ switch (response.StatusCode)
+ {
+ case HttpStatusCode.OK:
+ {
+ var result = await JsonSerializer.DeserializeAsync(resultContent,
+ new JsonSerializerOptions
+ {
+ PropertyNameCaseInsensitive = true,
+ });
+ return result;
+ }
+
+ case HttpStatusCode.BadRequest: // Bad request
+ case HttpStatusCode.Unauthorized: // Unauthorized
+ {
+ resultContent = await response.Content.ReadAsStreamAsync();
+ var result = await JsonSerializer.DeserializeAsync(resultContent);
+ throw new TokenExchangeException(result);
+ }
+
+ default:
+ throw new IOException("Failed to perform HTTP request. res: " + response.StatusCode + " " + response.RequestMessage);
+ }
+
+
+
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs
new file mode 100644
index 000000000..f0cf59330
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs
@@ -0,0 +1,41 @@
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Represents an error returned from an OAuth 2.0 token endpoint.
+ ///
+ ///
+ public class TokenError
+ {
+ [JsonPropertyName("error")]
+ public string Error { get; set; }
+
+ [JsonPropertyName("error_description")]
+ public string ErrorDescription { get; set; }
+
+
+ [JsonPropertyName("error_uri")]
+ public string ErrorUri { get; set; }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs
new file mode 100644
index 000000000..bd2694ae0
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs
@@ -0,0 +1,44 @@
+using System;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ ///
+ /// Indicates a token exchange failure.
+ ///
+ public class TokenExchangeException : Exception
+ {
+ private TokenError error;
+
+ public TokenExchangeException(TokenError error) : base(string.Format("{0} ({1})", error.ErrorDescription, error.Error))
+ {
+ this.error = Error;
+ }
+
+ public virtual TokenError Error
+ {
+ get
+ {
+ return error;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs
new file mode 100644
index 000000000..7cf6b7866
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs
@@ -0,0 +1,44 @@
+using System;
+using System.Text.Json.Serialization;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// The result of a token exchange request.
+ ///
+ [Serializable]
+ public class TokenResult
+ {
+ [JsonPropertyName("access_token")]
+ public string AccessToken { get; set; }
+
+ [JsonPropertyName("id_token")]
+ public string IdToken { get; set; }
+
+ [JsonPropertyName("refresh_token")]
+ public string RefreshToken { get; set; }
+
+ [JsonPropertyName("expires_in")]
+ public int ExpiresIn { get; set; }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/DefaultImplementation.cs b/SharpPulsar/DefaultImplementation.cs
index 55dd3af7a..2f1a320ec 100644
--- a/SharpPulsar/DefaultImplementation.cs
+++ b/SharpPulsar/DefaultImplementation.cs
@@ -12,6 +12,7 @@
using SharpPulsar.Schema;
using SharpPulsar.Schemas.Generic;
using Akka.Event;
+using SharpPulsar.Auth.OAuth2;
///
/// Licensed to the Apache Software Foundation (ASF) under one
@@ -66,10 +67,6 @@ public static IAuthentication NewAuthenticationToken(string token)
{
return new AuthenticationToken(token);
}
- public static IAuthentication NewAuthenticationSts(string client, string secret, string authority)
- {
- return new AuthenticationOAuth2(client, secret, authority);
- }
public static IAuthentication NewAuthenticationToken(Func supplier)
{
return new AuthenticationToken(supplier);
diff --git a/SharpPulsar/SocketImpl/SocketClient.cs b/SharpPulsar/SocketImpl/SocketClient.cs
index f38b2dcdd..d234ee7a7 100644
--- a/SharpPulsar/SocketImpl/SocketClient.cs
+++ b/SharpPulsar/SocketImpl/SocketClient.cs
@@ -302,6 +302,7 @@ private async Task ConnectAsync(IPAddress[] serverAddresses, int port)
try
{
await _socket.ConnectAsync(address, port).ConfigureAwait(false);
+ return;
}
catch (Exception exc)
{
diff --git a/Tests/SharpPulsar.Test/SharpPulsar.Test.csproj b/Tests/SharpPulsar.Test/SharpPulsar.Test.csproj
index 653619af6..b0d4313cc 100644
--- a/Tests/SharpPulsar.Test/SharpPulsar.Test.csproj
+++ b/Tests/SharpPulsar.Test/SharpPulsar.Test.csproj
@@ -103,4 +103,8 @@
+
+
+
+
diff --git a/Tutorials/OAuth2/Oauth2.cs b/Tutorials/OAuth2/Oauth2.cs
new file mode 100644
index 000000000..c76ffaeed
--- /dev/null
+++ b/Tutorials/OAuth2/Oauth2.cs
@@ -0,0 +1,66 @@
+using System;
+using System.IO;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+using SharpPulsar;
+using SharpPulsar.Auth.OAuth2;
+using SharpPulsar.Builder;
+using SharpPulsar.Common.Enum;
+using static SharpPulsar.Protocol.Proto.CommandSubscribe;
+//https://github.com/fsprojects/pulsar-client-dotnet/blob/d643e7a2518b7ab6bd5a346c1836d944c3b557f8/tests/IntegrationTests/Common.fs
+namespace Tutorials.OAuth2
+{
+ internal class Oauth2
+ {
+ static string GetConfigFilePath()
+ {
+ var configFolderName = "Oauth2Files";
+ var privateKeyFileName = "credentials_file.json";
+ var startup = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
+ var indexOfConfigDir = startup.IndexOf("examples", StringComparison.Ordinal);
+ var examplesFolder = startup.Substring(0, startup.Length - indexOfConfigDir - 3);
+ var configFolder = Path.Combine(examplesFolder, configFolderName);
+ var ret = Path.Combine(configFolder, privateKeyFileName);
+ if (!File.Exists(ret)) throw new FileNotFoundException("can't find credentials file");
+ return ret;
+ }
+//In order to run this example one has to have authentication on broker
+//Check configuration files to see how to set up authentication in broker
+//In this example Auth0 server is used, look at it's response in Auth0response file
+ internal static async Task RunOauth()
+ {
+ var fileUri = new Uri(GetConfigFilePath());
+ var issuerUrl = new Uri("https://pulsar-sample.us.auth0.com");
+ var audience = "https://pulsar-sample.us.auth0.com/api/v2/";
+
+ var serviceUrl = "pulsar://localhost:6650";
+ var subscriptionName = "my-subscription";
+ var topicName = $"my-topic-%{DateTime.Now.Ticks}";
+
+ var clientConfig = new PulsarClientConfigBuilder()
+ .ServiceUrl(serviceUrl)
+ .Authentication(AuthenticationFactoryOAuth2.ClientCredentials(issuerUrl, fileUri, audience));
+
+ //pulsar actor system
+ var pulsarSystem = await PulsarSystem.GetInstanceAsync(clientConfig);
+ var pulsarClient = pulsarSystem.NewClient();
+
+ var producer = pulsarClient.NewProducer(new ProducerConfigBuilder()
+ .Topic(topicName));
+
+ var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder()
+ .Topic(topicName)
+ .SubscriptionName(subscriptionName)
+ .SubscriptionType(SubType.Exclusive));
+
+ var messageId = await producer.SendAsync(Encoding.UTF8.GetBytes($"Sent from C# at '{DateTime.Now}'"));
+ Console.WriteLine($"MessageId is: '{messageId}'");
+
+ var message = await consumer.ReceiveAsync();
+ Console.WriteLine($"Received: {Encoding.UTF8.GetString(message.Data)}");
+
+ await consumer.AcknowledgeAsync(message.MessageId);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Tutorials/OAuth2/Oauth2Files/credentials_file.json b/Tutorials/OAuth2/Oauth2Files/credentials_file.json
new file mode 100644
index 000000000..884be9a4a
--- /dev/null
+++ b/Tutorials/OAuth2/Oauth2Files/credentials_file.json
@@ -0,0 +1,7 @@
+{
+ "type": "client_credentials",
+ "client_id": "kC2t8oF0kYbQiuj5FQGHXHZzer2FB46t",
+ "client_secret": "wOSnirygS3E-dUKSDnEdyclwvRa4l2jFPlvAdJMa3U1_zCWrjcvng0uYKXF2XZn6",
+ "client_email": "test@developer.gserviceaccount.com",
+ "issuer_url": "https://pulsar-sample.us.auth0.com"
+}
\ No newline at end of file
diff --git a/Tutorials/OAuth2/Oauth2Files/standalone.conf b/Tutorials/OAuth2/Oauth2Files/standalone.conf
new file mode 100644
index 000000000..5373bf4e9
--- /dev/null
+++ b/Tutorials/OAuth2/Oauth2Files/standalone.conf
@@ -0,0 +1,69 @@
+### --- Authentication --- ###
+# Role names that are treated as "proxy roles". If the broker sees a request with
+#role as proxyRoles - it will demand to see a valid original principal.
+proxyRoles=
+
+# If this flag is set then the broker authenticates the original Auth data
+# else it just accepts the originalPrincipal and authorizes it (if required).
+authenticateOriginalAuthData=false
+
+# Enable authentication
+authenticationEnabled=true
+
+# Autentication provider name list, which is comma separated list of class names
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+
+# Enforce authorization
+authorizationEnabled=true
+
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
+# Allow wildcard matching in authorization
+# (wildcard matching only applicable if wildcard-char:
+# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
+authorizationAllowWildcardsMatching=false
+
+# Role names that are treated as "super-user", meaning they will be able to do all admin
+# operations and publish/consume from all topics
+superUserRoles= lord_kek
+
+# Authentication settings of the broker itself. Used when the broker connects to other brokers,
+# either in same or other clusters
+brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
+brokerClientAuthenticationParameters={"issuerUrl": "https://pulsar-sample.us.auth0.com","privateKey": "/pulsar/credentials_file.json","audience": "https://pulsar-sample.us.auth0.com/api/v2/"}
+
+# Supported Athenz provider domain names(comma separated) for authentication
+athenzDomainNames=
+
+# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
+
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key ( Note: key file must be DER-encoded )
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key ( Note: key file must be DER-encoded )
+tokenPublicKey=file:///pulsar/oauth_public2.key
+
+
+# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
+tokenAuthClaim=https://pulsar.com/tokenAuthClaim
+
+# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
+tokenAudience=
+
diff --git a/Tutorials/Program.cs b/Tutorials/Program.cs
index 7457e92f2..b11ba9391 100644
--- a/Tutorials/Program.cs
+++ b/Tutorials/Program.cs
@@ -8,6 +8,7 @@
using System.Threading.Tasks;
using DotNet.Testcontainers.Builders;
using SharpPulsar;
+using SharpPulsar.Auth.OAuth2;
using SharpPulsar.Builder;
using SharpPulsar.Interfaces;
using SharpPulsar.Schemas;
@@ -46,7 +47,6 @@ static async Task Main(string[] args)
if(selection.Equals("1"))
url = "pulsar+ssl://127.0.0.1:6651";
-
var clientConfig = new PulsarClientConfigBuilder()
.ServiceUrl(url);
diff --git a/Tutorials/Tutorials.csproj b/Tutorials/Tutorials.csproj
index 37f51d5d3..8af8a4da0 100644
--- a/Tutorials/Tutorials.csproj
+++ b/Tutorials/Tutorials.csproj
@@ -14,6 +14,12 @@
+
+
+ Always
+
+
+
Always
@@ -54,6 +60,12 @@
Always
+
+ Always
+
+
+ Always
+