-
Notifications
You must be signed in to change notification settings - Fork 1
/
functory.mli
366 lines (280 loc) · 12.6 KB
/
functory.mli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
(**************************************************************************)
(* *)
(* Functory: a distributed computing library for Ocaml *)
(* Copyright (C) 2010 Jean-Christophe Filliatre and Kalyan Krishnamani *)
(* *)
(* This software is free software; you can redistribute it and/or *)
(* modify it under the terms of the GNU Library General Public *)
(* License version 2.1, with the special exception on linking *)
(* described in file LICENSE. *)
(* *)
(* This software is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *)
(* *)
(**************************************************************************)
(** {b Distributed computing library} *)
(** The sequential implementation (to be used as a reference) *)
module Sequential : sig
(** {2 Generic API} *)
val compute :
worker:('a -> 'b) ->
master:('a * 'c -> 'b -> ('a * 'c) list) -> ('a * 'c) list -> unit
(** [compute ~worker:f ~master:handle l] applies function [f] to the
    first component of each element of [l]. For each such computation,
    both the list element and the result are passed to [handle], which
    returns a list of new elements to be processed (in an identical
    manner).
    The second component (of type ['c]) is never given to [f]; it is only
    seen by [handle], together with the result of [f].
    The computation stops when there are no more elements to be
    processed. *)
(** {2 Derived API}
    The following functions are provided for convenience;
    they can be derived from the generic function above. *)
val map : f:('a -> 'b) -> 'a list -> 'b list
(** [map ~f l] computes the same result as [List.map f l]. *)
val map_local_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
(** [map_local_fold ~f ~fold acc l] computes
    [fold ... (fold (fold acc (f x1)) (f x2)) ... (f xn)]
    for some permutation [x1, x2, ..., xn] of [l].
    The permutation is unspecified, so callers should not rely on any
    particular order of the calls to [fold]. *)
val map_remote_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
(** Same specification as [map_local_fold].
    In the polymorphic network API ([Network.Poly]), [fold] is given to
    the worker side ([Worker.map_remote_fold]) rather than to the
    master. *)
val map_fold_ac :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
(** Same specification as [map_local_fold], assuming [fold] is an
    associative and commutative operation; the third argument should be
    a neutral element for [fold]. *)
val map_fold_a :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
(** [map_fold_a ~f ~fold acc [x1; ...; xn]] computes
    [fold ... (fold (fold acc (f x1)) (f x2)) ... (f xn)],
    assuming [fold] is an associative operation with neutral element
    [acc].
    Unlike [map_fold_ac], the results are combined in list order, so
    [fold] need not be commutative. *)
end
(** Several cores on the same machine. *)
module Cores : sig
val set_number_of_cores : int -> unit
(** [set_number_of_cores n] indicates that [n] computations can be
    done in parallel. It is typically less than or equal to the number
    of actual cores on the machine, though this is not mandatory.
    Setting [n] to 1 is equivalent to a sequential execution (though the
    order in which tasks are performed may differ).
    NOTE(review): the value used when this function is never called is
    not stated in this interface; TODO document it. *)
(** {2 Generic API}
    Same specification as {!Sequential.compute}; tasks are run on
    several cores of the local machine. *)
val compute :
worker:('a -> 'b) ->
master:('a * 'c -> 'b -> ('a * 'c) list) -> ('a * 'c) list -> unit
(** {2 Derived API}
    Same specifications as the corresponding functions of module
    {!Sequential}. *)
val map : f:('a -> 'b) -> 'a list -> 'b list
val map_local_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
val map_remote_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
val map_fold_ac :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
val map_fold_a :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
end
(** Network implementation.
Different network implementations are provided, depending on whether
- master and workers are the same binary program (module {!Same})
- master and workers are different programs compiled with the
same version of OCaml (module {!Poly})
- master and workers are different programs possibly compiled with
different versions of OCaml (module {!Mono})
*)
module Network : sig
(** {2 Setup} *)
type worker
(** An abstract handle on a remote worker. Values of this type are built
    with [create_worker] and passed to the [add_worker] and
    [remove_worker] functions of the [Computation] modules below. *)
val create_worker : ?port:int -> string -> worker
(** [create_worker ?port s] creates a new worker located on machine [s].
    NOTE(review): the port used when [port] is omitted is not stated
    here; presumably it is the default port number, as for
    [declare_workers] (TODO confirm).
    @raise Invalid_argument if [s] is not a valid machine. *)
val declare_workers : ?port:int -> ?n:int -> string -> unit
(** [declare_workers ?port ?n s] declares [n] workers on machine [s]
    (when [n] is not given, it defaults to 1).
    Number [n] does not necessarily coincide with the number of
    available cores of machine [s].
    [s] can be a machine hostname or an IP address.
    If [port] is not given, it is set to the default port number
    (see [set_default_port_number]).
    Declared workers are only meaningful for the high-level API
    functions ([compute], [map], etc.). Low-level [Computation] values
    need workers added explicitly with [add_worker]. *)
val torque_init : unit -> unit
(** [torque_init ()] initializes workers based on the environment
    variables and configuration of the TORQUE resource management
    system. *)
val set_default_port_number : int -> unit
(** Sets the default port number.
    If not called, the default port number is 51000. *)
val set_pong_timeout : float -> unit
(** [set_pong_timeout t] sets the upper time limit [t], in seconds, for
    receiving a pong message from a worker (counted from the last ping
    message) before the worker is declared unreachable. If not
    specified, it defaults to 5 seconds. *)
val set_ping_interval : float -> unit
(** [set_ping_interval t] sets the interval [t], in seconds, between
    consecutive ping messages. If not specified, it defaults to
    10 seconds. *)
(** {2 Worker type} *)
type worker_type = ?port:int -> unit -> unit
(** The type of the worker entry points below (e.g.
    [Same.Worker.compute]).
    The port number is given by [port]. Its default value is [51000],
    and it can be changed with function [set_default_port_number]
    above. *)
type computation_status = Running | Done | Dead
(** Status of a distributed computation, as returned by the [status]
    functions of the [Computation] modules below. *)
(** {2 Same binary executed as master and workers}
    A worker is distinguished from the master in two possible ways:
    - either the environment variable WORKER is set, and then a worker is
      immediately started;
    - or function [Worker.compute] below is explicitly called by the
      user program. *)
module Same : sig
(** {2 Low level API} *)
module Computation : sig
type ('a, 'c) t
(** The type of distributed computations. *)
val create :
worker:('a -> 'b) ->
master:('a * 'c -> 'b -> ('a * 'c) list) ->
('a, 'c) t
(** [create ~worker ~master] creates a new distributed computation.
    It has no workers and no tasks.
    Workers (resp. tasks) should be added using function [add_worker]
    (resp. [add_task]) below.
    Note: function [declare_workers] above is only meaningful for
    high-level API functions such as [compute], [map], etc. See below. *)
val add_worker : ('a, 'c) t -> worker -> unit
(** [add_worker c w] adds worker [w] to computation [c]. *)
val add_task : ('a, 'c) t -> 'a * 'c -> unit
(** [add_task c t] adds task [t] to computation [c]. *)
val remove_worker : ('a, 'c) t -> worker -> unit
(** [remove_worker c w] removes worker [w] from computation [c].
    If [w] was running some task, that task will eventually be
    rescheduled to another worker. *)
val one_step : ?timeout:float -> ('a, 'c) t -> unit
(** [one_step ~timeout c] runs one step of computation [c].
    That is, it
    - connects, or reconnects, to workers if necessary;
    - pings connected workers;
    - schedules pending tasks to idle workers if possible;
    - listens to incoming messages from workers, if any.
    The optional argument [timeout] indicates how long we should
    listen to incoming messages. Its default value is [0.], which
    means that pending messages are handled, if any, and otherwise
    [one_step] returns immediately. A value greater than 0 can be used
    to avoid busy waiting in a loop which does nothing but [one_step],
    for instance:
    {[ while Computation.status c = Running do
         Computation.one_step ~timeout:0.1 c
       done ]} *)
val status : ('a, 'c) t -> computation_status
(** [status c] queries the status of computation [c].
    It has three possible values:
    - [Running]: ongoing computation.
    - [Done]: completed computation, i.e. there is no pending task and
      no currently running task. This state can be reached either
      by completion of all tasks or by function [clear] below.
    - [Dead]: computation [c] was killed (see function [kill] below).
    *)
val clear : ('a, 'c) t -> unit
(** [clear c] clears all tasks from computation [c], i.e. all pending
    tasks as well as all currently running tasks. The status of [c]
    is set to [Done]. One can still add new tasks, which will
    put [c] back into the [Running] state. *)
val kill : ('a, 'c) t -> unit
(** [kill c] kills computation [c] and sets its status to [Dead].
    [c] cannot be used anymore: any operation applied to [c] will
    raise an [Invalid_argument] exception. *)
end
(** {2 High level API} *)
val compute :
worker:('a -> 'b) ->
master:('a * 'c -> 'b -> ('a * 'c) list) -> ('a * 'c) list -> unit
(** Same specification as {!Sequential.compute}, distributed over the
    workers declared with [declare_workers]. *)
module Worker : sig
val compute : worker_type
(** [compute ()] starts a worker loop, waiting for computations
    from the master. *)
end
(** {2 Derived API}
    Same specifications as the corresponding functions of module
    {!Sequential}, distributed over the workers declared with
    [declare_workers]. *)
val map : f:('a -> 'b) -> 'a list -> 'b list
val map_local_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
val map_remote_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
val map_fold_ac :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
val map_fold_a :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> 'b -> 'a list -> 'b
end
(** {2 Polymorphic API (same version of OCaml)}
    Contrary to module [Same] above, master and workers no longer
    execute the same code. Submodule [Master] (resp. [Worker])
    provides functions to implement the master (resp. the workers).
    Arguments of functions [compute], [map], etc. are thus split between
    these two submodules.
    NOTE(review): since master and workers are separate programs, the
    type checker cannot relate the types used on both sides. For
    instance [Master.map] has type ['a list -> 'b list] with ['b]
    unconstrained. The master must use the same argument and result
    types as the worker program; a mismatch is presumably not detected
    statically (TODO confirm the runtime failure mode). *)
module Poly : sig
module Master : sig
(** Low-level API, analogous to [Same.Computation], except that the
    worker function is not given here: it is supplied to the worker
    program (see [Worker.compute]). *)
module Computation : sig
type ('a, 'c) t
val create : master:('a * 'c -> 'b -> ('a * 'c) list) -> ('a, 'c) t
val add_worker : ('a, 'c) t -> worker -> unit
val remove_worker : ('a, 'c) t -> worker -> unit
val one_step : ?timeout:float -> ('a, 'c) t -> unit
val status : ('a, 'c) t -> computation_status
val kill : ('a, 'c) t -> unit
val clear : ('a, 'c) t -> unit
val add_task : ('a, 'c) t -> 'a * 'c -> unit
val nb_tasks : ('a, 'c) t -> int
(** NOTE(review): undocumented upstream; presumably returns the number
    of tasks of the computation. TODO confirm whether pending tasks,
    running tasks, or both are counted. *)
end
val compute :
master:('a * 'c -> 'b -> ('a * 'c) list) ->
('a * 'c) list -> unit
(** Master side of {!Sequential.compute}; the matching worker program
    uses [Worker.compute]. *)
val map : 'a list -> 'b list
(** Master side of [map]; the worker program uses [Worker.map]. *)
val map_local_fold : fold:('c -> 'b -> 'c) -> 'c -> 'a list -> 'c
(** Master side of [map_local_fold]: [fold] is applied by the master;
    the worker program uses [Worker.map_local_fold]. *)
val map_remote_fold : 'c -> 'a list -> 'c
(** Master side of [map_remote_fold]: [fold] is given to the worker
    program ([Worker.map_remote_fold]). *)
val map_fold_ac : 'b -> 'a list -> 'b
(** Master side of [map_fold_ac]; see [Worker.map_fold_ac]. *)
val map_fold_a : 'b -> 'a list -> 'b
(** Master side of [map_fold_a]; see [Worker.map_fold_a]. *)
end
module Worker : sig
val compute : ('a -> 'b) -> worker_type
(** [compute f] starts a worker loop that applies [f] to the tasks
    received from a master using [Master.compute] or
    [Master.Computation]. *)
val map :
f:('a -> 'b) -> worker_type
(** Worker side of [Master.map]. *)
val map_local_fold :
f:('a -> 'b) -> worker_type
(** Worker side of [Master.map_local_fold]. *)
val map_remote_fold :
f:('a -> 'b) -> fold:('c -> 'b -> 'c) -> worker_type
(** Worker side of [Master.map_remote_fold]; [fold] is supplied here. *)
val map_fold_ac :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> worker_type
(** Worker side of [Master.map_fold_ac]. *)
val map_fold_a :
f:('a -> 'b) -> fold:('b -> 'b -> 'b) -> worker_type
(** Worker side of [Master.map_fold_a]. *)
end
end
(** {2 Monomorphic API (possibly different versions of OCaml)}
    When master and workers are not compiled with the same version of
    OCaml, only strings can be passed, hence the monomorphic API below.
    It is the responsibility of the user to encode/decode values.
    As of now, there is no derived API in this module. *)
module Mono : sig
(** Low-level API, analogous to [Poly.Master.Computation] with task
    arguments and results encoded as strings. Unlike its [Poly]
    counterpart, it lives directly in [Mono] and provides no
    [nb_tasks] function. *)
module Computation : sig
type 'c t
val create :
master:(string * 'c -> string -> (string * 'c) list) -> 'c t
val add_worker : 'c t -> worker -> unit
val remove_worker : 'c t -> worker -> unit
val one_step : ?timeout:float -> 'c t -> unit
val status : 'c t -> computation_status
val kill : 'c t -> unit
val clear : 'c t -> unit
val add_task : 'c t -> string * 'c -> unit
end
module Master : sig
val compute :
master:(string * 'c -> string -> (string * 'c) list) ->
(string * 'c) list -> unit
(** Master side of {!Sequential.compute}, with string-encoded tasks and
    results; the worker program uses [Worker.compute]. *)
end
module Worker : sig
val compute : (string -> string) -> worker_type
(** [compute f] starts a worker loop that applies [f] to the
    string-encoded tasks received from the master. *)
end
end
end
(** Library parameters *)
module Control : sig
val set_debug : bool -> unit
(** [set_debug b] sets the debug flag to [b]. When it is set, diagnostic
    messages are displayed on the error output. Workers and master
    display different kinds of messages. *)
end