diff --git a/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs
new file mode 100644
index 000000000..16b619976
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchangeRequest.cs
@@ -0,0 +1,40 @@
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ ///
+ /// A token request based on the exchange of client credentials.
+ ///
+ /// OAuth 2.0 RFC 6749, section 4.4"/>
+ public class ClientCredentialsExchangeRequest
+ {
+// @JsonProperty("client_id") private String clientId;
+ private string clientId;
+
+// JsonProperty("client_secret") private String clientSecret;
+ private string clientSecret;
+
+// JsonProperty("audience") private String audience;
+ private string audience;
+
+// JsonProperty("scope") private String scope;
+ private string scope;
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchanger.cs b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchanger.cs
new file mode 100644
index 000000000..f5103ade8
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/ClientCredentialsExchanger.cs
@@ -0,0 +1,36 @@
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// An interface for exchanging client credentials for an access token.
+ ///
+ public interface ClientCredentialsExchanger : AutoCloseable
+ {
+ ///
+ /// Requests an exchange of client credentials for an access token.
+ /// the request details.
+ /// an access token.
+ /// if the OAuth server returned a detailed error.
+ /// if a general IO error occurred.
+ TokenResult ExchangeClientCredentials(ClientCredentialsExchangeRequest req);
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs b/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs
new file mode 100644
index 000000000..f79ffcb15
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/DefaultMetadataResolver.cs
@@ -0,0 +1,122 @@
+using System.IO;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ using ObjectMapper = com.fasterxml.jackson.databind.ObjectMapper;
+ using ObjectReader = com.fasterxml.jackson.databind.ObjectReader;
+
+ ///
+ /// Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ ///
+ public class DefaultMetadataResolver : MetadataResolver
+ {
+
+ protected internal const int DefaultConnectTimeoutInSeconds = 10;
+ protected internal const int DefaultReadTimeoutInSeconds = 30;
+
+ private readonly URL metadataUrl;
+ private readonly ObjectReader objectReader;
+ private Duration connectTimeout;
+ private Duration readTimeout;
+
+ public DefaultMetadataResolver(URL MetadataUrl)
+ {
+ this.metadataUrl = MetadataUrl;
+ this.objectReader = (new ObjectMapper()).readerFor(typeof(Metadata));
+ // set a default timeout to ensure that this doesn't block
+ this.connectTimeout = Duration.ofSeconds(DefaultConnectTimeoutInSeconds);
+ this.readTimeout = Duration.ofSeconds(DefaultReadTimeoutInSeconds);
+ }
+
+ public virtual DefaultMetadataResolver WithConnectTimeout(Duration ConnectTimeout)
+ {
+ this.connectTimeout = ConnectTimeout;
+ return this;
+ }
+
+ public virtual DefaultMetadataResolver WithReadTimeout(Duration ReadTimeout)
+ {
+ this.readTimeout = ReadTimeout;
+ return this;
+ }
+
+ ///
+ /// Resolves the authorization metadata.
+ /// metadata
+ /// if the metadata could not be resolved.
+ public virtual Metadata Resolve()
+ {
+ try
+ {
+ URLConnection C = this.metadataUrl.openConnection();
+ if (connectTimeout != null)
+ {
+ C.setConnectTimeout((int) connectTimeout.toMillis());
+ }
+ if (readTimeout != null)
+ {
+ C.setReadTimeout((int) readTimeout.toMillis());
+ }
+ C.setRequestProperty("Accept", "application/json");
+
+ Metadata Metadata;
+ using (Stream InputStream = C.getInputStream())
+ {
+ Metadata = this.objectReader.readValue(InputStream);
+ }
+ return Metadata;
+
+ }
+ catch (IOException E)
+ {
+ throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.ToString(), E);
+ }
+ }
+
+ ///
+ /// Gets a well-known metadata URL for the given OAuth issuer URL.
+ /// The authorization server's issuer identifier
+ /// a resolver
+ public static DefaultMetadataResolver FromIssuerUrl(URL IssuerUrl)
+ {
+ return new DefaultMetadataResolver(GetWellKnownMetadataUrl(IssuerUrl));
+ }
+
+ ///
+ /// Gets a well-known metadata URL for the given OAuth issuer URL.
+ /// "
+ /// OAuth Discovery: Obtaining Authorization Server Metadata/>
+ /// The authorization server's issuer identifier
+ /// a URL
+ public static URL GetWellKnownMetadataUrl(URL IssuerUrl)
+ {
+ try
+ {
+ return URI.create(IssuerUrl.toExternalForm() + "/.well-known/openid-configuration").normalize().toURL();
+ }
+ catch (MalformedURLException E)
+ {
+ throw new System.ArgumentException(E);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs b/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs
new file mode 100644
index 000000000..2ad27d803
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/Metadata.cs
@@ -0,0 +1,44 @@
+using System.Security.Policy;
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// Represents OAuth 2.0 Server Metadata.
+ ///
+ public class Metadata
+ {
+ //JsonProperty("issuer") private java.net.URL authorizationEndpoint
+ public Url issuer;
+ //JsonProperty("authorization_endpoint") private java.net.URL authorizationEndpoint
+ public Url authorizationEndpoint;
+ //JsonProperty("token_endpoint") private java.net.URL tokenEndpoint
+ public Url tokenEndpoint;
+ //JsonProperty("userinfo_endpoint") private java.net.URL userInfoEndpoint;
+ public Url userInfoEndpoint;
+ //JsonProperty("revocation_endpoint") private java.net.URL revocationEndpoint;
+ public Url revocationEndpoint;
+ //JsonProperty("jwks_uri") private java.net.URL jwksUri
+ public Url jwksUri;
+ //JsonProperty("device_authorization_endpoint") private java.net.URL deviceAuthorizationEndpoint;
+ public Url deviceAuthorizationEndpoint;
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs b/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs
new file mode 100644
index 000000000..f109e57ad
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/MetadataResolver.cs
@@ -0,0 +1,30 @@
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ ///
+ public interface MetadataResolver
+ {
+ Metadata Resolve();
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs
new file mode 100644
index 000000000..ed50bfabf
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenClient.cs
@@ -0,0 +1,134 @@
+using System;
+using System.Collections.Generic;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// A client for an OAuth 2.0 token endpoint.
+ ///
+ public class TokenClient : ClientCredentialsExchanger
+ {
+
+ protected internal const int DefaultConnectTimeoutInSeconds = 10;
+ protected internal const int DefaultReadTimeoutInSeconds = 30;
+
+ private readonly URL tokenUrl;
+ private readonly AsyncHttpClient httpClient;
+
+ public TokenClient(URL TokenUrl) : this(TokenUrl, null)
+ {
+ }
+
+ internal TokenClient(URL TokenUrl, AsyncHttpClient HttpClient)
+ {
+ if (HttpClient == null)
+ {
+ DefaultAsyncHttpClientConfig.Builder ConfBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ ConfBuilder.setFollowRedirect(true);
+ ConfBuilder.setConnectTimeout(DefaultConnectTimeoutInSeconds * 1000);
+ ConfBuilder.setReadTimeout(DefaultReadTimeoutInSeconds * 1000);
+ ConfBuilder.setUserAgent(string.Format("Pulsar-Java-v{0}", PulsarVersion.Version));
+ AsyncHttpClientConfig Config = ConfBuilder.build();
+ this.httpClient = new DefaultAsyncHttpClient(Config);
+ }
+ else
+ {
+ this.httpClient = HttpClient;
+ }
+ this.tokenUrl = TokenUrl;
+ }
+
+ public override void close()
+ {
+ httpClient.close();
+ }
+
+ ///
+ /// Constructing http request parameters.
+ /// object with relevant request parameters
+ /// Generate the final request body from a map.
+ internal virtual string BuildClientCredentialsBody(ClientCredentialsExchangeRequest Req)
+ {
+ IDictionary BodyMap = new SortedDictionary();
+ BodyMap["grant_type"] = "client_credentials";
+ BodyMap["client_id"] = Req.getClientId();
+ BodyMap["client_secret"] = Req.getClientSecret();
+ // Only set audience and scope if they are non-empty.
+ if (!StringUtils.isBlank(Req.getAudience()))
+ {
+ BodyMap["audience"] = Req.getAudience();
+ }
+ if (!StringUtils.isBlank(Req.getScope()))
+ {
+ BodyMap["scope"] = Req.getScope();
+ }
+ return BodyMap.SetOfKeyValuePairs().Select(e =>
+ {
+ try
+ {
+ return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+ }
+ catch (UnsupportedEncodingException E1)
+ {
+ throw new Exception(E1);
+ }
+ }).collect(Collectors.joining("&"));
+ }
+
+ ///
+ /// Performs a token exchange using client credentials.
+ /// the client credentials request details.
+ /// a token result
+ ///
+ public virtual TokenResult ExchangeClientCredentials(ClientCredentialsExchangeRequest Req)
+ {
+ string Body = BuildClientCredentialsBody(Req);
+
+ try
+ {
+
+ Response Res = httpClient.preparePost(tokenUrl.ToString()).setHeader("Accept", "application/json").setHeader("Content-Type", "application/x-www-form-urlencoded").setBody(Body).execute().get();
+
+ switch (Res.getStatusCode())
+ {
+ case 200:
+ return ObjectMapperFactory.ThreadLocal.reader().readValue(Res.getResponseBodyAsBytes(), typeof(TokenResult));
+
+ case 400: // Bad request
+ case 401: // Unauthorized
+ throw new TokenExchangeException(ObjectMapperFactory.ThreadLocal.reader().readValue(Res.getResponseBodyAsBytes(), typeof(TokenError)));
+
+ default:
+ throw new IOException("Failed to perform HTTP request. res: " + Res.getStatusCode() + " " + Res.getStatusText());
+ }
+
+
+
+ }
+ catch (Exception e1) when (e1 is InterruptedException || e1 is ExecutionException)
+ {
+ throw new IOException(e1);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs
new file mode 100644
index 000000000..ebad5a042
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenError.cs
@@ -0,0 +1,35 @@
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ ///
+ /// Represents an error returned from an OAuth 2.0 token endpoint.
+ ///
+ ///
+ public class TokenError
+ {
+ //JsonProperty("error") private String error;
+ public string Error;
+ //JsonProperty("error_description") private String errorDescription;
+ public string ErrorDescription;
+ //JsonProperty("error_uri") private String errorUri;
+ public string ErrorUri;
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs
new file mode 100644
index 000000000..f412f91a3
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenExchangeException.cs
@@ -0,0 +1,44 @@
+using System;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+ ///
+ /// Indicates a token exchange failure.
+ ///
+ public class TokenExchangeException : Exception
+ {
+ private TokenError error;
+
+ public TokenExchangeException(TokenError Error) : base(string.Format("{0} ({1})", Error.getErrorDescription(), Error.getError()))
+ {
+ this.error = Error;
+ }
+
+ public virtual TokenError Error
+ {
+ get
+ {
+ return error;
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs b/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs
new file mode 100644
index 000000000..4eece9bdf
--- /dev/null
+++ b/SharpPulsar/Auth/OAuth2/protocol/TokenResult.cs
@@ -0,0 +1,41 @@
+using System;
+
+///
+/// Licensed to the Apache Software Foundation (ASF) under one
+/// or more contributor license agreements. See the NOTICE file
+/// distributed with this work for additional information
+/// regarding copyright ownership. The ASF licenses this file
+/// to you under the Apache License, Version 2.0 (the
+/// "License"); you may not use this file except in compliance
+/// with the License. You may obtain a copy of the License at
+///
+/// http://www.apache.org/licenses/LICENSE-2.0
+///
+/// Unless required by applicable law or agreed to in writing,
+/// software distributed under the License is distributed on an
+/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+/// KIND, either express or implied. See the License for the
+/// specific language governing permissions and limitations
+/// under the License.
+///
+namespace SharpPulsar.Auth.OAuth2.Protocol
+{
+
+ ///
+ /// The result of a token exchange request.
+ ///
+ [Serializable]
+ public class TokenResult
+ {
+ private const long SerialVersionUID = 1L;
+ //JsonProperty("access_token") private String accessToken;
+ public string AccessToken;
+ //JsonProperty("id_token") private String idToken;
+ public string IdToken;
+ //JsonProperty("refresh_token") private String refreshToken;
+ public string RefreshToken;
+ //JsonProperty("expires_in") private int expiresIn;
+ public int ExpiresIn;
+ }
+
+}
\ No newline at end of file