From bc4ebd95c529e99f6ff55e03929eb42c1c49bcd8 Mon Sep 17 00:00:00 2001 From: Shigenobu Furuta Date: Tue, 16 Apr 2024 00:00:56 +0900 Subject: [PATCH] Version 0.2.0. * Async support. * IPv6 support. --- OrangeCabinet.Tests/TestSimpleAsync.cs | 89 +++++++++++++++++++++++++ OrangeCabinet/OcBinder.cs | 17 ++++- OrangeCabinet/OcCallback.cs | 67 ++++++++++++++++++- OrangeCabinet/OcHandlerReceive.cs | 37 ++++++++--- OrangeCabinet/OcLocal.cs | 64 ++++++++++++++++-- OrangeCabinet/OcLock.cs | 57 ++++++++++++++++ OrangeCabinet/OcRemote.cs | 57 ++++++++++++---- OrangeCabinet/OcRemoteManager.cs | 63 ++++++++++-------- OrangeCabinet/OrangeCabinet.csproj | 2 +- README.md | 90 ++++++++++++++++++++++++-- 10 files changed, 480 insertions(+), 63 deletions(-) create mode 100644 OrangeCabinet.Tests/TestSimpleAsync.cs create mode 100644 OrangeCabinet/OcLock.cs diff --git a/OrangeCabinet.Tests/TestSimpleAsync.cs b/OrangeCabinet.Tests/TestSimpleAsync.cs new file mode 100644 index 0000000..832f955 --- /dev/null +++ b/OrangeCabinet.Tests/TestSimpleAsync.cs @@ -0,0 +1,89 @@ +using System.IO; +using System.Net; +using System.Threading; +using Xunit; +using Xunit.Abstractions; + +namespace OrangeCabinet.Tests +{ + public class TestSimpleAsync + { + public TestSimpleAsync(ITestOutputHelper testOutputHelper) + { + OcDate.AddSeconds = 60 * 60 * 9; + // OcLogger.Writer = new StreamWriter(new FileStream("Test.log", FileMode.Append)); + OcLogger.Verbose = true; + OcLogger.Transfer = new OcLoggerTransfer + { + Transfer = msg => testOutputHelper.WriteLine(msg.ToString()), + Raw = false + }; + } + + [Fact] + public async Task Test() + { + var serverBinder = new OcBinder(new SampleAsyncCallback()) + { + BindPort = 8710, + }; + var server = new OcLocal(serverBinder); + server.Start(); + // server.WaitFor(); + + // ----- + using var clientBinder = new OcBinder(new SampleAsyncCallback()) + { + BindPort = 18710, + }; + var client = new OcRemote(clientBinder, "127.0.0.1", 8710); + for (int j = 0; j < 3; j++) + { + await client.SendAsync($"{j}".OxToBytes()); + } + // ----- + + // ... + Thread.Sleep(1000); + server.SendTo("hello from server", new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8710)); + server.Shutdown(); + + OcLogger.Close(); + } + } + + public class SampleAsyncCallback : OcCallback + { + private const string Key = "inc"; + + public override bool UseAsyncCallback { get; init; } = true; + + public override async Task IncomingAsync(OcRemote remote, byte[] message) + { + OcLogger.Info($"Received: {message.OxToString()} ({remote})"); + + int inc = remote.GetValue(Key); + inc++; + remote.SetValue(Key, inc); + + await remote.SendAsync($"{inc}".OxToBytes()); + if (inc > 10) + { + remote.ClearValue(Key); + remote.Escape(); + } + } + + public override Task TimeoutAsync(OcRemote remote) + { + OcLogger.Info($"Timeout: {remote}"); + return Task.CompletedTask; + } + + public override Task ShutdownAsync(OcRemote remote) + { + OcLogger.Info($"Shutdown: {remote}"); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/OrangeCabinet/OcBinder.cs b/OrangeCabinet/OcBinder.cs index aa3163c..5d2249b 100644 --- a/OrangeCabinet/OcBinder.cs +++ b/OrangeCabinet/OcBinder.cs @@ -8,6 +8,11 @@ namespace OrangeCabinet; /// public class OcBinder : IDisposable { + /// + /// Default timeout milli seconds. + /// + internal const int DefaultTimeoutMilliSeconds = 1500; + /// /// Default ip v4 host. /// @@ -34,6 +39,9 @@ public class OcBinder : IDisposable /// callback public OcBinder(OcCallback callback) { + if (OcCallback.ContainsAsync(callback)) + throw new OcBinderException( + $"Disallow async override at {string.Join(',', OcCallback.SynchronousMethodNames.ToArray())} in {callback.GetType().FullName}, use 'xxxAsync' alternatively."); Callback = callback; } @@ -140,16 +148,19 @@ internal void Bind(OcBindMode bindMode) } /// - /// Send bytes to remote. + /// Async send bytes to remote. /// /// message /// remote endpoint + /// timeout /// bind exception - internal void SendTo(byte[] message, IPEndPoint remoteEndpoint) + internal async Task SendToAsync(byte[] message, IPEndPoint remoteEndpoint, int timeout = DefaultTimeoutMilliSeconds) { if (BindSocket == null) throw new OcBinderException($"Not bind on {BindSocket}:{BindPort}"); - BindSocket.SendTo(message, SocketFlags.None, remoteEndpoint); + var t = BindSocket.SendToAsync(message, SocketFlags.None, remoteEndpoint); + if (await Task.WhenAny(t, Task.Delay(timeout)) != t) + throw new OcBinderException($"Error send to {remoteEndpoint.OxToHostPort()} ({message.Length})"); } /// diff --git a/OrangeCabinet/OcCallback.cs b/OrangeCabinet/OcCallback.cs index 58a7d97..6e0eccf 100644 --- a/OrangeCabinet/OcCallback.cs +++ b/OrangeCabinet/OcCallback.cs @@ -1,3 +1,6 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + namespace OrangeCabinet; /// @@ -5,12 +8,54 @@ namespace OrangeCabinet; /// public abstract class OcCallback { + /// + /// Synchronous method names. + /// + internal static readonly List SynchronousMethodNames = new() {"Incoming", "Timeout", "Shutdown"}; + + /// + /// Use async callback. + /// + public virtual bool UseAsyncCallback { get; init; } + + /// + /// Contains async. + /// + /// callback + /// if contains, return true + internal static bool ContainsAsync(OcCallback callback) + { + var attType = typeof(AsyncStateMachineAttribute); + foreach (var methodInfo in callback.GetType().GetMethods()) + { + if (!SynchronousMethodNames.Contains(methodInfo.Name)) continue; + + var attrib = methodInfo.GetCustomAttribute(attType); + if (attrib != null) return true; + } + + return false; + } + /// /// Incoming. /// /// received remote /// message - public abstract void Incoming(OcRemote remote, byte[] message); + public virtual void Incoming(OcRemote remote, byte[] message) + { + } + + /// + /// Async incoming. + /// + /// received remote + /// message + /// task + public virtual Task IncomingAsync(OcRemote remote, byte[] message) + { + return Task.CompletedTask; + } /// /// Timeout. @@ -20,6 +65,16 @@ public virtual void Timeout(OcRemote remote) { } + /// + /// Async timeout. + /// + /// be timeout remote + /// task + public virtual Task TimeoutAsync(OcRemote remote) + { + return Task.CompletedTask; + } + /// /// Shutdown. /// @@ -27,4 +82,14 @@ public virtual void Timeout(OcRemote remote) public virtual void Shutdown(OcRemote remote) { } + + /// + /// Shutdown. + /// + /// be shutdown remote + /// task + public virtual Task ShutdownAsync(OcRemote remote) + { + return Task.CompletedTask; + } } \ No newline at end of file diff --git a/OrangeCabinet/OcHandlerReceive.cs b/OrangeCabinet/OcHandlerReceive.cs index 595c5b9..151302d 100644 --- a/OrangeCabinet/OcHandlerReceive.cs +++ b/OrangeCabinet/OcHandlerReceive.cs @@ -123,20 +123,32 @@ internal override void Complete(IAsyncResult result) return; } + EndPoint? remoteEndpoint; + int received; try { // received - EndPoint remoteEndpoint = new IPEndPoint(IPAddress.Any, 0); - var received = state!.Socket.EndReceiveFrom(result, ref remoteEndpoint); + remoteEndpoint = new IPEndPoint(IPAddress.Any, 0); + received = state!.Socket.EndReceiveFrom(result, ref remoteEndpoint); if (received <= 0) { OcLogger.Debug(() => $"Received wrong size: {received}"); return; } + } + catch (Exception e) + { + OcLogger.Debug(() => e); + Failed(state!); + return; + } - var remote = _remoteManager.Generate((IPEndPoint) remoteEndpoint); + // callback + var taskReceive = Task.Run(async () => + { + var remote = await _remoteManager.GenerateAsync((IPEndPoint) remoteEndpoint); OcLogger.Debug(() => $"Received remote: {remote}, size: {received}"); - lock (remote) + using (await remote.Lock.LockAsync()) { // if remote is active and not timeout, invoke incoming if (remote.Active && !remote.IsTimeout()) @@ -144,15 +156,20 @@ internal override void Complete(IAsyncResult result) var message = new byte[received]; Buffer.BlockCopy(state.Buffer!, 0, message, 0, message.Length); remote.UpdateTimeout(); - _callback.Incoming(remote, message); + if (_callback.UseAsyncCallback) + await _callback.IncomingAsync(remote, message); + else + // ReSharper disable once MethodHasAsyncOverload + _callback.Incoming(remote, message); } } - } - catch (Exception e) + }); + taskReceive.ContinueWith(comp => { - OcLogger.Debug(() => e); - Failed(state!); - } + if (comp.Exception is not { } e) return; + OcLogger.Debug(() => e.InnerExceptions); + Failed(state); + }); } /// diff --git a/OrangeCabinet/OcLocal.cs b/OrangeCabinet/OcLocal.cs index 60c16aa..da167f9 100644 --- a/OrangeCabinet/OcLocal.cs +++ b/OrangeCabinet/OcLocal.cs @@ -36,9 +36,26 @@ public void Start() /// /// message /// remote endpoint - public void SendTo(string message, IPEndPoint remoteEndpoint) + /// timeout + /// send error + public void SendTo(string message, IPEndPoint remoteEndpoint, int timeout = OcBinder.DefaultTimeoutMilliSeconds) { - _binder.SendTo(message.OxToBytes(), remoteEndpoint); + SendTo(message.OxToBytes(), remoteEndpoint, timeout); + } + + /// + /// Async send string to remote. + /// Enable to send message for some endpoint directly what you hope. + /// Notice, this method is not checked for remote endpoint state. + /// + /// message + /// remote endpoint + /// timeout + /// send error + public async Task SendToAsync(string message, IPEndPoint remoteEndpoint, + int timeout = OcBinder.DefaultTimeoutMilliSeconds) + { + await SendToAsync(message.OxToBytes(), remoteEndpoint, timeout); } /// @@ -48,9 +65,34 @@ public void SendTo(string message, IPEndPoint remoteEndpoint) /// /// message /// remote endpoint - public void SendTo(byte[] message, IPEndPoint remoteEndpoint) + /// timeout + /// send error + public void SendTo(byte[] message, IPEndPoint remoteEndpoint, int timeout = OcBinder.DefaultTimeoutMilliSeconds) { - _binder.SendTo(message, remoteEndpoint); + SendToAsync(message, remoteEndpoint, timeout).ConfigureAwait(false).GetAwaiter().GetResult(); + } + + /// + /// Async send bytes to remote. + /// Enable to send message for some endpoint directly what you hope. + /// Notice, this method is not checked for remote endpoint state. + /// + /// message + /// remote endpoint + /// timeout + /// send error + public async Task SendToAsync(byte[] message, IPEndPoint remoteEndpoint, + int timeout = OcBinder.DefaultTimeoutMilliSeconds) + { + try + { + await _binder.SendToAsync(message, remoteEndpoint, timeout); + } + catch (Exception e) + { + OcLogger.Error(e); + throw new OcLocalSendException(e); + } } /// @@ -68,4 +110,18 @@ public void Shutdown() { _binder.Close(); } +} + +/// +/// Local send exception. +/// +public class OcLocalSendException : Exception +{ + /// + /// Constructor. + /// + /// exception + internal OcLocalSendException(Exception e) : base(e.ToString()) + { + } } \ No newline at end of file diff --git a/OrangeCabinet/OcLock.cs b/OrangeCabinet/OcLock.cs new file mode 100644 index 0000000..8382a1e --- /dev/null +++ b/OrangeCabinet/OcLock.cs @@ -0,0 +1,57 @@ +namespace OrangeCabinet; + +/// +/// Lock. +/// +public class OcLock +{ + /// + /// Semaphore. + /// + private readonly SemaphoreSlim _semaphore = new(1, 1); + + /// + /// Lock async. + /// + /// task + public async Task LockAsync() + { + await _semaphore.WaitAsync(); + return new OcLockHandler(_semaphore); + } + + /// + /// Lock handler. + /// + private sealed class OcLockHandler : IDisposable + { + /// + /// Semaphore. + /// + private readonly SemaphoreSlim _semaphore; + + /// + /// Disposed. + /// + private bool _disposed; + + /// + /// Constructor. + /// + /// semaphore + public OcLockHandler(SemaphoreSlim semaphore) + { + _semaphore = semaphore; + } + + /// + /// Dispose. + /// + public void Dispose() + { + if (_disposed) return; + _semaphore.Release(); + _disposed = true; + } + } +} \ No newline at end of file diff --git a/OrangeCabinet/OcRemote.cs b/OrangeCabinet/OcRemote.cs index 5fc5ca2..422cf7b 100644 --- a/OrangeCabinet/OcRemote.cs +++ b/OrangeCabinet/OcRemote.cs @@ -80,6 +80,11 @@ public OcRemote(OcBinder binder, IPEndPoint remoteEndpoint) /// internal bool Active { get; set; } = true; + /// + /// Lock. + /// + internal OcLock Lock { get; } = new(); + /// /// Change idle milli seconds. /// @@ -113,10 +118,23 @@ internal void UpdateTimeout() /// If remote is timeout or inactive, not send and throws exception. /// /// message - /// send error - public void Send(string message) + /// timeout + /// send error + public void Send(string message, int timeout = OcBinder.DefaultTimeoutMilliSeconds) { - Send(message.OxToBytes()); + Send(message.OxToBytes(), timeout); + } + + /// + /// Async send string. + /// If remote is timeout or inactive, not send and throws exception. + /// + /// message + /// timeout + /// send error + public async Task SendAsync(string message, int timeout = OcBinder.DefaultTimeoutMilliSeconds) + { + await SendAsync(message.OxToBytes(), timeout); } /// @@ -124,23 +142,36 @@ public void Send(string message) /// If remote is timeout or inactive, not send and throws exception. /// /// message - /// send error - public void Send(byte[] message) + /// timeout + /// send error + public void Send(byte[] message, int timeout = OcBinder.DefaultTimeoutMilliSeconds) + { + SendAsync(message, timeout).ConfigureAwait(false).GetAwaiter().GetResult(); + } + + /// + /// Async send bytes. + /// If remote is timeout or inactive, not send and throws exception. + /// + /// message + /// timeout + /// send error + public async Task SendAsync(byte[] message, int timeout = OcBinder.DefaultTimeoutMilliSeconds) { // if escaped, disallow send - if (_lifeTimestampMilliseconds <= 0) throw new OcSendException($"Remote({this}) is already escaped"); + if (_lifeTimestampMilliseconds <= 0) throw new OcRemoteSendException($"Remote({this}) is already escaped"); // if not active, disallow send - if (!Active) throw new OcSendException($"Remote({this}) is not active"); + if (!Active) throw new OcRemoteSendException($"Remote({this}) is not active"); try { - _binder.SendTo(message, RemoteEndpoint); + await _binder.SendToAsync(message, RemoteEndpoint, timeout); } catch (Exception e) { OcLogger.Error(e); - throw new OcSendException(e); + throw new OcRemoteSendException(e); } } @@ -201,15 +232,15 @@ public override string ToString() } /// -/// Send exception. +/// Remote send exception. /// -public class OcSendException : Exception +public class OcRemoteSendException : Exception { /// /// Constructor. /// /// exception - internal OcSendException(Exception e) : base(e.ToString()) + internal OcRemoteSendException(Exception e) : base(e.ToString()) { } @@ -217,7 +248,7 @@ internal OcSendException(Exception e) : base(e.ToString()) /// Constructor. /// /// message - internal OcSendException(string message) : base(message) + internal OcRemoteSendException(string message) : base(message) { } } \ No newline at end of file diff --git a/OrangeCabinet/OcRemoteManager.cs b/OrangeCabinet/OcRemoteManager.cs index 806b6e3..9b2ceec 100644 --- a/OrangeCabinet/OcRemoteManager.cs +++ b/OrangeCabinet/OcRemoteManager.cs @@ -21,7 +21,7 @@ public class OcRemoteManager /// /// Remote locks. /// - private readonly List _remoteLocks; + private readonly List _remoteLocks; /// /// Remotes. @@ -52,8 +52,8 @@ internal OcRemoteManager(OcBinder binder) _binder = binder; _divide = binder.Divide; - _remoteLocks = new List(_divide); - for (var i = 0; i < _divide; i++) _remoteLocks.Add(new object()); + _remoteLocks = new List(_divide); + for (var i = 0; i < _divide; i++) _remoteLocks.Add(new OcLock()); _remotes = new List>(_divide); for (var i = 0; i < _divide; i++) _remotes.Add(new ConcurrentDictionary()); @@ -86,16 +86,20 @@ internal void StartTimeoutTask() if (taskNo >= _divide) taskNo = 0; // timeout - lock (_remoteLocks[taskNo]) + using (await _remoteLocks[taskNo].LockAsync()) { foreach (var pair in _remotes[taskNo]) - lock (pair.Value) + using (await pair.Value.Lock.LockAsync()) { // if already timeout and active, invoke timeout. if (pair.Value.Active && pair.Value.IsTimeout()) { pair.Value.Active = false; - _binder.Callback.Timeout(pair.Value); + if (_binder.Callback.UseAsyncCallback) + await _binder.Callback.TimeoutAsync(pair.Value); + else + // ReSharper disable once MethodHasAsyncOverload + _binder.Callback.Timeout(pair.Value); if (_remotes[taskNo].TryRemove(pair)) { @@ -124,27 +128,34 @@ internal void ShutdownTimeoutTask() // shutdown all sessions OcLogger.Info("Closing remotes at shutdown"); - for (var i = 0; i < _divide; i++) - lock (_remoteLocks[i]) - { - foreach (var pair in _remotes[i]) - lock (pair.Value) - { - // if active, invoke shutdown. - if (pair.Value.Active) + Task.Run(async () => + { + for (var i = 0; i < _divide; i++) + using (await _remoteLocks[i].LockAsync()) + { + foreach (var pair in _remotes[i]) + using (await pair.Value.Lock.LockAsync()) { - pair.Value.Active = false; - _binder.Callback.Shutdown(pair.Value); - - if (_remotes[i].TryRemove(pair)) + // if active, invoke shutdown. + if (pair.Value.Active) { - // decrement - Interlocked.Decrement(ref _remoteCount); - OcLogger.Debug(() => $"By shutdown, removed remote: {pair.Value}"); + pair.Value.Active = false; + if (_binder.Callback.UseAsyncCallback) + await _binder.Callback.ShutdownAsync(pair.Value); + else + // ReSharper disable once MethodHasAsyncOverload + _binder.Callback.Shutdown(pair.Value); + + if (_remotes[i].TryRemove(pair)) + { + // decrement + Interlocked.Decrement(ref _remoteCount); + OcLogger.Debug(() => $"By shutdown, removed remote: {pair.Value}"); + } } } - } - } + } + }); } /// @@ -170,17 +181,17 @@ private bool TryGet(string hostPort, out OcRemote? remote) } /// - /// Generate. + /// Async generate. /// /// remote endpoint /// remote - internal OcRemote Generate(IPEndPoint remoteEndpoint) + internal async Task GenerateAsync(IPEndPoint remoteEndpoint) { var hostPort = remoteEndpoint.OxToHostPort(); var mod = GetMod(hostPort); OcRemote? remote; - lock (_remoteLocks[mod]) + using (await _remoteLocks[mod].LockAsync()) { if (!TryGet(hostPort, out remote)) { diff --git a/OrangeCabinet/OrangeCabinet.csproj b/OrangeCabinet/OrangeCabinet.csproj index 9b7b5b7..53b9ba2 100644 --- a/OrangeCabinet/OrangeCabinet.csproj +++ b/OrangeCabinet/OrangeCabinet.csproj @@ -13,7 +13,7 @@ README.md git https://github.com/shigenobu/OrangeCabinet.git - 0.0.9 + 0.2.0 diff --git a/README.md b/README.md index cc9bdad..d8f60cb 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,24 @@ ## feature -* Callback for 'Incoming'(received), 'Timeout'(timeout), 'Shutdown'(shutdown). +OrangeCabinet is __'Asynchronous Programming Model (APM)'__ socket wrapper library, +with __'Task-based Asynchronous Pattern (TAP)'__ at callback methods. +Otherwise, __APM__ and __TAP__ mixed. +Sync methods (Incoming, Timeout and Shutdown) are disallowed for async override. +If you want to use 'async', +Async methods (IncomingAsync, TimeoutAsync and ShutdownAsync) are override with 'UseAsyncCallback = true'. + +* Callback is below. + * 'Incoming or IncomingAsync' (received) + * 'Timeout or TimeoutAsync' (timeout) + * 'Shutdown or ShutdownAsync' (shutdown) * Can store user value in remote. * Check timeout at regular intervals by last receive time. * Client bind too, not connect. So, previously known client port. ## how to use -### callback +### callback (sync) public class Callback : OcCallback { @@ -47,7 +57,44 @@ } } -### for server +### callback (async) + + public class AsyncCallback : OcCallback + { + private const string Key = "inc"; + + public override bool UseAsyncCallback { get; init; } = true; + + public override async Task IncomingAsync(OcRemote remote, byte[] message) + { + Console.WriteLine($"Received: {Encoding.UTF8.GetString(message)} ({remote})"); + + int inc = remote.GetValue(Key); + inc++; + remote.SetValue(Key, inc); + + await remote.SendAsync($"{inc}"); + if (inc > 10) + { + remote.ClearValue(Key); + remote.Escape(); + } + } + + public override Task TimeoutAsync(OcRemote remote) + { + Console.WriteLine($"Timeout: {remote}"); + return Task.CompletedTask; + } + + public override Task ShutdownAsync(OcRemote remote) + { + Console.WriteLine($"Shutdown: {remote}"); + return Task.CompletedTask; + } + } + +### for server (ip v4) public static void Main(string[] args) { @@ -57,13 +104,13 @@ }; var server = new OcLocal(serverBinder); server.Start(); - server.SendTo("0", IPEndPoint.Parse("127.0.0.1:18170")); // Send from server to some endpoint what you hope. + server.SendTo("0", new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8710)); // Send from server to some endpoint what you hope. server.WaitFor(); // ... server.Shutdown(); } -### for client +### for client (ip v4) public static void Main(string[] args) { @@ -77,3 +124,36 @@ client.Send($"{j}"); } } + +### for server (ip v6) + + public static void Main(string[] args) + { + var serverBinder = new OcBinder(new SampleCallback()) + { + SocketAddressFamily = OcSocketAddressFamily.Ipv6, + BindPort = 8710, + }; + var server = new OcLocal(serverBinder); + server.Start(); + server.SendTo("0", new IPEndPoint(IPAddress.Parse("::1"), 8710)); // Send from server to some endpoint what you hope. + server.WaitFor(); + // ... + server.Shutdown(); + } + +### for client (ip v6) + + public static void Main(string[] args) + { + using var clientBinder = new OcBinder(new Callback()) + { + SocketAddressFamily = OcSocketAddressFamily.Ipv6, + BindPort = 18710, + }; + var client = new OcRemote(clientBinder, "::1", 8710); + for (int j = 0; j < 3; j++) + { + client.Send($"{j}"); + } + } \ No newline at end of file