-
Notifications
You must be signed in to change notification settings - Fork 2
/
server_pool.ml
236 lines (212 loc) · 9.47 KB
/
server_pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
(**
This module is built around [Resource_pool]. While a pool of type
[Resource_pool.t] manages a number of resources, here we manage a cluster of
such pools. A typical use case would be a cluster of servers, where for each
server we maintain a number of connections. A user of this module can call [use]
to access one of the connections, which are served in a round-robin fashion.
*)
[@@@ocaml.warning "+A-9-44-48"]
(* Monadic bind on Lwt promises, used as an infix operator in this file. *)
let ( >>= ) = Lwt.bind

(* Log section of this module; its level is set to [Info] when the module is
   initialised. *)
let section = Lwt_log.Section.make "server-pool"
let () = Lwt_log.Section.set_level section Lwt_log.Info
(* Parameters of the functor [Make]: how to open, close and health-check the
   connections of the servers managed by the pool. *)
module type CONF = sig
(* A single connection to a server. *)
type connection
(* The value handed to [connect] in order to open a connection. *)
type server
(* Key under which a server is registered in the pool. *)
type serverid
(* Printable form of a server id; used for the log messages of [Make]. *)
val serverid_to_string : serverid -> string
(* Opens a new connection to the given server. *)
val connect : server -> connection Lwt.t
(* Closes a connection. Exceptions raised by [close] are ignored when [Make]
   disposes of a connection it created itself. *)
val close : connection -> unit Lwt.t
(* Delay handed to [Lwt_unix.sleep] before each health check of a suspended
   server. *)
val check_delay : float
(* Health check for servers added through [add_many] / [add_one]: a suspended
   server is reactivated once this returns [true]. An exception raised by the
   check is logged and counts as unhealthy. *)
val check_server : serverid -> server -> bool Lwt.t
end
module Make (Conf : CONF) = struct
(* Shorthand used to render a server id in log messages. *)
let show serverid = Conf.serverid_to_string serverid
(* Bookkeeping record kept for every registered server. The records are
   immutable; updates replace the binding in the [servers] table. *)
type server_status = {
(* Key of the server in the [servers] table. *)
serverid : Conf.serverid;
(* Number of entries the server should have in [server_pool]; set to
   [num_conn] when the server is added. *)
desired : int;
(* Number of entries counted for the server: incremented whenever an entry
   is added to [server_pool], decremented whenever one is disposed of. *)
current : int;
(* Essential servers are never suspended by [suspend_server]. *)
essential : bool;
(* [true] while the server is suspended; [use] refuses suspended servers. *)
suspended : bool;
(* Health check polled by [reactivate_server] while the server is
   suspended. *)
check_server : unit -> bool Lwt.t;
(* Pool of connections to this server. *)
connections : Conf.connection Resource_pool.t;
}
(* Builds the initial status of a server: no entry counted yet
   ([current = 0]) and not suspended. *)
let mk_server_status
    ~serverid ~desired ~essential ~check_server ~connections =
  {
    serverid;
    desired;
    current = 0;
    essential;
    suspended = false;
    check_server;
    connections;
  }
(* Table of all registered servers, indexed by server id. *)
let servers : (Conf.serverid, server_status) Hashtbl.t = Hashtbl.create 9

(* Current status of a server, or [None] if the id is not registered. *)
let get_status serverid =
  match Hashtbl.find servers serverid with
  | status -> Some status
  | exception Not_found -> None
(* Lists [(serverid, connections)] for every registered server that is
   neither essential nor suspended. *)
let non_essential_active_connection_pools () =
  Hashtbl.fold
    (fun serverid status pools ->
       if status.essential || status.suspended
       then pools
       else (serverid, status.connections) :: pools)
    servers []
(* Unregisters a server: logs a notice and drops its binding from the
   [servers] table. Entries still present in [server_pool] are not touched
   here. *)
let remove serverid =
  let () =
    Lwt_log.ign_notice_f ~section "removing server %s" (show serverid)
  in
  Hashtbl.remove servers serverid
(* Replaces the [current] counter of a server by [f current] and logs the
   new value. Does nothing when the server is not registered. *)
let update_current_count serverid f =
  match get_status serverid with
  | None -> ()
  | Some previous ->
    let current = f previous.current in
    Hashtbl.replace servers serverid {previous with current};
    Lwt_log.ign_debug_f ~section "current number of instances for %s: %d/%d"
      (show serverid) current previous.desired
(* A server pool is a pool of connection pools: every server owns one
   connection pool. Note, however, that [server_pool] does not hold a single
   entry per server; it holds the same [server_status] [n] times, where [n]
   is the (maximum) size of the connection pool of that server. *)
