Skip to content

Commit

Permalink
Reuse nodejs subprocess for faster expressions (#178)
Browse files Browse the repository at this point in the history
* Reuse the JavaScript subprocess to reduce script evaluation overhead.
  • Loading branch information
tetron authored Sep 1, 2016
1 parent 6a8d5f4 commit 42f7318
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 60 deletions.
13 changes: 13 additions & 0 deletions cwltool/cwlNodeEngine.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"use strict";
// Expression-evaluation loop.
//
// Protocol: each request is one line on stdin holding a JSON-encoded string of
// JavaScript source.  The source is evaluated in a fresh, empty vm context and
// the JSON-serialised result is written to stdout as a single line.
//
// Errors are deliberately not caught here: a malformed request or a script
// that throws terminates this process through the uncaught exception.
var vm = require("vm");

process.stdin.setEncoding('utf8');

// Input received so far that has not yet been terminated by a newline.
var incoming = "";

process.stdin.on('data', function(chunk) {
    incoming += chunk;
    // One chunk may carry several complete requests (or only part of one), so
    // drain every newline-terminated line instead of just the first; otherwise
    // later requests would stay buffered until more input happened to arrive.
    var i;
    while ((i = incoming.indexOf("\n")) > -1) {
        var fn = JSON.parse(incoming.slice(0, i));
        incoming = incoming.slice(i + 1);
        process.stdout.write(JSON.stringify(vm.runInNewContext(fn, {})) + "\n");
    }
});

// Exit once the writer closes stdin.
process.stdin.on('end', process.exit);
74 changes: 61 additions & 13 deletions cwltool/sandboxjs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@
import threading
import errno
import logging
from typing import Any, Dict, List, Mapping, Text, TypeVar, Union
import select
import os

import cStringIO
from cStringIO import StringIO
from typing import Any, Dict, List, Mapping, Text, TypeVar, Union
from pkg_resources import resource_stream

class JavascriptException(Exception):
pass

_logger = logging.getLogger("cwltool")

JSON = Union[Dict[Any,Any], List[Any], Text, int, long, float, bool, None]
JSON = Union[Dict[Text,Any], List[Any], Text, int, long, float, bool, None]

localdata = threading.local()

have_node_slim = False

def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON
def new_js_proc():
# type: () -> subprocess.Popen

res = resource_stream(__name__, 'cwlNodeEngine.js')
nodecode = res.read()

nodejs = None
trynodes = ("nodejs", "node")
for n in trynodes:
try:
nodejs = subprocess.Popen([n], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
nodejs = subprocess.Popen([n, "--eval", nodecode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
break
except OSError as e:
if e.errno == errno.ENOENT:
Expand All @@ -39,7 +51,7 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
nodejs = subprocess.Popen(["docker", "run",
"--attach=STDIN", "--attach=STDOUT", "--attach=STDERR",
"--sig-proxy=true", "--interactive",
"--rm", nodeimg],
"--rm", nodeimg, "node", "--eval", nodecode],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
if e.errno == errno.ENOENT:
Expand All @@ -55,15 +67,24 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
"expressions, but couldn't find it. Tried %s, docker run "
"node:slim" % u", ".join(trynodes))

return nodejs


def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON

if not hasattr(localdata, "proc") or localdata.proc.poll() is not None:
localdata.proc = new_js_proc()

nodejs = localdata.proc

fn = u"\"use strict\";\n%s\n(function()%s)()" % (jslib, js if isinstance(js, basestring) and len(js) > 1 and js[0] == '{' else ("{return (%s);}" % js))
script = u"console.log(JSON.stringify(require(\"vm\").runInNewContext(%s, {})));\n" % json.dumps(fn)

killed = []

def term():
try:
nodejs.kill()
killed.append(True)
nodejs.kill()
except OSError:
pass

Expand All @@ -73,17 +94,44 @@ def term():
tm = threading.Timer(timeout, term)
tm.start()

stdoutdata, stderrdata = nodejs.communicate(script)
stdin_buf = StringIO(json.dumps(fn)+"\n")
stdout_buf = StringIO()
stderr_buf = StringIO()

completed = [] # type: List[Union[cStringIO.InputType, cStringIO.OutputType]]
while len(completed) < 3:
rready, wready, _ = select.select([nodejs.stdout, nodejs.stderr], [nodejs.stdin], [])
if nodejs.stdin in wready:
b = stdin_buf.read(select.PIPE_BUF)
if b:
os.write(nodejs.stdin.fileno(), b)
elif stdin_buf not in completed:
completed.append(stdin_buf)
for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
if pipes[0] in rready:
b = os.read(pipes[0].fileno(), select.PIPE_BUF)
if b:
pipes[1].write(b)
elif pipes[1] not in completed:
completed.append(pipes[1])
if stdout_buf.getvalue().endswith("\n"):
for buf in (stdout_buf, stderr_buf):
if buf not in completed:
completed.append(buf)
tm.cancel()

stdin_buf.close()
stdoutdata = stdout_buf.getvalue()
stderrdata = stderr_buf.getvalue()

def fn_linenum(): # type: () -> Text
return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n")))

if killed:
raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum()))

if nodejs.returncode != 0:
raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata))
if nodejs.poll() not in (None, 0):
if killed:
raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum()))
else:
raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata))
else:
try:
return json.loads(stdoutdata)
Expand Down
46 changes: 0 additions & 46 deletions node-expr-engine/cwlNodeEngine.js

This file was deleted.

3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
'schemas/v1.0/*.yml',
'schemas/v1.0/*.md',
'schemas/v1.0/salad/schema_salad/metaschema/*.yml',
'schemas/v1.0/salad/schema_salad/metaschema/*.md']},
'schemas/v1.0/salad/schema_salad/metaschema/*.md',
'cwlNodeEngine.js']},
install_requires=[
'requests',
'ruamel.yaml == 0.12.4',
Expand Down

0 comments on commit 42f7318

Please sign in to comment.