forked from xfguo/luactor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathluactor.lua
353 lines (302 loc) · 11 KB
/
luactor.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
--
-- Luactor - A pure Lua "Actor Model" framework
--
--============================================================================
-- Load the configuration module; it supplies the fallback reactor choice.
local __config = require "luactor.config"
--============================================================================
-- An actor needs a reactor to trigger its events.
-- The LUACTOR_REACTOR environment variable takes precedence; when it is not
-- set, the REACTOR field of the configuration module is used instead.
local reactor_env = os.getenv("LUACTOR_REACTOR") or __config.REACTOR
-- Load the reactor library. Only 'uloop' is selected explicitly; any other
-- value (including nil) falls back to the luaevent driver.
local reactor
do
    local driver_modules = { uloop = "luactor.reactor.uloop" }
    reactor = require(driver_modules[reactor_env] or "luactor.reactor.luaevent")
end
--============================================================================
-- Forward declarations of the module's internal state. They are declared
-- here so the helper functions below can capture them as upvalues; the
-- actual values are assigned after the helpers are defined.
local __reactor,            -- reactor object
      __mqueue,             -- message queue
      __actors,             -- actor object pool (indexed by actor name)
      __actors_num,         -- number of actors
      __lut_thread_actor,   -- thread to actor look-up table
      __actors_events       -- registered events for each actor
--============================================================================
-- helper methods for luactor
-- Collect all arguments into a table whose field `n` holds the argument
-- count. Uses table.pack when the runtime provides it and an equivalent
-- fallback otherwise.
local pack = table.pack
if not pack then
    pack = function(...)
        return {n = select("#", ...), ...}
    end
end
------------------------------------------------------------------------------
-- open a coroutine safely (for lua 5.1)
-- modified from coxpcall project
--
-- Coroutine safe xpcall and pcall versions modified for luactor
--
-- Encapsulates the protected calls with a coroutine based loop, so errors can
-- be dealed without the usual Lua 5.x pcall/xpcall issues with coroutines
-- yielding inside the call to pcall or xpcall.
--
-- Authors: Roberto Ierusalimschy and Andre Carregal
-- Contributors: Thomas Harning Jr., Ignacio Burgueño, Fabio Mascarenhas
--
-- Copyright 2005 - Kepler Project (www.keplerproject.org)
-------------------------------------------------------------------------------
-- Convert the results of coroutine.resume into a (success, ...) result list.
--
-- err    : error handler; on failure it is called with the traceback of `co`
--          followed by the raw resume results (the error value).
-- co     : the coroutine that was resumed.
-- status : first result of coroutine.resume.
-- ...    : remaining results of coroutine.resume.
--
-- Returns true followed by the resume results on success, or false followed
-- by whatever `err` returns on failure.
local function handle_return_value(err, co, status, ...)
    if status then
        return true, ...
    end
    -- (...) keeps only the first extra result, i.e. the error value.
    local trace = debug.traceback(co, (...))
    return false, err(trace, ...)
end
-- Resume `co` with the given arguments and route the outcome through
-- handle_return_value, so a failure is reported as false plus the result of
-- the `err` handler instead of a raw error value.
local function perform_resume(err, co, ...)
    return handle_return_value(err, co, coroutine.resume(co, ...))
end
-- Create a coroutine for `f`. If coroutine.create rejects `f` directly, `f`
-- is wrapped in a plain Lua function that forwards all arguments and results,
-- and the coroutine is created from that wrapper instead.
local function safe_coroutine_create(f)
    local ok, co = pcall(coroutine.create, f)
    if ok then
        return co
    end
    return coroutine.create(function(...) return f(...) end)
end
--------------------------------------------------------------------------------
-- Simple FIFO queue kept in a plain table.
-- Items are stored at descending integer keys: `first` is the key of the
-- most recently pushed item and `last` the key of the oldest item still
-- queued. The queue is empty whenever first > last.
local queue = {}

-- Create an empty queue.
function queue.new()
    return {first = 0, last = -1}
end

-- Append `value` to the queue.
function queue.push(q, value)
    local slot = q.first - 1
    q[slot] = value
    q.first = slot
end

-- Remove and return the oldest queued value; raises an error when empty.
function queue.pop(q)
    local slot = q.last
    if q.first > slot then
        error("queue is empty")
    end
    local value = q[slot]
    q[slot] = nil -- drop the reference so the value can be collected
    q.last = slot - 1
    return value
end

-- Return true when the queue holds no items, false otherwise.
function queue.empty(q)
    return q.first > q.last
end
-------------------------------------------------------------------------------
-- Return the actor object that owns the currently running coroutine, or nil
-- when the running coroutine is not present in the thread look-up table.
local function get_myself()
    -- TODO: what if I am main thread
    return __lut_thread_actor[coroutine.running()]
