BatRPC

This is an RPC framework for OCaml that uses protobuf as its wire format.

Overview

The goal of BatRPC is to provide an efficient and flexible RPC system for our needs at Imandra. It is designed for long-lived connections between two processes, each of which can act as both a server and a client.

Protobuf is used as an IDL to describe the types used for communication, as well as the actual RPC endpoints. Protobuf also generates (de)serialization code for these types, and bundles for the services.

Multiple services can be provided on a single connection, as long as they have distinct names.
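
To illustrate why distinct names matter, here is a minimal sketch of name-based routing using only the OCaml standard library. It is not BatRPC's implementation: the `service` record, `registry`, `register`, and `dispatch` are all hypothetical names invented for this example, and payloads are plain strings rather than protobuf messages.

```ocaml
(* Hypothetical sketch: services stored in a table keyed by name.
   None of these types or functions come from BatRPC. *)
type service = {
  name : string;
  (* takes a method name and a raw payload, returns a raw reply if any *)
  handle : meth:string -> string -> string option;
}

let registry : (string, service) Hashtbl.t = Hashtbl.create 8

(* Register a service; refuse a second service with the same name,
   since calls could no longer be routed unambiguously. *)
let register (s : service) : (unit, string) result =
  if Hashtbl.mem registry s.name then
    Error (Printf.sprintf "duplicate service name %S" s.name)
  else (
    Hashtbl.add registry s.name s;
    Ok ())

(* Route a call to the service registered under [service]. *)
let dispatch ~service ~meth payload : string option =
  match Hashtbl.find_opt registry service with
  | None -> None
  | Some s -> s.handle ~meth payload

(* A toy service with a single "echo" method. *)
let swapper =
  {
    name = "Swapper";
    handle =
      (fun ~meth payload ->
        match meth with
        | "echo" -> Some payload
        | _ -> None);
  }

let () =
  match register swapper with
  | Ok () -> print_endline "registered Swapper"
  | Error msg -> prerr_endline msg
```

Registering a second service named "Swapper" returns an `Error` in this sketch; whether BatRPC rejects, overrides, or otherwise handles a name clash is not stated here.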

Features

  • auto-generation of types, services, and (de)serialization using ocaml-protoc
  • basic per-message compression for large messages, using deflate. Stream-level compression is not supported by BatRPC, but could be implemented transparently: a Client.t or Server.For_client.t takes a pair of input/output byte streams which could be compressed or encrypted. The types from iostream are used to abstract over the byte streams.
  • messages carry headers, i.e. pairs of strings, much like HTTP headers.
  • middlewares on the server side. A middleware can take an incoming request and its future reply, and insert metadata in headers, perform logging, tracing, etc.
  • baked-in concurrency using moonpool as a thread pool and future library.
  • kinds of requests:
    • simple request/response
    • client-side streaming (the client sends a stream of values)
    • server-side streaming (the server returns a stream of values)
    • bidirectional streaming
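
The headers and middleware features can be pictured with a small, self-contained sketch. It assumes a simplified synchronous model in which a middleware wraps a handler and returns a new handler; BatRPC's real middlewares work with the future reply and have their own types, so every name below (`request`, `response`, `middleware`, `add_header`, `log_size`, `apply`) is hypothetical.

```ocaml
(* Hypothetical types for illustration only; not BatRPC's API. *)
type headers = (string * string) list
type request = { headers : headers; body : string }
type response = { r_headers : headers; r_body : string }
type handler = request -> response
type middleware = handler -> handler

(* Middleware that adds one header to every reply. *)
let add_header (k : string) (v : string) : middleware =
 fun next req ->
  let resp = next req in
  { resp with r_headers = (k, v) :: resp.r_headers }

(* Middleware that records the size of each request body in [log]
   before passing the request on. *)
let log_size (log : string list ref) : middleware =
 fun next req ->
  log := Printf.sprintf "request of %d bytes" (String.length req.body) :: !log;
  next req

(* Compose middlewares; the first one in the list is the outermost. *)
let apply (ms : middleware list) (h : handler) : handler =
  List.fold_right (fun m acc -> m acc) ms h

(* A trivial handler that echoes the request body. *)
let echo : handler = fun req -> { r_headers = []; r_body = req.body }

let log : string list ref = ref []
let wrapped = apply [ log_size log; add_header "x-served-by" "demo" ] echo
let reply = wrapped { headers = []; body = "hello" }
```

Here `reply` carries the echoed body plus the extra header, and `log` holds one entry for the request. Modelling a middleware as a function from handler to handler keeps logging, tracing, and header injection out of the service logic itself.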

Example

A basic example, fully worked out

The code is in examples/trivial.

Given this file (see examples/trivial/trivial.proto):

message Pair {
  string x = 1;
  string y = 2;
}

message BigString {
  string msg = 1;
}

message Count {
  int32 count = 1;
}

message SingleInt {
  int32 i = 1;
}

service Swapper {
  rpc swap(Pair) returns (Pair);
  rpc count_chars(BigString) returns (Count);
}

and this dune rule:

(rule
 (targets trivial.ml trivial.mli)
 (deps trivial.proto)
 (mode promote)
 (action
  (run ocaml-protoc --binary --pp --yojson --services --make --ml_out ./ %{deps})))

We get files trivial.ml and trivial.mli. The signature generated from this is, roughly:

type pair = {
  x : string;
  y : string;
}

type big_string = {
  msg : string;
}

type count = {
  count : int32;
}

type single_int = {
  i : int32;
}

val pp_pair : Format.formatter -> pair -> unit

val encode_pb_pair : pair -> Pbrt.Encoder.t -> unit

val decode_pb_pair : Pbrt.Decoder.t -> pair


(** Swapper service *)
module Swapper : sig
  open Pbrt_services
  open Pbrt_services.Value_mode
  
  module Client : sig
    
    val swap : (pair, unary, pair, unary) Client.rpc
    
    val count_chars : (big_string, unary, count, unary) Client.rpc
  end
  
  module Server : sig
    (** Produce a server implementation from handlers *)
    val make : 
      swap:((pair, unary, pair, unary) Server.rpc -> 'handler) ->
      count_chars:((big_string, unary, count, unary) Server.rpc -> 'handler) ->
      unit -> 'handler Pbrt_services.Server.t
  end
end

We can then use the batrpc library together with this generated code to implement RPC clients and servers. Here "client" and "server" really mean "network client" and "network server" (i.e. clients are the ones opening connections to servers); from the RPC point of view, once the connection is established, both ends act as both client and server, in the sense that each can provide services and send requests to the other's services.

Client side

Let's write a TCP client.

let (let@) = (@@)
let port = 12345

module RPC = Batrpc
module Client = RPC.Basic_client
module Fut = Moonpool.Fut

let () =
  let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
  let timer = RPC.Simple_timer.create () in

  Printf.printf "connecting...\n%!";
  let client : Client.t =
    RPC.Tcp_client.connect ~timer addr |> RPC.Error.unwrap
  in
  let@ () = Fun.protect ~finally:(fun () -> Client.close_and_join client) in

  let pair = Trivial.make_pair ~x:"hello" ~y:"world" () in
  Format.printf "pair: %a@." Trivial.pp_pair pair;

  let fut_pair_swapped : Trivial.pair Moonpool.Fut.t =
    Client.call client ~timeout_s:2. Trivial.Swapper.Client.swap pair
  in

  (* the request is in-flight, we can do other things here … *)

  (* now wait for the result *)
  let pair_swapped = Fut.wait_block_exn fut_pair_swapped in
  Format.printf "swapped pair: %a@." Trivial.pp_pair pair_swapped;
  ()

Server side

let ( let@ ) = ( @@ )
let port = 12345

module RPC = Batrpc
module Fut = Moonpool.Fut

(* this is where we implement the actual logic for the services *)

let trivial_service =
  Trivial.Swapper.Server.make
    ~swap:(fun rpc ->
      RPC.mk_handler rpc @@ fun (p : Trivial.pair) ->
      let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "test.swap" in
      Fut.return @@ Trivial.make_pair ~x:p.y ~y:p.x ())
    ~count_chars:(fun rpc ->
      RPC.mk_handler rpc @@ fun (msg : Trivial.big_string) ->
      let n = String.length msg.msg in
      Fut.return @@ Trivial.make_count ~count:(Int32.of_int n) ())
    ()

(* we could host multiple services, here we only have one *)
let services = [ trivial_service ]

let () =
  let active = RPC.Simple_switch.create () in
  let timer = RPC.Simple_timer.create () in

  (* we need a thread pool to run the tasks *)
  let@ runner = Moonpool.Ws_pool.with_ ~num_threads:8 () in

  let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
  let server : RPC.Tcp_server.t =
    RPC.Tcp_server.create ~active ~runner ~timer ~services addr
    |> RPC.Error.unwrap
  in

  (* background thread to accept connection *)
  Format.eprintf "listening on port %d@." port;
  RPC.Tcp_server.run server