import os
import urlparse
from functools import partial
+import logging
+import json
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
from cwltool.load_tool import fetch_document
+import arvados.collection
+
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
+logger = logging.getLogger('arvados.cwl-runner')
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
+ self.uuid = None
def update_pipeline_component(self, record):
pass
return path
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+ loaded = set()
def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
+ joined = urlparse.urljoin(b, u)
+ if joined not in loaded:
+ loaded.add(joined)
+ return document_loader.fetch(urlparse.urljoin(b, u))
+ else:
+ return {}
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
adjustFiles(sc, partial(visitFiles, workflowfiles))
adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=self.name,
**kwargs)
jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
def done(self, record):
if record["state"] == "Complete":
- processStatus = "success"
+ if record.get("exit_code") is not None:
+ if record["exit_code"] == 33:
+ processStatus = "UnsupportedRequirement"
+ elif record["exit_code"] == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+ else:
+ processStatus = "success"
else:
processStatus = "permanentFail"
def keepify(path):
if not path.startswith("keep:"):
return "keep:%s/%s" % (record["output"], path)
+ else:
+ return path
adjustFiles(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.jobs[record["uuid"]]
+ del self.arvrunner.processes[record["uuid"]]