Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull Request] Crequency/KitX#278 #8

Merged
merged 5 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions KitX.Loader.CSharp/ArgsParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,26 @@ public class ArgsParser
public static void Parse(string[] args)
{
Parser.Default.ParseArguments<Options>(args)
.WithParsed(option =>
.WithParsed(async option =>
{
if (option.PluginPath is null)
return;

if (option.WorkingDirectory is not null)
Directory.SetCurrentDirectory(option.WorkingDirectory);

var communicationManager = new CommunicationManager();

if (option.DashboardIpAddress is not null)
communicationManager = communicationManager
.Connect(option.DashboardIpAddress ?? "");
if (option.ConnectUrl is not null)
communicationManager = await communicationManager.Connect(option.ConnectUrl);
else communicationManager = null;

var pluginManager = new PluginManager()
.OnSendMessage(x => communicationManager?.SendMessage(x))
.LoadPlugin(option.PluginPath ?? "");
.OnSendMessage(x => communicationManager?.SendMessageAsync(x))
.LoadPlugin(option.PluginPath);

if (communicationManager is not null)
communicationManager.OnReceiveMessage = x => pluginManager.ReceiveMessage(x);
});
}
}
142 changes: 70 additions & 72 deletions KitX.Loader.CSharp/CommunicationManager.cs
Original file line number Diff line number Diff line change
@@ -1,116 +1,108 @@
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;

namespace KitX.Loader.CSharp;

public class CommunicationManager
{
private readonly TcpClient? client;

private readonly Thread? receiveThread;

private bool stillReceiving = true;
private readonly ClientWebSocket? Client;

private int receiveBufferSize = 1024 * 1024 * 10; // 10MB

public Action<string>? OnReceiveMessage { get; set; }

public CommunicationManager()
{
client = new();
Client = new();

receiveThread = new(ReceiveMessage);
Client.Options.KeepAliveInterval = TimeSpan.FromSeconds(10);
}

public CommunicationManager Connect(string address)
public async Task<CommunicationManager> Connect(string? url)
{
var splited = address.Split(':');
ArgumentNullException.ThrowIfNull(url, nameof(url));

var ipv4 = splited[0];
ArgumentNullException.ThrowIfNull(Client, nameof(Client));

if (!int.TryParse(splited[1], out var port))
throw new ArgumentException("Bad port number.", nameof(address));
await Client.ConnectAsync(new Uri(url), CancellationToken.None);

client?.Connect(ipv4, port);
var waiting = true;

receiveThread?.Start();
while (waiting)
{
switch (Client.State)
{
case WebSocketState.None:
waiting = false;
break;
case WebSocketState.Connecting:
break;
case WebSocketState.Open:
new Thread(async () => await ReceiveAsync()).Start();
waiting = false;
break;
case WebSocketState.CloseSent:
waiting = false;
break;
case WebSocketState.CloseReceived:
waiting = false;
break;
case WebSocketState.Closed:
waiting = false;
break;
case WebSocketState.Aborted:
waiting = false;
break;
}
}

return this;
}

public CommunicationManager SendMessage(string message)
public async Task<CommunicationManager> SendMessageAsync(string message)
{
var stream = client?.GetStream();

if (stream is null) return this;
ArgumentNullException.ThrowIfNull(Client, nameof(Client));

var data = Encoding.UTF8.GetBytes(message);

try
{
stream?.Write(data, 0, data.Length);
var bufferToSend = new ArraySegment<byte>(data);

stream?.Flush();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);

Console.WriteLine(ex.StackTrace);

stream?.Close();

stream?.Dispose();
}
await Client.SendAsync(bufferToSend, WebSocketMessageType.Text, true, CancellationToken.None);

return this;
}

private void ReceiveMessage()
private async Task ReceiveAsync()
{
if (client is null) return;

var stream = client?.GetStream();
ArgumentNullException.ThrowIfNull(Client, nameof(Client));

if (stream is null) return;
var buffer = new byte[receiveBufferSize];

var buffer = new byte[receiveBufferSize]; // Default 10 MB buffer

try
while (true)
{
while (stillReceiving)
{
var receivedBuffer = new ArraySegment<byte>(buffer);

if (buffer is null) break;
var result = await Client.ReceiveAsync(
receivedBuffer,
CancellationToken.None
);

var length = stream.Read(buffer, 0, buffer.Length);

if (length > 0)
{
var msg = Encoding.UTF8.GetString(buffer, 0, length);

//ToDo: Process `msg`
}
else
{
stream?.Dispose();
if (result.MessageType == WebSocketMessageType.Close)
{
await Client.CloseAsync(
WebSocketCloseStatus.NormalClosure,
string.Empty,
CancellationToken.None
);

break;
}
break;
}

stream?.Close();
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);

stream?.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
Console.WriteLine(e.StackTrace);
OnReceiveMessage?.Invoke(message);

stream?.Close();
stream?.Dispose();

client?.Close();
client?.Dispose();
if (!result.EndOfMessage) continue;
}
}

Expand All @@ -121,9 +113,15 @@ public CommunicationManager SetBufferSize(int size)
return this;
}

public CommunicationManager Stop()
public async Task<CommunicationManager> Close()
{
stillReceiving = false;
ArgumentNullException.ThrowIfNull(Client, nameof(Client));

await Client.CloseAsync(
WebSocketCloseStatus.NormalClosure,
string.Empty,
CancellationToken.None
);

return this;
}
Expand Down
7 changes: 3 additions & 4 deletions KitX.Loader.CSharp/KitX.Loader.CSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.9.1" />
<PackageReference Include="KitX.Contract.CSharp" Version="23.4.6543.429" />
<PackageReference Include="KitX.Web.Rules" Version="23.4.6543.429" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\KitX Contracts\KitX.Contract.CSharp\KitX.Contract.CSharp.csproj" />
<ProjectReference Include="..\..\KitX Rules\KitX.Web.Rules\KitX.Web.Rules.csproj" />
<ProjectReference Include="..\..\..\KitX Standard\KitX Contracts\KitX.Contract.CSharp\KitX.Contract.CSharp.csproj" />
<ProjectReference Include="..\..\..\KitX Standard\KitX.Shared\KitX.Shared.csproj" />
</ItemGroup>

</Project>
7 changes: 5 additions & 2 deletions KitX.Loader.CSharp/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ public class Options
[Option('l', "load", Required = true, HelpText = "Path to plugin.")]
public string? PluginPath { get; set; }

[Option('c', "connect", Required = false, HelpText = "Dashboard IPv4 address.")]
public string? DashboardIpAddress { get; set; }
[Option('c', "connect", Required = false, HelpText = "Connect url.")]
public string? ConnectUrl { get; set; }

[Option("working-directory", Required = false, HelpText = "Set working directory.")]
public string? WorkingDirectory { get; set; }
}
Loading
Loading