Skip to content

Commit

Permalink
Version 0.2.0.
Browse files Browse the repository at this point in the history
* Async support.
* IPv6 support.
  • Loading branch information
shigenobu committed Apr 15, 2024
1 parent c61d3fe commit bc4ebd9
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 63 deletions.
89 changes: 89 additions & 0 deletions OrangeCabinet.Tests/TestSimpleAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System.IO;
using System.Net;
using System.Threading;
using Xunit;
using Xunit.Abstractions;

namespace OrangeCabinet.Tests
{
public class TestSimpleAsync
{
public TestSimpleAsync(ITestOutputHelper testOutputHelper)
{
OcDate.AddSeconds = 60 * 60 * 9;
// OcLogger.Writer = new StreamWriter(new FileStream("Test.log", FileMode.Append));
OcLogger.Verbose = true;
OcLogger.Transfer = new OcLoggerTransfer
{
Transfer = msg => testOutputHelper.WriteLine(msg.ToString()),

Check warning on line 18 in OrangeCabinet.Tests/TestSimpleAsync.cs

View workflow job for this annotation

GitHub Actions / .NET test

Dereference of a possibly null reference.

Check warning on line 18 in OrangeCabinet.Tests/TestSimpleAsync.cs

View workflow job for this annotation

GitHub Actions / .NET test

Dereference of a possibly null reference.
Raw = false
};
}

[Fact]
public async Task Test()
{
var serverBinder = new OcBinder(new SampleAsyncCallback())
{
BindPort = 8710,
};
var server = new OcLocal(serverBinder);
server.Start();
// server.WaitFor();

// -----
using var clientBinder = new OcBinder(new SampleAsyncCallback())
{
BindPort = 18710,
};
var client = new OcRemote(clientBinder, "127.0.0.1", 8710);
for (int j = 0; j < 3; j++)
{
await client.SendAsync($"{j}".OxToBytes());
}
// -----

// ...
Thread.Sleep(1000);
server.SendTo("hello from server", new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8710));
server.Shutdown();

OcLogger.Close();
}
}

public class SampleAsyncCallback : OcCallback
{
private const string Key = "inc";

public override bool UseAsyncCallback { get; init; } = true;

public override async Task IncomingAsync(OcRemote remote, byte[] message)
{
OcLogger.Info($"Received: {message.OxToString()} ({remote})");

int inc = remote.GetValue<int>(Key);
inc++;
remote.SetValue(Key, inc);

await remote.SendAsync($"{inc}".OxToBytes());
if (inc > 10)
{
remote.ClearValue(Key);
remote.Escape();
}
}

public override Task TimeoutAsync(OcRemote remote)
{
OcLogger.Info($"Timeout: {remote}");
return Task.CompletedTask;
}

public override Task ShutdownAsync(OcRemote remote)
{
OcLogger.Info($"Shutdown: {remote}");
return Task.CompletedTask;
}
}
}
17 changes: 14 additions & 3 deletions OrangeCabinet/OcBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ namespace OrangeCabinet;
/// </summary>
public class OcBinder : IDisposable
{
/// <summary>
/// Default timeout milli seconds.
/// </summary>
internal const int DefaultTimeoutMilliSeconds = 1500;

/// <summary>
/// Default ip v4 host.
/// </summary>
Expand All @@ -34,6 +39,9 @@ public class OcBinder : IDisposable
/// <param name="callback">callback</param>
public OcBinder(OcCallback callback)
{
if (OcCallback.ContainsAsync(callback))
throw new OcBinderException(
$"Disallow async override at {string.Join(',', OcCallback.SynchronousMethodNames.ToArray())} in {callback.GetType().FullName}, use 'xxxAsync' alternatively.");
Callback = callback;
}

Expand Down Expand Up @@ -140,16 +148,19 @@ internal void Bind(OcBindMode bindMode)
}