end
-------------------------------------------------------------------------------
-- Resume an actor and handle its failure or termination.
--
-- actor : an actor object, or the name of a registered actor.
-- ...   : values passed to the actor's coroutine by the resume.
--
-- A failed actor that has a creator is reported to that creator through an
-- "actor_error" message whose payload holds the actor object and the packed
-- error (traceback first, then the raw error value). A dead or failed actor
-- is removed from the internal tables and every event it registered is
-- unregistered from the reactor.
--
-- Raises an error when `actor` does not resolve to an actor object, and
-- re-raises the failure of an actor that has no event table (the scheduler
-- set up by actor.run is registered without one), since nobody else would
-- see that failure.
local resume_actor = function (actor, ...)
    local target = actor
    if type(target) == 'string' then
        target = __actors[target]
    end
    if target == nil then
        error("cannot resume unknown actor: " .. tostring(actor))
    end

    local status, err = perform_resume(pack, target.thread, ...)

    -- send the failed actor to its creator. if it has no creator,
    -- it is just destroyed below.
    if status == false and target.creator ~= nil then
        queue.push(__mqueue, {
            '_',                -- sender
            target.creator,     -- receiver
            "actor_error",      -- command
            {
                actor = target,
                error = err,
            }                   -- error message
        })
    end

    -- if an actor is dead or failed, destroy the relevant info.
    if coroutine.status(target.thread) == 'dead'
        or status == false
    then
        __actors[target.name] = nil
        __actors_num = __actors_num - 1
        __lut_thread_actor[target.thread] = nil

        local events = __actors_events[target.name]
        if events ~= nil then
            -- unregister all events that were created by this actor
            for _, ev_obj in pairs(events) do
                __reactor.unregister_event(ev_obj)
            end
            __actors_events[target.name] = nil
        elseif status == false then
            -- No event table: previously this path died inside pairs(nil)
            -- with a misleading message. Surface the real traceback instead.
            error(err[1], 0)
        end
    end
end
------------------------------------------------------------------------------
-- Event registration handlers, indexed by event type.
--
-- Each handler registers a callback with the reactor and returns whatever
-- the reactor's registration function returns. When the callback fires it
-- pushes an event message {sender, receiver, ev_name, payload} onto the
-- message queue and resumes the scheduler actor '_'.
local event_handlers = {}

-- Timeout event: the payload carries the configured timeout interval.
function event_handlers.timeout(sender, receiver, ev_name, timeout_interval)
    local on_timeout = function ()
        -- a fresh payload table is built for every firing
        local payload = {
            timeout_interval = timeout_interval,
        }
        queue.push(__mqueue, {sender, receiver, ev_name, payload})
        resume_actor('_')
    end
    return __reactor.register_timeout_cb(on_timeout, timeout_interval)
end

-- File-descriptor event: the payload carries the fd and the event that was
-- passed in at registration time.
function event_handlers.fd(sender, receiver, ev_name, fd, event)
    local on_fd_event = function ()
        -- a fresh payload table is built for every firing
        local payload = {
            event = event,
            fd = fd,
        }
        queue.push(__mqueue, {sender, receiver, ev_name, payload})
        resume_actor('_')
    end
    return __reactor.register_fd_event(on_fd_event, fd, event)
end
------------------------------------------------------------------------------
-- scheduler coroutine
--
-- Process the message queue until it is empty, then yield out and wait to
-- be resumed for the next batch. Messages addressed to an actor that is not
-- registered are dropped. Once the last actor is gone the reactor is
-- cancelled and the coroutine finishes after its next resume.
local process_mqueue = function ()
    -- `unpack` is a global only on Lua 5.1/LuaJIT; Lua 5.2+ provides it as
    -- table.unpack. Resolve it once so the scheduler runs on both.
    local unpack = table.unpack or unpack
    local finish = false
    while not finish do
        -- process the message in the queue one by one until empty.
        while not queue.empty(__mqueue) do
            -- Explicit bounds: a message is always {sender, receiver,
            -- command, message} and any field may be nil, so the length
            -- operator cannot be relied on to find the end of the table.
            local sender, receiver, command, message =
                unpack(queue.pop(__mqueue), 1, 4)
            if __actors[receiver] == nil then
                -- drop this message
                -- TODO: what we can do before the message is dropped.
            else
                resume_actor(receiver, sender, command, message)
                -- if there is no running actor, everything should be done.
                if __actors_num <= 0 then
                    __reactor.cancel()
                    finish = true
                    break
                end
            end
        end
        -- yield out to the main thread to wait for the next event.
        coroutine.yield()
    end
