import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')
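
# Accept any file name in tool outputs: cwltool's default ACCEPTLIST_RE is
# more restrictive and would reject otherwise valid output names.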
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove the 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
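
    # "_:" locations denote anonymous (literal) entries with no backing Keep
    # reference, so their location is dropped as well.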
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])
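
    # Collect the external references ($import, $include, $schemas, run, and
    # File/Directory locations) reachable from the document.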
    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})
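
    # Upload the collected local files and directories to Keep; the resulting
    # mapper translates each local path to its "keep:" reference.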
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
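
# Upload the Docker image named by each DockerRequirement to the Arvados
# project (pulling it first if needed), recursing through Workflow steps.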
def upload_docker(arvrunner, tool):
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
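
# Upload everything the tool and job order depend on: Docker images,
# expanded secondaryFiles, and all referenced files and directories.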
def upload_instance(arvrunner, name, tool, job_order):
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
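                    # Expand each secondaryFiles pattern against the primary
                    # file's location (e.g. ".idx" appends a suffix, "^.bai"
                    # replaces the extension).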
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper
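
# Check that the arvados/jobs Docker image matching this SDK version can be
# made available in the target project; raise if it cannot.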
def arvados_jobs_image(arvrunner):
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img


class Runner(object):
    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
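                # An exit code of 33 signals that the workflow needs a feature
                # this runner does not support (UnsupportedRequirement).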
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
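
                # Paths in cwl.output.json are relative to the output
                # collection; rewrite them as absolute "keep:" references.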
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]