import threading
import cwltool.docker
import fnmatch
+import logging
+import re
+import os
+from cwltool.process import get_feature
# Module-level logger for the Arvados CWL runner.  Default to INFO so job
# progress and upload messages are visible without enabling debug output.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
if not images:
imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- arvados.commands.keepdocker.main(dockerRequirement["dockerImageId"])
+ args = [image_name]
+ if image_tag:
+ args.append(image_tag)
+ arvados.commands.keepdocker.main(args)
return dockerRequirement["dockerImageId"]
runtime_constraints = {}
if self.stdin:
- command["stdin"] = self.stdin
+ script_parameters["task.stdin"] = self.stdin
if self.stdout:
- command["stdout"] = self.stdout
+ script_parameters["task.stdout"] = self.stdout
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get(docker_req, pull_image)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
runtime_constraints["arvados_sdk_version"] = "master"
response = self.arvrunner.api.jobs().create(body={
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map local workflow input paths to paths usable inside Arvados jobs.

    Local files not already in Keep are gathered and uploaded in a single
    batch via ``arvados.commands.run.uploadfiles``; inputs that already
    look like a portable-data-hash path are passed through as
    ``$(file ...)`` references.
    """

    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        # src -> (resolved path, job-visible path)
        self._pathmap = {}
        uploadfiles = []
        for src in referenced_files:
            # Resolve relative references against the workflow's base directory.
            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
            st = arvados.commands.run.statfile("", ab)
            if kwargs.get("conformance_test"):
                # Conformance tests run locally; keep the local path as-is.
                self._pathmap[src] = (src, ab)
            elif isinstance(st, arvados.commands.run.UploadFile):
                # Defer the upload so all files land in one collection.
                uploadfiles.append((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (ab, st.fn)
            elif isinstance(st, basestring) and pdh_path.match(st):
                # Already a portable data hash path in Keep.
                self._pathmap[src] = (st, "$(file %s)" % st)
            else:
                # BUG FIX: format the message with %; previously `st` was
                # passed as a second constructor argument, so the path never
                # appeared in the error text.
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3)
        for src, ab, st in uploadfiles:
            # NOTE(review): uploadfiles() appears to rewrite st.fn to the
            # Keep-backed location — confirm against the SDK.
            self._pathmap[src] = (ab, st.fn)
def makeJobRunner(self):
return ArvadosJob(self.arvrunner)
- def makePathMapper(self, reffiles, input_basedir):
- return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+ def makePathMapper(self, reffiles, input_basedir, **kwargs):
+ return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
class ArvCwlRunner(object):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- try:
- self.cond.acquire()
- self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ try:
+ self.cond.acquire()
+ self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+ self.cond.notify()
+ finally:
+ self.cond.release()
def arvExecutor(self, t, job_order, input_basedir, **kwargs):
events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
finally:
self.cond.release()
else:
- raise workflow.WorkflowException("Workflow deadlocked.")
+ raise cwltool.workflow.WorkflowException("Workflow deadlocked.")
while self.jobs:
try:
events.close()
if self.final_output is None:
- raise workflow.WorkflowException("Workflow did not return a result.")
+ raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
return self.final_output
return prefix+fn
-def uploadfiles(files, api):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file pathes no longer have a common parent.
+ n = True
+ pathprefix = "/"
while n:
pathstep = None
for c in files:
logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
- if args.dry_run:
+ if dry_run:
logger.info("$(input) is %s", pathprefix.rstrip('/'))
pdh = "$(input)"
else:
files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=args.retries)
+ collection = arvados.CollectionWriter(api, num_retries=num_retries)
stream = None
for f in files:
sp = os.path.split(f.fn)
command[i] = statfile(m.group(1), m.group(2))
break
- n = True
- pathprefix = "/"
files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
if files:
- uploadfiles(files, api)
+ uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.num_retries, project=project)
for i in xrange(1, len(slots)):
slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]