Skip to content

Commit

Permalink
Split server.zig into multiple files for clarity
Browse files Browse the repository at this point in the history
Not really completely clean, but it will do

Signed-off-by: emneo <emneo@kreog.com>
  • Loading branch information
emneo-dev committed Sep 11, 2024
1 parent 18b5071 commit fb5848f
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 79 deletions.
85 changes: 85 additions & 0 deletions src/client.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const std = @import("std");

const net = @import("network");
const logz = @import("logz");

const Message = @import("message.zig").Message;

pub const Client = struct {
const Self = @This();

sock: net.Socket,
msg: std.ArrayList(Message),
internal_rbuffer: std.ArrayList(u8),
internal_wbuffer: std.ArrayList(u8),
stopping: bool = false,

pub fn init(allocator: std.mem.Allocator, sock: net.Socket) Client {
return .{
.sock = sock,
.msg = std.ArrayList(Message).init(allocator),
.internal_rbuffer = std.ArrayList(u8).init(allocator),
.internal_wbuffer = std.ArrayList(u8).init(allocator),
};
}

pub fn create_messages(self: *Self, allocator: std.mem.Allocator) !void {
while (std.mem.indexOfPos(
u8,
self.internal_rbuffer.items,
0,
"\n",
)) |i| {
const msg_slice = self.internal_rbuffer.items[0..i];
try self.msg.append(try Message.init(msg_slice, allocator));
logz.debug().ctx("New command").string("data", msg_slice).log();
const src = self.internal_rbuffer.items[i + 1 ..];
const dest = self.internal_rbuffer.items;
std.mem.copyForwards(u8, dest, src);
self.internal_rbuffer.shrinkRetainingCapacity(src.len);
}
}

pub fn handle_net_event(self: *Self, set: *const net.SocketSet, allocator: std.mem.Allocator) !void {
if (set.isFaulted(self.sock)) {
self.stopping = true;
return;
}
if (set.isReadyRead(self.sock)) {
var tmp_rbuf: [4096]u8 = undefined;
const nb_bytes = try self.sock.receive(&tmp_rbuf);
if (nb_bytes == 0) {
self.stopping = true;
return;
}
try self.internal_rbuffer.appendSlice(tmp_rbuf[0..nb_bytes]);
try self.create_messages(allocator);
}
if (set.isReadyWrite(self.sock)) {
const nb_bytes = try self.sock.send(self.internal_wbuffer.items);
if (nb_bytes == self.internal_wbuffer.items.len) {
self.internal_wbuffer.clearRetainingCapacity();
} else {
const src = self.internal_wbuffer.items[nb_bytes..];
const dest = self.internal_wbuffer.items;
std.mem.copyForwards(u8, dest, src);
self.internal_wbuffer.shrinkRetainingCapacity(src.len);
}
}
}

pub fn wants_to_write(self: *const Self) bool {
return self.internal_wbuffer.items.len > 0;
}

pub fn wants_to_read(self: *const Self) bool {
return self.stopping == false;
}

pub fn deinit(self: *const Self) void {
self.msg.deinit();
self.internal_rbuffer.deinit();
self.internal_wbuffer.deinit();
self.sock.close();
}
};
24 changes: 24 additions & 0 deletions src/message.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
const std = @import("std");

pub const Message = struct {
const Self = @This();

// Raw message as received from a client
data: []const u8,
// Time in microseconds when the message has been received
timestamp: i64,
// Allocator used to keep a copy of the original message
allocator: std.mem.Allocator,

pub fn init(data: []const u8, allocator: std.mem.Allocator) !Message {
return .{
.data = try allocator.dupe(u8, data),
.timestamp = std.time.microTimestamp(),
.allocator = allocator,
};
}

pub fn deinit(self: *const Self) void {
self.allocator.free(self.data);
}
};
80 changes: 1 addition & 79 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const build_config = @import("build_config");

const utils = @import("utils.zig");
const config = @import("config.zig");
const Client = @import("client.zig").Client;

pub const Message = struct {
const Self = @This();
Expand All @@ -33,85 +34,6 @@ pub const Message = struct {
}
};

pub const Client = struct {
const Self = @This();

sock: net.Socket,
msg: std.ArrayList(Message),
internal_rbuffer: std.ArrayList(u8),
internal_wbuffer: std.ArrayList(u8),
stopping: bool = false,

fn init(allocator: std.mem.Allocator, sock: net.Socket) Client {
return .{
.sock = sock,
.msg = std.ArrayList(Message).init(allocator),
.internal_rbuffer = std.ArrayList(u8).init(allocator),
.internal_wbuffer = std.ArrayList(u8).init(allocator),
};
}

fn create_messages(self: *Self, allocator: std.mem.Allocator) !void {
while (std.mem.indexOfPos(
u8,
self.internal_rbuffer.items,
0,
"\n",
)) |i| {
const msg_slice = self.internal_rbuffer.items[0..i];
try self.msg.append(try Message.init(msg_slice, allocator));
logz.debug().ctx("New command").string("data", msg_slice).log();
const src = self.internal_rbuffer.items[i + 1 ..];
const dest = self.internal_rbuffer.items;
std.mem.copyForwards(u8, dest, src);
self.internal_rbuffer.shrinkRetainingCapacity(src.len);
}
}

fn handle_net_event(self: *Self, set: *const net.SocketSet, allocator: std.mem.Allocator) !void {
if (set.isFaulted(self.sock)) {
self.stopping = true;
return;
}
if (set.isReadyRead(self.sock)) {
var tmp_rbuf: [4096]u8 = undefined;
const nb_bytes = try self.sock.receive(&tmp_rbuf);
if (nb_bytes == 0) {
self.stopping = true;
return;
}
try self.internal_rbuffer.appendSlice(tmp_rbuf[0..nb_bytes]);
try self.create_messages(allocator);
}
if (set.isReadyWrite(self.sock)) {
const nb_bytes = try self.sock.send(self.internal_wbuffer.items);
if (nb_bytes == self.internal_wbuffer.items.len) {
self.internal_wbuffer.clearRetainingCapacity();
} else {
const src = self.internal_wbuffer.items[nb_bytes..];
const dest = self.internal_wbuffer.items;
std.mem.copyForwards(u8, dest, src);
self.internal_wbuffer.shrinkRetainingCapacity(src.len);
}
}
}

fn wants_to_write(self: *const Self) bool {
return self.internal_wbuffer.items.len > 0;
}

fn wants_to_read(self: *const Self) bool {
return self.stopping == false;
}

fn deinit(self: *const Self) void {
self.msg.deinit();
self.internal_rbuffer.deinit();
self.internal_wbuffer.deinit();
self.sock.close();
}
};

pub const Context = struct {
const Self = @This();

Expand Down

0 comments on commit fb5848f

Please sign in to comment.