# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import json
import copy
import logging
import urllib
import sys
import re

from io import StringIO
from typing import (MutableSequence, MutableMapping)

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver
import schema_salad.utils

import arvados.collection

from cwltool.pack import pack
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname, uniquename
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext, getdefault

from schema_salad.ref_resolver import file_uri, uri_file_path

import ruamel.yaml as yaml

from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
                     make_builder, arvados_jobs_image, FileUpdates)
from .arvcontainer import RunnerContainer
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
from ._version import __version__
from .util import common_prefix
from .arvdocker import arv_docker_get_image

from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

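# Resource request fields that are combined by taking the maximum across
# steps, and fields that are combined by summing across steps
# (see get_overall_res_req below).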
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")

_basetype_re = re.compile(r'''(?:
Directory
|File
|array
|boolean
|double
|enum
|float
|int
|long
|null
|record
|string
)(?:\[\])?\??''', re.VERBOSE)
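# Matches plain CWL base type names, optionally with an array ("[]") or
# optional ("?") suffix, e.g. "File", "string[]", "int?".  Type strings
# that don't match are treated as references to user-defined types.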

def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
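    """Store the packed workflow in a collection, then build and return
    the JSON text of a wrapper workflow whose single step runs it."""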
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now this also excludes extension fields, which is fine:
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))


def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
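    """Rewrite a reference so it works inside the uploaded collection.

    keep: and arvwf: references are returned unchanged; references already
    recorded in merged_map or jobmapper get their mapped targets; anything
    else becomes a path relative to the file identified by baseuri."""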
    if s.startswith("keep:") or s.startswith("arvwf:"):
        return s

    uri = urlexpander(s, baseuri)

    if uri.startswith("keep:"):
        return uri

    fileuri = urllib.parse.urldefrag(baseuri)[0]

    for u in (baseuri, fileuri):
        if u in merged_map:
            replacements = merged_map[u].resolved
            if uri in replacements:
                return replacements[uri]

    if uri in jobmapper:
        return jobmapper.mapper(uri).target

    p1 = os.path.dirname(uri_file_path(fileuri))
    p2 = os.path.dirname(uri_file_path(uri))
    p3 = os.path.basename(uri_file_path(uri))

    r = os.path.relpath(p2, p1)
    if r == ".":
        r = ""

    return os.path.join(r, p3)

def is_basetype(tp):
    return _basetype_re.match(tp) is not None

def update_refs(api, d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
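    """Recursively walk a workflow document and rewrite references.

    Rewrites "location", "run", "name", "$import", "$include", "$schemas",
    non-base type names, and shorthand "inputs" entries via rel_ref, drops
    automatically-added blank node ids, applies the prefix->replacePrefix
    substitution to matching strings, and records the Docker image PDH for
    any DockerRequirement encountered."""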
    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            if prefix and isinstance(s, str):
                if s.startswith(prefix):
                    d[i] = replacePrefix+s[len(prefix):]
            else:
                update_refs(api, s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        for field in ("id", "name"):
            if isinstance(d.get(field), str) and d[field].startswith("_:"):
                # Blank node reference that was added automatically; we can drop it.
                del d[field]

        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
        elif "name" in d and isinstance(d["name"], str):
            baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            d["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, d, False,
                                                                                   runtimeContext)

        for field in d:
            if field in ("location", "run", "name") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
                continue

            if field in ("$include", "$import") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
                continue

            for t in ("type", "items"):
                if (field == t and
                    isinstance(d[t], str) and
                    not is_basetype(d[t])):
                    d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
                    continue

            if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                for inp in d["inputs"]:
                    if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
                        d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
                    if isinstance(d["inputs"][inp], MutableMapping):
                        update_refs(api, d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
                continue

            if field in ("requirements", "hints") and isinstance(d[field], MutableMapping):
                dr = d[field].get("DockerRequirement")
                if dr:
                    dr["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, dr, False,
                                                                                            runtimeContext)

            if field == "$schemas":
                for n, s in enumerate(d["$schemas"]):
                    d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
                continue

            update_refs(api, d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)


def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
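    """Return a copy of a SchemaDefRequirement with its type names rewritten
    to point into the keep collection identified by pdh, recording those
    renames in merged_map."""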
    req = copy.deepcopy(req)

    for f in req["types"]:
        r = f["name"]
        path, frag = urllib.parse.urldefrag(r)
        rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
        merged_map.setdefault(path, FileUpdates({}, {}))
        rename = "keep:%s/%s" % (pdh, rel)
        for mm in merged_map:
            merged_map[mm].resolved[r] = rename
    return req


def drop_ids(d):
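    """Recursively remove leftover "id" fields that still point at local
    file: URIs."""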
    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            drop_ids(s)
    elif isinstance(d, MutableMapping):
        if "id" in d and d["id"].startswith("file:"):
            del d["id"]

        for field in d:
            drop_ids(d[field])


def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext,
                    uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None,
                    set_defaults=False,
                    jobmapper=None):
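    """Upload the workflow's source files to a keep collection, preserving
    their relative layout, then return a wrapper workflow document whose
    single step runs the uploaded copy."""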

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # The document loader index will have entries for all the files
    # that were loaded in the process of parsing the entire workflow
    # (including subworkflows, tools, imports, etc).  We use this to
    # compose a list of the workflow file dependencies.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find the longest common prefix among all the file names.  We'll
    # use this to recreate the directory structure in a keep
    # collection with correct relative references.
    prefix = common_prefix(firstfile, all_files) if firstfile else ""


    col = arvados.collection.Collection(api_client=arvRunner.api)

    # Now go through all the files and update references to other
    # files.  We previously scanned for file dependencies; those are
    # passed in as merged_map.
    #
    # Note about merged_map: we upload dependencies of each process
    # object (CommandLineTool/Workflow) to a separate collection.
    # That way, when the user edits something, this limits collection
    # PDH changes to just that tool, and minimizes situations where
    # small changes break container reuse for the whole workflow.
    #
    for w in workflow_files | import_files:
        # 1. Load the YAML file.

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        # If the whole document is in "flow style" it is probably JSON
        # formatted.  We'll re-export it as JSON because the
        # ruamel.yaml round-trip mode is a lie and only preserves
        # "block style" formatting and not "flow style" formatting.
        export_as_json = result.fa.flow_style()

        # 2. Find $import, $include, $schemas, run, and location fields.
        # 3. Update each field's value.
        update_refs(arvRunner.api, result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")

        # Write the updated file to the collection.
        with col.open(w[len(prefix):], "wt") as f:
            if export_as_json:
                json.dump(result, f, indent=4, separators=(',',': '))
            else:
                yamlloader.dump(result, stream=f)

        # Also store a verbatim copy of the original file.
        with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
            f.write(text)


    # Upload files referenced by $include directives; these are used
    # unchanged and don't need to be updated.
    for w in include_files:
        with col.open(w[len(prefix):], "wb") as f1:
            with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
                with open(uri_file_path(w), "rb") as f2:
                    dat = f2.read(65536)
                    while dat:
                        f1.write(dat)
                        f3.write(dat)
                        dat = f2.read(65536)

    # Now collect metadata: the collection name and git properties.

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][len(prefix):]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    # Check if a collection with the same content already exists in the target project.  If so, just use that one.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)

    if len(existing["items"]) == 0:
        toolname = toolname.replace("/", " ")
        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
        logger.info("Workflow uploaded to %s", col.manifest_locator())
    else:
        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])

    # Now that we've updated the workflow and saved it to a
    # collection, we're going to construct a minimal "wrapper"
    # workflow that consists only of input and output parameters
    # connected to a single step that runs the real workflow.

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    if "acrContainerImage" not in wf_runner_resources:
        wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                      submit_runner_image or "arvados/jobs:"+__version__,
                                                                      runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    # Remove a few redundant fields from the "job order" (aka input
    # object or input parameters).  In the situation where we're
    # creating or updating a workflow record, any values in the job
    # order get copied over as default values for input parameters.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now this also excludes extension fields, which is fine:
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    # Schema definitions (this lets you define things like record
    # types) require special handling.

    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())

    update_refs(arvRunner.api, wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Remove any lingering file references.
    drop_ids(wrapper)

    return doc


def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
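    """Create or update an Arvados workflow record holding the wrapper
    workflow definition, and return its uuid."""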

    wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrappertext
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if update_uuid:
        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def dedup_reqs(reqs):
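    """Deduplicate requirements by class, keeping the last occurrence of
    each class, skipping Arvados extension requirements, and returning
    the result sorted by class name."""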
    dedup = {}
    for r in reversed(reqs):
        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
            dedup[r["class"]] = r
    return [dedup[r] for r in sorted(dedup.keys())]

def get_overall_res_req(res_reqs):
    """Combine a list of ResourceRequirements into one overall request:
    the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
    and the sum of outdirMin, outdirMax."""

    all_res_req = {}
    exception_msgs = []
    for a in max_res_pars + sum_res_pars:
        all_res_req[a] = []
        for res_req in res_reqs:
            if a in res_req:
                if isinstance(res_req[a], int): # integer check
                    all_res_req[a].append(res_req[a])
                else:
                    msg = SourceLine(res_req, a).makeError(
                        "Non-top-level ResourceRequirement in single container cannot have expressions")
                    exception_msgs.append(msg)
    if exception_msgs:
        raise WorkflowException("\n".join(exception_msgs))
    else:
        overall_res_req = {}
        for a in all_res_req:
            if all_res_req[a]:
                if a in max_res_pars:
                    overall_res_req[a] = max(all_res_req[a])
                elif a in sum_res_pars:
                    overall_res_req[a] = sum(all_res_req[a])
        if overall_res_req:
            overall_res_req["class"] = "ResourceRequirement"
        return cmap(overall_res_req)

class ArvadosWorkflowStep(WorkflowStep):
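    """Wrap cwltool WorkflowStep to set the Arvados cluster target and, in
    fast-submit mode, skip loading and validating the embedded tool."""
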
    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        builder = make_builder({shortname(k): v for k, v in joborder.items()}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)


class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        self.wf_pdh = None
        self.dynamic_resource_req = []
        self.static_resource_req = []
        self.wf_reffiles = []
        self.loadingContext = loadingContext.copy()

        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        tool_requirements = toolpath_object.get("requirements", [])
        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        tool_hints = toolpath_object.get("hints", [])

        workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
            self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")

        super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")


    def runInSingleContainer(self, joborder, output_callback, runtimeContext, builder):
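        """Handle arv:RunInSingleContainer: pack the subworkflow, combine its
        resource requirements, upload it to keep, and run it with cwltool
        inside a single container."""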
        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], str):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For the containers API, we need to make sure any extra
            # referenced files (i.e. referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
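                """Rewrite keep: locations to their /keep mount paths and drop literals."""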
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

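        # Construct an on-the-fly CommandLineTool that invokes cwltool on
        # the uploaded workflow.cwl with the keep-mounted input object.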
        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)


    def separateRunner(self, joborder, output_callback, runtimeContext, req, builder):
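        """Handle arv:SeparateRunner: launch this subworkflow as its own
        RunnerContainer (a nested arvados-cwl-runner) instead of running
        its steps in the current runner."""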

        name = runtimeContext.name

        rpn = req.get("runnerProcessName")
        if rpn:
            name = builder.do_eval(rpn)

        return RunnerContainer(self.arvrunner,
                               self,
                               self.loadingContext,
                               runtimeContext.enable_reuse,
                               None,
                               None,
                               submit_runner_ram=runtimeContext.submit_runner_ram,
                               name=name,
                               on_error=runtimeContext.on_error,
                               submit_runner_image=runtimeContext.submit_runner_image,
                               intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                               merged_map=None,
                               priority=runtimeContext.priority,
                               secret_store=self.arvrunner.secret_store,
                               collection_cache_size=runtimeContext.collection_cache_size,
                               collection_cache_is_default=self.arvrunner.should_estimate_cache_size,
                               git_info=runtimeContext.git_info,
                               reuse_runner=True).job(joborder, output_callback, runtimeContext)


    def job(self, joborder, output_callback, runtimeContext):
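        """Dispatch to RunInSingleContainer or SeparateRunner handling when
        the corresponding Arvados requirement is present, otherwise run the
        workflow normally through cwltool."""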

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if req:
            return self.runInSingleContainer(joborder, output_callback, runtimeContext, builder)

        req, _ = self.get_requirement("http://arvados.org/cwl#SeparateRunner")
        if req:
            return self.separateRunner(joborder, output_callback, runtimeContext, req, builder)

        return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)


    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):  # type: (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)