end
--============================================================================
-- Luactor methods
--
-- like *coroutine*, all *luactor* method are contained within a table.
-- The public module table; every luactor method is attached to it.
local actor = {}
------------------------------------------------------------------------------
-- Give the internal objects their initial values.
__reactor, __mqueue = reactor, queue.new()      -- reactor object, message queue
__actors, __actors_num = {}, 0                  -- actor object pool and its size
__lut_thread_actor, __actors_events = {}, {}    -- thread->actor table, events per actor
------------------------------------------------------------------------------
-- Create an actor.
--
-- name : unique key identifying the actor; must not be nil.
-- f    : function run as the actor's coroutine body.
-- ...  : accepted but not used by this function.
--
-- Returns the new actor object {name, thread, creator}. `creator` is the
-- name of the actor that called create, or nil when the caller is not a
-- registered actor.
--
-- Raises an error when `name` is nil or already registered.
actor.create = function (name, f, ...)
    -- Validate before touching any state: a nil name used to bump the actor
    -- counter and only then fail with "table index is nil".
    if name == nil then
        error("the actor name must not be nil")
    end
    if __actors[name] ~= nil then
        error("the actor name has been registered")
    end

    local me = get_myself()
    local thread = safe_coroutine_create(f)
    local new_actor = {
        name = name,
        thread = thread,
        creator = me and me.name,
    }

    -- save the actor to the global tables
    __actors[name] = new_actor
    __actors_events[name] = {}
    __lut_thread_actor[thread] = new_actor
    -- count the actor only once it is fully registered
    __actors_num = __actors_num + 1

    return new_actor
end
------------------------------------------------------------------------------
-- Queue a message for another actor.
--
-- receiver : name of the target actor. It is not checked here because the
--            actor might be created later.
-- command  : command carried by the message.
-- message  : payload carried by the message.
--
-- The sender is the calling actor's name, or "_" when the caller is not a
-- registered actor.
--
-- XXX: should we yield out when receive a message?
function actor.send(receiver, command, message)
    local me = get_myself()
    local sender = "_"
    if me and me.name then
        sender = me.name
    end
    -- push message to global message queue
    queue.push(__mqueue, {sender, receiver, command, message})
end
------------------------------------------------------------------------------
-- Wait for one message and dispatch it.
--
-- handlers : table mapping command names to functions. The matching function
--            is called as handler(message, sender).
--
-- Yields the running coroutine; the values it is resumed with are taken as
-- (sender, command, message). Raises an error when the command is nil or has
-- no entry in `handlers`.
function actor.wait(handlers)
    local sender, command, message = coroutine.yield()
    -- XXX: how to make sure handlers[command] is function like?
    if command == nil or handlers[command] == nil then
        -- XXX: should we raise an error here? or, we can return a
        --      command `__unknown` or `__index` as a *meta method*.
        error("unknown message command type")
    end
    handlers[command](message, sender)
end
------------------------------------------------------------------------------
-- Register an event for the calling actor.
--
-- ev_type : event type; must be a key of event_handlers ('timeout' or 'fd').
-- ev_name : name the event is stored under for this actor; must not be nil
--           and must not already be registered by this actor.
-- ...     : extra arguments forwarded to the event handler.
--
-- Raises an error when called outside an actor, when `ev_name` is nil or
-- already registered, or when `ev_type` is unknown.
actor.register_event = function (ev_type, ev_name, ...)
    local me = get_myself()
    if me == nil then
        error("register_event must be called from inside an actor")
    end
    if ev_name == nil then
        -- a nil name cannot be used as a table key; reject it before the
        -- event is registered with the reactor and then lost.
        error("event name must not be nil")
    end

    local my_events = __actors_events[me.name]
    local event_handler = event_handlers[ev_type]

    if my_events[ev_name] ~= nil then
        error('event name "'..tostring(ev_name)..'" has been registered!')
    elseif event_handler ~= nil then
        -- XXX: any other possibilities for sender and receiver?
        my_events[ev_name] = event_handler('_', me.name, ev_name, ...)
    else
        error('unknown event type: '..tostring(ev_type))
    end
end
------------------------------------------------------------------------------
-- Unregister an event previously registered by the calling actor.
--
-- ev_name : name the event was registered under.
--
-- Raises an error when called outside an actor or when the calling actor
-- has no event stored under `ev_name`.
actor.unregister_event = function (ev_name)
    local me = get_myself()
    if me == nil then
        error("unregister_event must be called from inside an actor")
    end

    local my_events = __actors_events[me.name]
    local ev_obj = my_events[ev_name]
    if ev_obj == nil then
        -- tostring keeps this message intact even when ev_name is nil
        error("cannot find event "..tostring(ev_name)..".")
    end

    __reactor.unregister_event(ev_obj)
    my_events[ev_name] = nil
end
------------------------------------------------------------------------------
-- Start an actor by resuming it for the first time.
--
-- actor : an actor object or the name of a registered actor. Inside this
--         function the parameter shadows the module table of the same name.
-- ...   : arguments handed to the actor's function.
function actor.start(actor, ...)
    resume_actor(actor, ...)
end
------------------------------------------------------------------------------
-- Run the scheduler and the event loop.
--
-- Registers a scheduler actor named '_' whose coroutine runs process_mqueue,
-- resumes it once, and then keeps running the reactor for as long as at
-- least one actor is counted as alive.
function actor.run()
    -- TODO: the scheduler coroutine should have its creator and the
    --       error should be processed.
    local scheduler = {
        name = '_',
        thread = safe_coroutine_create(process_mqueue),
    }
    __lut_thread_actor[scheduler.thread] = scheduler
    __actors['_'] = scheduler

    resume_actor(scheduler)

    -- run until there are no actors left
    while __actors_num > 0 do
        __reactor.run()
    end
    -- TODO: clean up everything
end
return actor