projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix tests. refs #9570
[arvados.git]
/
sdk
/
cwl
/
arvados_cwl
/
runner.py
diff --git
a/sdk/cwl/arvados_cwl/runner.py
b/sdk/cwl/arvados_cwl/runner.py
index 639426afd3be49db355dee357155e4933bb727fb..19cb7eae37dec25daefd0759670cf9151e1a2add 100644
(file)
--- a/
sdk/cwl/arvados_cwl/runner.py
+++ b/
sdk/cwl/arvados_cwl/runner.py
@@ -3,11 +3,14 @@
import urlparse
from functools import partial
import logging
import json
from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
from cwltool.load_tool import fetch_document
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs
import arvados.collection
import arvados.collection
@@ -16,6 +19,8 @@
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')
logger = logging.getLogger('arvados.cwl-runner')
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
@@ -41,15 +46,14 @@
class Runner(object):
def arvados_job_spec(self, *args, **kwargs):
self.upload_docker(self.tool)
def arvados_job_spec(self, *args, **kwargs):
self.upload_docker(self.tool)
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
+ workflowfiles = []
+ jobfiles = []
+ workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
self.name = os.path.basename(self.tool.tool["id"])
def visitFiles(files, path):
self.name = os.path.basename(self.tool.tool["id"])
def visitFiles(files, path):
- files.add(path)
- return path
+ files.append(path)
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
loaded = set()
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
loaded = set()
@@ -63,10 +67,10 @@
class Runner(object):
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
- set(("$include", "$schemas", "path")),
+ set(("$include", "$schemas", "path", "location")),
loadref)
loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+ adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
@@ -81,7 +85,9 @@
class Runner(object):
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+ def setloc(p):
+ p["location"] = jobmapper.mapper(p["location"])[1]
+ adjustFileObjs(self.job_order, setloc)
if "id" in self.job_order:
del self.job_order["id"]
if "id" in self.job_order:
del self.job_order["id"]
@@ -119,4 +125,4 @@
class Runner(object):
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.jobs[record["uuid"]]
+ del self.arvrunner.processes[record["uuid"]]