/// <summary>
/// Send bytes to remote.
/// Async send bytes to remote.
/// </summary>
/// <param name="message">message</param>
/// <param name="remoteEndpoint">remote endpoint</param>
/// <param name="timeout">timeout</param>
/// <exception cref="OcBinderException">bind exception</exception>
internal void SendTo(byte[] message, IPEndPoint remoteEndpoint)
internal async Task SendToAsync(byte[] message, IPEndPoint remoteEndpoint, int timeout = DefaultTimeoutMilliSeconds)
{
if (BindSocket == null)
throw new OcBinderException($"Not bind on {BindSocket}:{BindPort}");
BindSocket.SendTo(message, SocketFlags.None, remoteEndpoint);
var t = BindSocket.SendToAsync(message, SocketFlags.None, remoteEndpoint);
if (await Task.WhenAny(t, Task.Delay(timeout)) != t)
throw new OcBinderException($"Error send to {remoteEndpoint.OxToHostPort()} ({message.Length})");
}

/// <summary>
Expand Down
67 changes: 66 additions & 1 deletion OrangeCabinet/OcCallback.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,61 @@
using System.Reflection;
using System.Runtime.CompilerServices;

namespace OrangeCabinet;

/// <summary>
/// Callback.
/// </summary>
public abstract class OcCallback
{
/// <summary>
/// Synchronous method names.
/// </summary>
internal static readonly List<string> SynchronousMethodNames = new() {"Incoming", "Timeout", "Shutdown"};

/// <summary>
/// Use async callback.
/// </summary>
public virtual bool UseAsyncCallback { get; init; }

/// <summary>
/// Contains async.
/// </summary>
/// <param name="callback">callback</param>
/// <returns>if contains, return true</returns>
internal static bool ContainsAsync(OcCallback callback)
{
var attType = typeof(AsyncStateMachineAttribute);
foreach (var methodInfo in callback.GetType().GetMethods())
{
if (!SynchronousMethodNames.Contains(methodInfo.Name)) continue;

var attrib = methodInfo.GetCustomAttribute(attType);
if (attrib != null) return true;
}

return false;
}

/// <summary>
/// Incoming.
/// </summary>
/// <param name="remote">received remote</param>
/// <param name="message">message</param>
public abstract void Incoming(OcRemote remote, byte[] message);
public virtual void Incoming(OcRemote remote, byte[] message)
{
}

/// <summary>
/// Async incoming.
/// </summary>
/// <param name="remote">received remote</param>
/// <param name="message">message</param>
/// <returns>task</returns>
public virtual Task IncomingAsync(OcRemote remote, byte[] message)
{
return Task.CompletedTask;
}

/// <summary>
/// Timeout.
Expand All @@ -20,11 +65,31 @@ public virtual void Timeout(OcRemote remote)
{
}

/// <summary>
/// Async timeout.
/// </summary>
/// <param name="remote">be timeout remote</param>
/// <returns>task</returns>
public virtual Task TimeoutAsync(OcRemote remote)
{
return Task.CompletedTask;
}

/// <summary>
/// Shutdown.
/// </summary>
/// <param name="remote">be shutdown remote</param>
public virtual void Shutdown(OcRemote remote)
{
}

/// <summary>
/// Shutdown.
/// </summary>
/// <param name="remote">be shutdown remote</param>
/// <returns>task</returns>
public virtual Task ShutdownAsync(OcRemote remote)
{
return Task.CompletedTask;
}
}
37 changes: 27 additions & 10 deletions OrangeCabinet/OcHandlerReceive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,36 +123,53 @@ internal override void Complete(IAsyncResult result)
return;
}

EndPoint? remoteEndpoint;
int received;
try
{
// received
EndPoint remoteEndpoint = new IPEndPoint(IPAddress.Any, 0);
var received = state!.Socket.EndReceiveFrom(result, ref remoteEndpoint);
remoteEndpoint = new IPEndPoint(IPAddress.Any, 0);
received = state!.Socket.EndReceiveFrom(result, ref remoteEndpoint);
if (received <= 0)
{
OcLogger.Debug(() => $"Received wrong size: {received}");
return;
}
}
catch (Exception e)
{
OcLogger.Debug(() => e);
Failed(state!);
return;
}

