import os
import urlparse
from functools import partial
import logging
import json
import subprocess
import re

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from . import done
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or when
    using the RunInSingleContainer feature), so delete the 'listing' field
    when it is safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
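
# Illustrative usage of trim_listing (not part of the original module; the
# collection hash below is a made-up placeholder):
#
#   d = {"class": "Directory",
#        "location": "keep:f0000000000000000000000000000000+99/dir",
#        "listing": [{"class": "File",
#                     "location": "keep:f0000000000000000000000000000000+99/dir/a"}]}
#   trim_listing(d)
#   assert "listing" not in d
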
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
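
# Illustrative call (hypothetical variable names, not part of the original
# module): upload the dependencies of a loaded tool, rewriting its local
# references to "keep:" URIs in place.
#
#   mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                tool.doc_loader, tool.tool,
#                                tool.tool["id"], loadref_run=True)
#   # mapper.mapper(local_uri).resolved is the corresponding keep reference.
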
def upload_docker(arvrunner, tool):
    """Visitor which uploads Docker images referenced in CommandLineTool objects."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""
    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

def tag_git_version(packed, tool):
    """Record the git commit hash of the tool's source checkout in the packed document."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
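
# Illustrative input object (hypothetical paths, not part of the original
# module):
#
#   job_order = {"reads": {"class": "File",
#                          "location": "file:///tmp/input.fastq"}}
#   job_order = upload_job_order(arvrunner, "input object", tool, job_order)
#   # job_order["reads"]["location"] now starts with "keep:".
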
def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    tool.visit(partial(upload_docker, arvrunner))

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
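
# Illustrative usage (not part of the original module); Runner.__init__ below
# builds the same default image tag:
#
#   img = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
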
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name + "%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
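
# Illustrative end-to-end flow (hypothetical names, not part of the original
# module): pack a workflow and store it in Keep as a single document.
#
#   packed = packed_workflow(arvrunner, tool)
#   pdh = upload_workflow_collection(arvrunner, "my workflow", packed)
#   # pdh is the portable data hash of a collection containing workflow.cwl.
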
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:" + __version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # Exit code 33 is used to signal UnsupportedRequirement.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]