let server_pool : server_status Resource_pool.t =
  (* Resource creator handed to [Resource_pool.create]. The limit passed
     below is [0], which prevents [Resource_pool] from ever creating a
     resource on its own: new servers are added by the user of this module
     only. *)
  let never_create () =
    failwith "Bs_db.server_pool: invalid connection attempt"
  in
  (* An entry of the server leaves the pool: decrement its counter. *)
  let dispose {serverid} =
    update_current_count serverid pred;
    Lwt.return_unit
  in
  let check _ {serverid} =
    if Hashtbl.mem servers serverid
    then Lwt.return_true
    else Lwt.return_false (* remove retired server from pool *)
    (* For now, do not dispose of servers upon Resource_invalid {safe =
       true} as there is currently no mechanism for reinstating them.
       Potentially it might be advisable to dispose of them temporarily. *)
  in
  Resource_pool.create ~check ~dispose 0 never_create
(* [true] iff a server with this id is currently registered. *)
let server_exists serverid =
  match get_status serverid with
  | Some _ -> true
  | None -> false
(* [add_many ?essential ?connect_immediately ~num_conn new_servers] registers
   every [(serverid, server)] of [new_servers] whose id is not registered
   yet. Ids that are already registered, including an id that occurs a
   second time within [new_servers] itself, are skipped.

   - [essential] (default [false]): essential servers are never suspended.
   - [connect_immediately] (default [false]): eagerly open [num_conn]
     connections in the background instead of waiting for the connection
     pool to open them on demand.
   - [num_conn]: size of the connection pool of each server, and number of
     entries each server gets in [server_pool].

   The entries are inserted into [server_pool] interleaved across servers
   (one entry per server, repeated [num_conn] times). *)
let add_many
    ?(essential = false)
    ?(connect_immediately = false) ~num_conn new_servers =
  let mk_connection_pool (serverid, server) : server_status =
    Lwt_log.ign_notice_f ~section "adding server: %s" (show serverid);
    let connect () =
      Lwt_log.ign_info_f ~section "opening connection to %s" (show serverid);
      Conf.connect server
    in
    let dispose conn =
      (* TODO: reopen closed connections if connect_immediately is true ? *)
      Lwt_log.ign_info_f ~section "closing connection to %s" (show serverid);
      Lwt.catch (fun () -> Conf.close conn) (fun _ -> Lwt.return_unit)
    in
    let check_server () = Conf.check_server serverid server in
    let check _ _ = Lwt.return_true in (* never close connections *)
    let connections = Resource_pool.create num_conn ~check ~dispose connect in
    let status =
      mk_server_status
        ~serverid ~desired:num_conn ~essential ~check_server ~connections
    in
    Hashtbl.add servers serverid status;
    if connect_immediately then
      for _ = 1 to num_conn do
        Lwt.async @@ fun () ->
          (* Eager connections are best effort. Without this handler a
             failing [connect] would reach [Lwt.async_exception_hook]. The
             connection pool can still open the connection later on
             demand. *)
          Lwt.catch
            (fun () ->
               connect () >>= fun c ->
               try Resource_pool.add connections c; Lwt.return_unit
               with Resource_pool.Resource_limit_exceeded -> dispose c)
            (fun e ->
               Lwt_log.ign_warning_f ~section
                 "could not open initial connection to %s: %s"
                 (show serverid) (Printexc.to_string e);
               Lwt.return_unit)
      done;
    status
  in
  (* Check for existence right before each insertion (rather than filtering
     the whole list up front) so that a duplicate id inside [new_servers] is
     registered only once. The original order is preserved. *)
  let pools =
    List.rev @@
    List.fold_left
      (fun acc ((serverid, _) as entry) ->
         if server_exists serverid then acc
         else mk_connection_pool entry :: acc)
      [] new_servers
  in
  for _ = 1 to num_conn do
    List.iter
      (fun conn_pool ->
         Resource_pool.add ~omit_max_check:true server_pool conn_pool;
         update_current_count conn_pool.serverid succ)
      pools
  done
(* Registers a single server: [add_many] applied to the one-element list
   made of [(serverid, server)]. The optional arguments are forwarded
   unchanged. *)
let add_one ?essential ?connect_immediately ~num_conn serverid server =
  let singleton = [(serverid, server)] in
  add_many ?essential ?connect_immediately ~num_conn singleton
(* [add_existing ?essential ?check_server ~num_conn serverid connections]
   registers a server whose connection pool [connections] was created by the
   caller, and inserts [num_conn] entries for it into [server_pool].

   - [essential] (default [false]): essential servers are never suspended.
   - [check_server] (default: always healthy): health check polled while the
     server is suspended.

   Like [add_many], this is a no-op when [serverid] is already registered.
   Adding it again would shadow the existing binding in [servers], reset its
   counter and insert more than [num_conn] entries into [server_pool]. *)
let add_existing
    ?(essential = false) ?(check_server = fun () -> Lwt.return_true)
    ~num_conn serverid connections =
  if server_exists serverid then
    Lwt_log.ign_warning_f ~section
      "not adding existing server %s: already registered" (show serverid)
  else begin
    Lwt_log.ign_notice_f ~section "adding existing server: %s" (show serverid);
    let status =
      mk_server_status
        ~serverid ~desired:num_conn ~essential ~check_server ~connections
    in
    Hashtbl.add servers serverid status;
    for _ = 1 to num_conn do
      Resource_pool.add ~omit_max_check:true server_pool status;
      update_current_count serverid succ
    done
  end