var remote = _remoteManager.Generate((IPEndPoint) remoteEndpoint);
// callback
var taskReceive = Task.Run(async () =>
{
var remote = await _remoteManager.GenerateAsync((IPEndPoint) remoteEndpoint);
OcLogger.Debug(() => $"Received remote: {remote}, size: {received}");
lock (remote)
using (await remote.Lock.LockAsync())
{
// if remote is active and not timeout, invoke incoming
if (remote.Active && !remote.IsTimeout())
{
var message = new byte[received];
Buffer.BlockCopy(state.Buffer!, 0, message, 0, message.Length);
remote.UpdateTimeout();
_callback.Incoming(remote, message);
if (_callback.UseAsyncCallback)
await _callback.IncomingAsync(remote, message);
else
// ReSharper disable once MethodHasAsyncOverload
_callback.Incoming(remote, message);
}
}
}
catch (Exception e)
});
taskReceive.ContinueWith(comp =>
{
OcLogger.Debug(() => e);
Failed(state!);
}
if (comp.Exception is not { } e) return;
OcLogger.Debug(() => e.InnerExceptions);
Failed(state);
});
}

/// <summary>
Expand Down
64 changes: 60 additions & 4 deletions OrangeCabinet/OcLocal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,26 @@ public void Start()
/// </summary>
/// <param name="message">message</param>
/// <param name="remoteEndpoint">remote endpoint</param>
public void SendTo(string message, IPEndPoint remoteEndpoint)
/// <param name="timeout">timeout</param>
/// <exception cref="OcLocalSendException">send error</exception>
public void SendTo(string message, IPEndPoint remoteEndpoint, int timeout = OcBinder.DefaultTimeoutMilliSeconds)
{
_binder.SendTo(message.OxToBytes(), remoteEndpoint);
SendTo(message.OxToBytes(), remoteEndpoint, timeout);
}

/// <summary>
/// Async send string to remote.
/// Enable to send message for some endpoint directly what you hope.
/// Notice, this method is not checked for remote endpoint state.
/// </summary>
/// <param name="message">message</param>
/// <param name="remoteEndpoint">remote endpoint</param>
/// <param name="timeout">timeout</param>
/// <exception cref="OcLocalSendException">send error</exception>
public async Task SendToAsync(string message, IPEndPoint remoteEndpoint,
int timeout = OcBinder.DefaultTimeoutMilliSeconds)
{
await SendToAsync(message.OxToBytes(), remoteEndpoint, timeout);
}

/// <summary>
Expand All @@ -48,9 +65,34 @@ public void SendTo(string message, IPEndPoint remoteEndpoint)
/// </summary>
/// <param name="message">message</param>
/// <param name="remoteEndpoint">remote endpoint</param>
public void SendTo(byte[] message, IPEndPoint remoteEndpoint)
/// <param name="timeout">timeout</param>
/// <exception cref="OcLocalSendException">send error</exception>
public void SendTo(byte[] message, IPEndPoint remoteEndpoint, int timeout = OcBinder.DefaultTimeoutMilliSeconds)
{
_binder.SendTo(message, remoteEndpoint);
SendToAsync(message, remoteEndpoint, timeout).ConfigureAwait(false).GetAwaiter().GetResult();
}

/// <summary>
/// Async send bytes to remote.
/// Enable to send message for some endpoint directly what you hope.
/// Notice, this method is not checked for remote endpoint state.
/// </summary>
/// <param name="message">message</param>
/// <param name="remoteEndpoint">remote endpoint</param>
/// <param name="timeout">timeout</param>
/// <exception cref="OcLocalSendException">send error</exception>
public async Task SendToAsync(byte[] message, IPEndPoint remoteEndpoint,
int timeout = OcBinder.DefaultTimeoutMilliSeconds)
{
try
{
await _binder.SendToAsync(message, remoteEndpoint, timeout);
}
catch (Exception e)
{
OcLogger.Error(e);
throw new OcLocalSendException(e);
}
}

/// <summary>
Expand All @@ -68,4 +110,18 @@ public void Shutdown()
{
_binder.Close();
}
}

/// <summary>
/// Local send exception.
/// </summary>
public class OcLocalSendException : Exception
{
/// <summary>
/// Constructor.
/// </summary>
/// <param name="e">exception</param>
internal OcLocalSendException(Exception e) : base(e.ToString())
{
}
}
Loading

0 comments on commit bc4ebd9

Please sign in to comment.