Skip to content

Commit

Permalink
Add base network impl
Browse files Browse the repository at this point in the history
Just does a ping pong for now

Signed-off-by: emneo <emneo@kreog.com>
  • Loading branch information
emneo-dev committed Sep 11, 2024
1 parent 99a6592 commit 6112aec
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 44 deletions.
2 changes: 2 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ const build_options = struct {
fn add_options_to_bin(b: *std.Build, bin: *std.Build.Step.Compile, target: std.Build.ResolvedTarget, optimize: std.builtin.OptimizeMode, opt: build_options) void {
const ini_pkg = b.dependency("ini", .{ .target = target, .optimize = optimize });
const logz_pkg = b.dependency("logz", .{ .target = target, .optimize = optimize });
const net_pkg = b.dependency("network", .{ .target = target, .optimize = optimize });

const options = b.addOptions();
options.addOption([]const u8, "version", opt.version);

bin.root_module.addOptions("build_config", options);
bin.root_module.addImport("ini", ini_pkg.module("ini"));
bin.root_module.addImport("logz", logz_pkg.module("logz"));
bin.root_module.addImport("network", net_pkg.module("network"));
}

fn configure_tests(b: *std.Build, opt: build_options, target: std.Build.ResolvedTarget, optimize: std.builtin.OptimizeMode) *std.Build.Step.Compile {
Expand Down
4 changes: 4 additions & 0 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
.url = "https://github.com/karlseguin/log.zig/archive/97d0818270a357d3ce7bc878abc32dc996fe78ff.tar.gz",
.hash = "1220c032d828e768348b321a79523812be1c4cb27e1eb7ff5df54b573fa131afd448",
},
.network = .{
.url = "https://github.com/ikskuh/zig-network/archive/bcf6cc8918d574f947b2647522d84a805c33f1c8.tar.gz",
.hash = "12203b3634f36570ffced059248bd429cd06dc89a98659c86569d890fdd99b5070bd",
},
},
.paths = .{
"build.zig",
Expand Down
7 changes: 7 additions & 0 deletions build.zig.zon.nix
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ linkFarm "zig-packages" [
hash = "sha256-JN+s2VI6zKotfJ4sDbGrjSBz3JAVrcZk3npkXUNOMho=";
};
}
{
name = "12203b3634f36570ffced059248bd429cd06dc89a98659c86569d890fdd99b5070bd";
path = fetchzip {
url = "https://github.com/ikskuh/zig-network/archive/bcf6cc8918d574f947b2647522d84a805c33f1c8.tar.gz";
hash = "sha256-QtlJwMQ9GZTuxdCCsCMbOudxdRYYdeMe8WL+p8GDLOU=";
};
}
{
name = "1220b0979ea9891fa4aeb85748fc42bc4b24039d9c99a4d65d893fb1c83e921efad8";
path = fetchzip {
Expand Down
3 changes: 2 additions & 1 deletion config.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[network]
bind=0.0.0.0:2105
ip=0.0.0.0
port=2105
46 changes: 6 additions & 40 deletions src/config.zig
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
const ini = @import("ini");
const std = @import("std");

const ini = @import("ini");
const net = @import("network");

// Add to this structure to automatically add to the config
pub const config = struct {
network_bind: std.net.Address,
network_ip: net.Address,
network_port: u16,
};

pub const ConfigError = error{
Expand Down Expand Up @@ -80,22 +83,13 @@ fn map_opt_struct_to_struct(opt_stc: type, stc: type, from: *const opt_stc) stc
return to;
}

// TODO: Handle ipv6
fn parse_full_ip(ip: []const u8) !std.net.Address {
const colon = std.mem.indexOf(u8, ip, ":") orelse return ConfigError.BadKey;
const port_slice = ip[colon + 1 ..];
const port = try std.fmt.parseInt(u16, port_slice, 10);
const ip_slice = ip[0..colon];
return try std.net.Address.parseIp(ip_slice, port);
}

fn map_value(kv: ini_field, conf: *tmp_config) !void {
inline for (std.meta.fields(@TypeOf(conf.*))) |f| {
if (std.mem.eql(u8, f.name, kv.full_name)) {
const target_type = @typeInfo(f.type).Optional.child;
@field(conf, f.name) = switch (target_type) {
u16, u32, u64, i16, i32, i64 => try std.fmt.parseInt(target_type, kv.value, 0),
std.net.Address => try parse_full_ip(kv.value),
net.Address => try net.Address.parse(kv.value),
else => @panic("You probably need to implement another type :3"),
};
return;
Expand Down Expand Up @@ -139,34 +133,6 @@ pub fn parse(filepath: []const u8, allocator: std.mem.Allocator) !config {
return map_opt_struct_to_struct(tmp_config, config, &tmp);
}

test "ipv4 loopback default port" {
const parsed = try parse_full_ip("127.0.0.1:2105");
try std.testing.expectEqual(parsed.getPort(), 2105);
try std.testing.expectEqual(parsed.in.sa.addr, 0x100007f);
}

test {
std.testing.refAllDecls(@This());
}

test "ipv4 loopback random port" {
const parsed = try parse_full_ip("127.0.0.1:0");
try std.testing.expectEqual(parsed.getPort(), 0);
try std.testing.expectEqual(parsed.in.sa.addr, 0x100007f);
}

test "ipv4 wildcard default port" {
const parsed = try parse_full_ip("0.0.0.0:2105");
try std.testing.expectEqual(parsed.getPort(), 2105);
try std.testing.expectEqual(parsed.in.sa.addr, 0);
}

test "ipv4 invalid no port" {
const parsed = parse_full_ip("127.0.0.1");
try std.testing.expectError(error.BadKey, parsed);
}

test "ipv4 invalid no ip" {
const parsed = parse_full_ip("2105");
try std.testing.expectError(error.BadKey, parsed);
}
7 changes: 4 additions & 3 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ const logz = @import("logz");
const build_config = @import("build_config");

const config = @import("config.zig");
const server = @import("server.zig");

const semver = std.SemanticVersion.parse(build_config.version) catch @compileError("Given version is not valid semver");
pub const semver = std.SemanticVersion.parse(build_config.version) catch @compileError("Given version is not valid semver");

const is_debug_build = builtin.mode == std.builtin.OptimizeMode.Debug;
pub const is_debug_build = builtin.mode == std.builtin.OptimizeMode.Debug;

pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
Expand All @@ -29,7 +30,7 @@ pub fn main() !void {
logz.info().ctx("Launching liskvork").stringSafe("version", build_config.version).log();

const conf = try config.parse("config.ini", allocator);
_ = conf;
try server.launch_server(&conf, allocator);
}

test {
Expand Down
170 changes: 170 additions & 0 deletions src/server.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
const builtin = @import("builtin");

const std = @import("std");

const net = @import("network");

const config = @import("config.zig");

pub const Message = struct {
const Self = @This();

// Raw message as received from a client
data: []const u8,
// Time in microseconds when the message has been received
timestamp: i64,
// Allocator used to keep a copy of the original message
allocator: std.mem.Allocator,

pub fn init(data: []const u8, allocator: std.mem.Allocator) !Message {
return .{
.data = try allocator.dupe(u8, data),
.timestamp = std.time.microTimestamp(),
};
}

pub fn deinit(self: *const Self) void {
self.allocator.free(self.data);
}
};

pub const Client = struct {
const Self = @This();

sock: net.Socket,
msg: std.ArrayList(Message),
internal_rbuffer: std.ArrayList(u8),
internal_wbuffer: std.ArrayList(u8),
stopping: bool = false,

fn init(allocator: std.mem.Allocator, sock: net.Socket) Client {
return .{
.sock = sock,
.msg = std.ArrayList(Message).init(allocator),
.internal_rbuffer = std.ArrayList(u8).init(allocator),
.internal_wbuffer = std.ArrayList(u8).init(allocator),
};
}

fn handle_event(self: *Self, set: *const net.SocketSet) !void {
if (set.isFaulted(self.sock)) {
self.stopping = true;
return;
}
if (set.isReadyRead(self.sock)) {
var tmp_rbuf: [4096]u8 = undefined;
const nb_bytes = try self.sock.receive(&tmp_rbuf);
if (nb_bytes == 0) {
self.stopping = true;
return;
}
try self.internal_wbuffer.appendSlice(tmp_rbuf[0..nb_bytes]);
}
if (set.isReadyWrite(self.sock)) {
const nb_bytes = try self.sock.send(self.internal_wbuffer.items);
if (nb_bytes == self.internal_wbuffer.items.len) {
self.internal_wbuffer.clearRetainingCapacity();
} else {
const src = self.internal_wbuffer.items[nb_bytes..];
const dest = self.internal_wbuffer.items;
std.mem.copyForwards(u8, dest, src);
}
}
}

fn wants_to_write(self: *const Self) bool {
return self.internal_wbuffer.items.len > 0;
}

fn wants_to_read(self: *const Self) bool {
return self.stopping == false;
}

fn deinit(self: *const Self) void {
self.msg.deinit();
self.internal_rbuffer.deinit();
self.internal_wbuffer.deinit();
self.sock.close();
}
};

pub const Context = struct {
const Self = @This();

srv_sock: net.Socket,
clients: std.ArrayList(Client),
conf: *const config.config,

pub fn init(allocator: std.mem.Allocator, conf: *const config.config, is_ipv6: bool) !Context {
return .{
.clients = std.ArrayList(Client).init(allocator),
.conf = conf,
.srv_sock = try net.Socket.create(
if (is_ipv6) net.AddressFamily.ipv6 else net.AddressFamily.ipv4,
net.Protocol.tcp,
),
};
}

pub fn deinit(self: *const Self) void {
for (self.clients.items) |c| {
c.deinit();
}
self.clients.deinit();
}
};

fn setup_socket_set(ctx: *const Context, set: *net.SocketSet) !void {
set.clear();
try set.add(ctx.srv_sock, .{ .read = true, .write = false });
for (ctx.clients.items) |*cli|
try set.add(cli.sock, .{
.read = cli.wants_to_read(),
.write = cli.wants_to_write(),
});
}

pub fn launch_server(conf: *const config.config, allocator: std.mem.Allocator) !void {
const is_ipv6: bool = switch (conf.network_ip) {
.ipv4 => false,
.ipv6 => true,
};
var ctx = try Context.init(allocator, conf, is_ipv6);
defer ctx.deinit();

try ctx.srv_sock.enablePortReuse(true);
try ctx.srv_sock.bind(.{
.address = conf.network_ip,
.port = conf.network_port,
});
try ctx.srv_sock.listen();
var set = try net.SocketSet.init(allocator);
defer set.deinit();

while (true) {
try setup_socket_set(&ctx, &set);
const evt_return = try net.waitForSocketEvent(&set, null);
const has_timeout_been_reached = evt_return == 0;
_ = has_timeout_been_reached;
for (ctx.clients.items) |*cli|
try cli.handle_event(&set);
if (set.isReadyRead(ctx.srv_sock)) {
// Accept new connection
const new_sock = try ctx.srv_sock.accept();
try ctx.clients.append(Client.init(allocator, new_sock));
}
// Cleanup stopping clients
var i: u32 = 0;
while (i < ctx.clients.items.len) {
if (ctx.clients.items[i].stopping) {
ctx.clients.swapRemove(i).deinit();
continue;
}
i += 1;
}
}
}

test {
std.testing.refAllDecls(@This());
}

0 comments on commit 6112aec

Please sign in to comment.