(* [reactivate_server ~check_server connection_pool] polls [check_server]
   (sleeping [Conf.check_delay] before every attempt) until it reports the
   server as healthy, then clears the [suspended] flag and refills
   [server_pool] with entries for the server until its counter reaches
   [desired]. The polling stops without reactivating anything as soon as the
   server is no longer registered. *)
let reactivate_server ~check_server connection_pool =
  let {serverid} = connection_pool in
  (* One health probe; an exception raised by the check counts as unhealthy. *)
  let probe () =
    Lwt_unix.sleep Conf.check_delay >>= fun () ->
    Lwt_log.ign_debug_f ~section "checking server health of %s" (show serverid);
    Lwt.catch
      check_server
      (fun exn ->
         Lwt_log.ign_info_f ~section
           "exception during health check of %s: %s"
           (show serverid) (Printexc.to_string exn);
         Lwt.return_false)
  in
  (* Marks the server as active again and tops up its entries. *)
  let restore status =
    Lwt_log.ign_notice_f ~section
      "reactivating healthy server %s" (show serverid);
    Hashtbl.replace servers serverid {status with suspended = false};
    for _ = status.current to status.desired - 1 do
      Resource_pool.add ~omit_max_check:true server_pool connection_pool;
      update_current_count serverid succ
    done
  in
  let reactivate () =
    begin match get_status serverid with
      | None -> ()
      | Some status -> restore status
    end;
    Lwt.return_unit
  in
  let rec poll () =
    if server_exists serverid then
      probe () >>= (fun healthy -> if healthy then reactivate () else poll ())
    else
      Lwt.return_unit
  in
  poll ()
(* [suspend_server ~check_server connection_pool] marks the server as
   suspended, then, in the background, clears its connection pool and starts
   polling [check_server] in order to reactivate it. Nothing happens when
   the server is unknown, essential, or already suspended. *)
let suspend_server ~check_server connection_pool =
  let {serverid; connections} = connection_pool in
  match get_status serverid with
  | None -> ()
  | Some {essential = true} | Some {suspended = true} -> ()
  | Some status ->
    Lwt_log.ign_warning_f ~section "suspending %s" (show serverid);
    Hashtbl.replace servers serverid {status with suspended = true};
    Lwt.async (fun () ->
      Resource_pool.clear connections >>= fun () ->
      reactivate_server ~check_server connection_pool)
(* [use ?usage_attempts f] picks an entry from [server_pool] and runs [f] via
   [Resource_pool.use] on the connection pool of the corresponding server.

   - [usage_attempts] is forwarded to the [Resource_pool.use] call on the
     connection pool of the server; the number of attempts on [server_pool]
     itself is fixed to 9.
   - Entries of removed or suspended servers are rejected by failing with
     [Resource_invalid {safe = true}].
   - If [f] fails with [Resource_invalid {safe = false}] the server is
     suspended before the exception is passed on. Any other exception is
     passed on unchanged. *)
let use ?usage_attempts f =
  (* We use retry here, since elements cannot be removed from an
     [Resource_pool.t] directly. Therefore we detect whether a server has been
     removed by our own means and try again with another server if this was
     the case. Once a server has been removed (by the use of [remove]) there
     will be [n] such retries before all traces of a server have been
     erased, where [n] equals the value used for [num_conn] when the server
     was added. *)
  Resource_pool.use ~usage_attempts:9 server_pool @@ fun connection_pool ->
  let {serverid; connections} = connection_pool in
  match get_status serverid with
  | None ->
    Lwt_log.info_f ~section "cannot use %s (removed)" (show serverid)
    >>= fun () ->
    Lwt.fail Resource_pool.(Resource_invalid {safe = true})
  | Some {suspended = true} ->
    Lwt_log.info_f ~section "not using %s (suspended)" (show serverid)
    >>= fun () ->
    Lwt.fail Resource_pool.(Resource_invalid {safe = true})
  | Some {check_server} ->
    Lwt_log.debug_f ~section "using connection to %s" (show serverid)
    >>= fun () ->
    Lwt.catch
      (fun () -> Resource_pool.use ?usage_attempts connections f)
      (fun e -> match e with
         | Resource_pool.(Resource_invalid {safe = true}) ->
           (* Log under this module's section, like every other message. *)
           Lwt_log.warning ~section
             "connection unusable (safe to retry using another server)"
           >>= fun () ->
           Lwt.fail e
         | Resource_pool.(Resource_invalid {safe = false}) ->
           Lwt_log.warning ~section
             "connection unusable (unsafe to retry using another server)"
           >>= fun () ->
           suspend_server ~check_server connection_pool;
           Lwt.fail e
         | e -> Lwt.fail e
      )
(* Status records of all registered servers, in the order produced by
   folding over the table. *)
let server_statuses () =
  let collect _serverid status acc = status :: acc in
  Hashtbl.fold collect servers []
(* Ids of all registered servers. From here on the name [servers] denotes
   this function rather than the underlying table. *)
let servers () =
  let collect serverid _status acc = serverid :: acc in
  Hashtbl.fold collect servers []
end