[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
import schema_salad.utils
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .pathmapper import ArvPathMapper, trim_listing
42 from .arvtool import ArvadosCommandTool, set_cluster_target
43 from ._version import __version__
44
45 from .perf import Perf
46
47 logger = logging.getLogger('arvados.cwl-runner')
48 metrics = logging.getLogger('arvados.cwl-runner.metrics')
49
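# Resource requirement fields that are combined across steps by taking the
# maximum (max_res_pars) or the sum (sum_res_pars) when collapsing a
# workflow into a single container (see get_overall_res_req below).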
50 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
51 sum_res_pars = ("outdirMin", "outdirMax")
52
53 def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
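    """Save the packed workflow to a collection and return the JSON text
    of a minimal wrapper workflow that runs it as a single step.
    """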
54     col = arvados.collection.Collection(api_client=arvRunner.api,
55                                         keep_client=arvRunner.keep_client)
56
57     with col.open("workflow.json", "wt") as f:
58         json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))
59
60     pdh = col.portable_data_hash()
61
62     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
63     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
64         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
65
66     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
67     if len(existing["items"]) == 0:
68         col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)
69
70     # now construct the wrapper
71
72     step = {
73         "id": "#main/" + toolname,
74         "in": [],
75         "out": [],
76         "run": "keep:%s/workflow.json#main" % pdh,
77         "label": name
78     }
79
80     newinputs = []
81     for i in main["inputs"]:
82         inp = {}
83         # Make sure to only copy known fields that are meaningful at
84         # the workflow level. In practice this ensures that if we're
85         # wrapping a CommandLineTool we don't grab inputBinding.
86         # Right now also excludes extension fields, which is fine,
87         # Arvados doesn't currently look for any extension fields on
88         # input parameters.
89         for f in ("type", "label", "secondaryFiles", "streamable",
90                   "doc", "id", "format", "loadContents",
91                   "loadListing", "default"):
92             if f in i:
93                 inp[f] = i[f]
94         newinputs.append(inp)
95
96     wrapper = {
97         "class": "Workflow",
98         "id": "#main",
99         "inputs": newinputs,
100         "outputs": [],
101         "steps": [step]
102     }
103
104     for i in main["inputs"]:
105         step["in"].append({
106             "id": "#main/step/%s" % shortname(i["id"]),
107             "source": i["id"]
108         })
109
110     for i in main["outputs"]:
111         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
112         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
113                                    "type": i["type"],
114                                    "id": i["id"]})
115
116     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
117
118     if main.get("requirements"):
119         wrapper["requirements"].extend(main["requirements"])
120     if main.get("hints"):
121         wrapper["hints"] = main["hints"]
122
123     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
124
125     if git_info:
126         for g in git_info:
127             doc[g] = git_info[g]
128
129     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
130
131
132 def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
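    """Map a reference to a keep: URI, a previously resolved target, or a
    path relative to baseuri.
    """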
133     if s.startswith("keep:"):
134         return s
135
136     uri = urlexpander(s, baseuri)
137
138     if uri.startswith("keep:"):
139         return uri
140
141     fileuri = urllib.parse.urldefrag(baseuri)[0]
142
143     for u in (baseuri, fileuri):
144         if u in merged_map:
145             replacements = merged_map[u].resolved
146             if uri in replacements:
147                 return replacements[uri]
148
149     if uri in jobmapper:
150         return jobmapper.mapper(uri).target
151
152     p1 = os.path.dirname(uri_file_path(fileuri))
153     p2 = os.path.dirname(uri_file_path(uri))
154     p3 = os.path.basename(uri_file_path(uri))
155
156     r = os.path.relpath(p2, p1)
157     if r == ".":
158         r = ""
159
160     return os.path.join(r, p3)
161
162 def is_basetype(tp):
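    """Return True if tp names a CWL base type (optionally an array and/or optional variant)."""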
163     basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")
164     for b in basetypes:
165         if re.match(b+r"(\[\])?\??", tp):
166             return True
167     return False
168
169
170 def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
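    """Recursively rewrite references in a parsed workflow document.

    Updates 'location', 'run', 'name', '$include', '$import', '$schemas'
    and non-base type references to relative or keep: paths, and records
    the Docker collection PDH on DockerRequirement entries.
    """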
171     if isinstance(d, MutableSequence):
172         for i, s in enumerate(d):
173             if prefix and isinstance(s, str):
174                 if s.startswith(prefix):
175                     d[i] = replacePrefix+s[len(prefix):]
176             else:
177                 update_refs(s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
178     elif isinstance(d, MutableMapping):
179         for field in ("id", "name"):
180             if isinstance(d.get(field), str) and d[field].startswith("_:"):
181                 # Blank node reference that was added automatically; we can get rid of it.
182                 del d[field]
183
184         if "id" in d:
185             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
186         elif "name" in d and isinstance(d["name"], str):
187             baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
188
189         if d.get("class") == "DockerRequirement":
190             dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
191             d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
192
193         for field in d:
194             if field in ("location", "run", "name") and isinstance(d[field], str):
195                 d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
196                 continue
197
198             if field in ("$include", "$import") and isinstance(d[field], str):
199                 d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
200                 continue
201
202             for t in ("type", "items"):
203                 if (field == t and
204                     isinstance(d[t], str) and
205                     not is_basetype(d[t])):
206                     d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
207                     continue
208
209             if field == "inputs" and isinstance(d["inputs"], MutableMapping):
210                 for inp in d["inputs"]:
211                     if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
212                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
213                     if isinstance(d["inputs"][inp], MutableMapping):
214                         update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
215                 continue
216
217             if field == "$schemas":
218                 for n, s in enumerate(d["$schemas"]):
219                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
220                 continue
221
222             update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
223
224
225 def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
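    """Record keep: replacements for SchemaDefRequirement type names so that
    subsequent reference updates resolve them into the uploaded collection.
    """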
226     req = copy.deepcopy(req)
227
228     for f in req["types"]:
229         r = f["name"]
230         path, frag = urllib.parse.urldefrag(r)
231         rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
232         merged_map.setdefault(path, FileUpdates({}, {}))
233         rename = "keep:%s/%s" %(pdh, rel)
234         for mm in merged_map:
235             merged_map[mm].resolved[r] = rename
236     return req
237
238 def drop_ids(d):
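    """Recursively remove 'id' fields that are local file: references."""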
239     if isinstance(d, MutableSequence):
240         for i, s in enumerate(d):
241             drop_ids(s)
242     elif isinstance(d, MutableMapping):
243         if "id" in d and d["id"].startswith("file:"):
244             del d["id"]
245
246         for field in d:
247             drop_ids(d[field])
248
249
250 def upload_workflow(arvRunner, tool, job_order, project_uuid,
251                         runtimeContext,
252                         uuid=None,
253                         submit_runner_ram=0, name=None, merged_map=None,
254                         submit_runner_image=None,
255                         git_info=None,
256                         set_defaults=False,
257                         jobmapper=None):
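    """Upload the workflow source files to a Keep collection.

    Rewrites relative references between the workflow files, stores both
    the updated files and verbatim originals in the collection, and
    returns a wrapper workflow document with a single step that runs the
    uploaded workflow.
    """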
258
259     firstfile = None
260     workflow_files = set()
261     import_files = set()
262     include_files = set()
263
264     # The document loader index will have entries for all the files
265     # that were loaded in the process of parsing the entire workflow
266     # (including subworkflows, tools, imports, etc).  We use this to
267     # compose a list of the workflow file dependencies.
268     for w in tool.doc_loader.idx:
269         if w.startswith("file://"):
270             workflow_files.add(urllib.parse.urldefrag(w)[0])
271             if firstfile is None:
272                 firstfile = urllib.parse.urldefrag(w)[0]
273         if w.startswith("import:file://"):
274             import_files.add(urllib.parse.urldefrag(w[7:])[0])
275         if w.startswith("include:file://"):
276             include_files.add(urllib.parse.urldefrag(w[8:])[0])
277
278     all_files = workflow_files | import_files | include_files
279
280     # Find the longest common prefix among all the file names.  We'll
281     # use this to recreate the directory structure in a keep
282     # collection with correct relative references.
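    # For example (hypothetical paths): for file:///home/user/wf/main.cwl and
    # file:///home/user/wf/tools/clean.cwl the common prefix ends at the "/"
    # after "wf", so the files are stored in the collection as main.cwl and
    # tools/clean.cwl.  The scan starts just past the "file://" scheme
    # (7 characters).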
283     n = 7
284     allmatch = True
285     if firstfile:
286         while allmatch:
287             n += 1
288             for f in all_files:
289                 if len(f)-1 < n:
290                     n -= 1
291                     allmatch = False
292                     break
293                 if f[n] != firstfile[n]:
294                     allmatch = False
295                     break
296
297         while firstfile[n] != "/":
298             n -= 1
299
300     col = arvados.collection.Collection(api_client=arvRunner.api)
301
302     # Now go through all the files and update references to other
303     # files.  We previously scanned for file dependencies; these are
304     # passed in as merged_map.
305     #
306     # note about merged_map: we upload dependencies of each process
307     # object (CommandLineTool/Workflow) to a separate collection.
308     # That way, when the user edits something, this limits collection
309     # PDH changes to just that tool, and minimizes situations where
310     # small changes break container reuse for the whole workflow.
311     #
312     for w in workflow_files | import_files:
313         # 1. load the YAML  file
314
315         text = tool.doc_loader.fetch_text(w)
316         if isinstance(text, bytes):
317             textIO = StringIO(text.decode('utf-8'))
318         else:
319             textIO = StringIO(text)
320
321         yamlloader = schema_salad.utils.yaml_no_ts()
322         result = yamlloader.load(textIO)
323
324         # If the whole document is in "flow style" it is probably JSON
325         # formatted.  We'll re-export it as JSON because the
326         # ruamel.yaml round-trip mode is a lie and only preserves
327         # "block style" formatting and not "flow style" formatting.
328         export_as_json = result.fa.flow_style()
329
330         # 2. find $import, $include, $schema, run, location
331         # 3. update field value
332         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
333
334         # Write the updated file to the collection.
335         with col.open(w[n+1:], "wt") as f:
336             if export_as_json:
337                 json.dump(result, f, indent=4, separators=(',',': '))
338             else:
339                 yamlloader.dump(result, stream=f)
340
341         # Also store a verbatim copy of the original files
342         with col.open(os.path.join("original", w[n+1:]), "wt") as f:
343             f.write(text)
344
345
346     # Upload files referenced by $include directives, these are used
347     # unchanged and don't need to be updated.
348     for w in include_files:
349         with col.open(w[n+1:], "wb") as f1:
350             with col.open(os.path.join("original", w[n+1:]), "wb") as f3:
351                 with open(uri_file_path(w), "rb") as f2:
352                     dat = f2.read(65536)
353                     while dat:
354                         f1.write(dat)
355                         f3.write(dat)
356                         dat = f2.read(65536)
357
358     # Now collect metadata: the collection name and git properties.
359
360     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
361     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
362         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
363
364     toolfile = tool.tool["id"][n+1:]
365
366     properties = {
367         "type": "workflow",
368         "arv:workflowMain": toolfile,
369     }
370
371     if git_info:
372         for g in git_info:
373             p = g.split("#", 1)[1]
374             properties["arv:"+p] = git_info[g]
375
376     # Check if a collection with the same content already exists in the target project.  If so, just use that one.
377     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
378                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
379
380     if len(existing["items"]) == 0:
381         toolname = toolname.replace("/", " ")
382         col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
383         logger.info("Workflow uploaded to %s", col.manifest_locator())
384     else:
385         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
386
387     # Now that we've updated the workflow and saved it to a
388     # collection, we're going to construct a minimal "wrapper"
389     # workflow which consists only of input and output parameters
390     # connected to a single step that runs the real workflow.
391
392     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
393
394     step = {
395         "id": "#main/" + toolname,
396         "in": [],
397         "out": [],
398         "run": runfile,
399         "label": name
400     }
401
402     main = tool.tool
403
404     wf_runner_resources = None
405
406     hints = main.get("hints", [])
407     found = False
408     for h in hints:
409         if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
410             wf_runner_resources = h
411             found = True
412             break
413     if not found:
414         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
415         hints.append(wf_runner_resources)
416
417     wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
418                                                                   submit_runner_image or "arvados/jobs:"+__version__,
419                                                                   runtimeContext)
420
421     if submit_runner_ram:
422         wf_runner_resources["ramMin"] = submit_runner_ram
423
424     # Remove a few redundant fields from the "job order" (aka input
425     # object or input parameters).  In the situation where we're
426     # creating or updating a workflow record, any values in the job
427     # order get copied over as default values for input parameters.
428     adjustDirObjs(job_order, trim_listing)
429     adjustFileObjs(job_order, trim_anonymous_location)
430     adjustDirObjs(job_order, trim_anonymous_location)
431
432     newinputs = []
433     for i in main["inputs"]:
434         inp = {}
435         # Make sure to only copy known fields that are meaningful at
436         # the workflow level. In practice this ensures that if we're
437         # wrapping a CommandLineTool we don't grab inputBinding.
438         # Right now this also excludes extension fields, which is fine;
439         # Arvados doesn't currently look for any extension fields on
440         # input parameters.
441         for f in ("type", "label", "secondaryFiles", "streamable",
442                   "doc", "format", "loadContents",
443                   "loadListing", "default"):
444             if f in i:
445                 inp[f] = i[f]
446
447         if set_defaults:
448             sn = shortname(i["id"])
449             if sn in job_order:
450                 inp["default"] = job_order[sn]
451
452         inp["id"] = "#main/%s" % shortname(i["id"])
453         newinputs.append(inp)
454
455     wrapper = {
456         "class": "Workflow",
457         "id": "#main",
458         "inputs": newinputs,
459         "outputs": [],
460         "steps": [step]
461     }
462
463     for i in main["inputs"]:
464         step["in"].append({
465             "id": "#main/step/%s" % shortname(i["id"]),
466             "source": "#main/%s" % shortname(i["id"])
467         })
468
469     for i in main["outputs"]:
470         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
471         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
472                                    "type": i["type"],
473                                    "id": "#main/%s" % shortname(i["id"])})
474
475     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
476
477     if main.get("requirements"):
478         wrapper["requirements"].extend(main["requirements"])
479     if hints:
480         wrapper["hints"] = hints
481
482     # Schema definitions (this lets you define things like record
483     # types) require special handling.
484
485     for i, r in enumerate(wrapper["requirements"]):
486         if r["class"] == "SchemaDefRequirement":
487             wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
488
489     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
490
491     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
492
493     if git_info:
494         for g in git_info:
495             doc[g] = git_info[g]
496
497     # Remove any lingering file references.
498     drop_ids(wrapper)
499
500     return doc
501
502
503 def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
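    """Create or update an Arvados workflow record holding the wrapper
    document, and return its uuid.
    """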
504
505     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
506
507     body = {
508         "workflow": {
509             "name": name,
510             "description": tool.tool.get("doc", ""),
511             "definition": wrappertext
512         }}
513     if project_uuid:
514         body["workflow"]["owner_uuid"] = project_uuid
515
516     if update_uuid:
517         call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
518     else:
519         call = arvRunner.api.workflows().create(body=body)
520     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
521
522
523 def dedup_reqs(reqs):
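    """Deduplicate requirements by class (last occurrence wins), dropping
    Arvados extension requirements; returns them sorted by class.
    """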
524     dedup = {}
525     for r in reversed(reqs):
526         if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
527             dedup[r["class"]] = r
528     return [dedup[r] for r in sorted(dedup.keys())]
529
530 def get_overall_res_req(res_reqs):
531     """Take the overall resource requirement of a list of ResourceRequirements,
532     i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
533     and the sum of outdirMin, outdirMax."""
534
535     all_res_req = {}
536     exception_msgs = []
537     for a in max_res_pars + sum_res_pars:
538         all_res_req[a] = []
539         for res_req in res_reqs:
540             if a in res_req:
541                 if isinstance(res_req[a], int): # integer check
542                     all_res_req[a].append(res_req[a])
543                 else:
544                     msg = SourceLine(res_req, a).makeError(
545                     "Non-top-level ResourceRequirement in single container cannot have expressions")
546                     exception_msgs.append(msg)
547     if exception_msgs:
548         raise WorkflowException("\n".join(exception_msgs))
549     else:
550         overall_res_req = {}
551         for a in all_res_req:
552             if all_res_req[a]:
553                 if a in max_res_pars:
554                     overall_res_req[a] = max(all_res_req[a])
555                 elif a in sum_res_pars:
556                     overall_res_req[a] = sum(all_res_req[a])
557         if overall_res_req:
558             overall_res_req["class"] = "ResourceRequirement"
559         return cmap(overall_res_req)
560
561 class ArvadosWorkflowStep(WorkflowStep):
562     def __init__(self,
563                  toolpath_object,      # type: Dict[Text, Any]
564                  pos,                  # type: int
565                  loadingContext,       # type: LoadingContext
566                  arvrunner,
567                  *argc,
568                  **argv
569                 ):  # type: (...) -> None
570
571         if arvrunner.fast_submit:
572             self.tool = toolpath_object
573             self.tool["inputs"] = []
574             self.tool["outputs"] = []
575         else:
576             super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
577             self.tool["class"] = "WorkflowStep"
578         self.arvrunner = arvrunner
579
580     def job(self, joborder, output_callback, runtimeContext):
581         runtimeContext = runtimeContext.copy()
582         runtimeContext.toplevel = True  # Preserve behavior for #13365
583
584         builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
585                                runtimeContext, self.metadata)
586         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
587         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
588
589
590 class ArvadosWorkflow(Workflow):
591     """Wrap cwltool Workflow to override selected methods."""
592
593     def __init__(self, arvrunner, toolpath_object, loadingContext):
594         self.arvrunner = arvrunner
595         self.wf_pdh = None
596         self.dynamic_resource_req = []
597         self.static_resource_req = []
598         self.wf_reffiles = []
599         self.loadingContext = loadingContext
600         super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
601         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
602
603     def job(self, joborder, output_callback, runtimeContext):
604
605         builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
606         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
607
608         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
609         if not req:
610             return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
611
612         # RunInSingleContainer is true
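        # Pack the whole subworkflow into a single document, merge its
        # ResourceRequirements, and run it with cwltool inside one
        # container via a generated CommandLineTool (wf_runner below).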
613
614         with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
615             if "id" not in self.tool:
616                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
617
618         discover_secondary_files(self.arvrunner.fs_access, builder,
619                                  self.tool["inputs"], joborder)
620
621         normalizeFilesDirs(joborder)
622
623         with Perf(metrics, "subworkflow upload_deps"):
624             upload_dependencies(self.arvrunner,
625                                 os.path.basename(joborder.get("id", "#")),
626                                 self.doc_loader,
627                                 joborder,
628                                 joborder.get("id", "#"),
629                                 runtimeContext)
630
631             if self.wf_pdh is None:
632                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
633
634                 for p in packed["$graph"]:
635                     if p["id"] == "#main":
636                         p["requirements"] = dedup_reqs(self.requirements)
637                         p["hints"] = dedup_reqs(self.hints)
638
639                 def visit(item):
640                     if "requirements" in item:
641                         item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
642                     for t in ("hints", "requirements"):
643                         if t not in item:
644                             continue
645                         for req in item[t]:
646                             if req["class"] == "ResourceRequirement":
647                                 dyn = False
648                                 for k in max_res_pars + sum_res_pars:
649                                     if k in req:
650                                         if isinstance(req[k], basestring):
651                                             if item["id"] == "#main":
652                                                 # only the top-level requirements/hints may contain expressions
653                                                 self.dynamic_resource_req.append(req)
654                                                 dyn = True
655                                                 break
656                                             else:
657                                                 with SourceLine(req, k, WorkflowException):
658                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
659                                 if not dyn:
660                                     self.static_resource_req.append(req)
661
662                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
663
664                 if self.static_resource_req:
665                     self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
666
667                 upload_dependencies(self.arvrunner,
668                                     runtimeContext.name,
669                                     self.doc_loader,
670                                     packed,
671                                     self.tool["id"],
672                                     runtimeContext)
673
674                 # Discover files/directories referenced by the
675                 # workflow (mainly "default" values)
676                 visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
677
678
679         if self.dynamic_resource_req:
680             # Evaluate dynamic resource requirements using current builder
681             rs = copy.copy(self.static_resource_req)
682             for dyn_rs in self.dynamic_resource_req:
683                 eval_req = {"class": "ResourceRequirement"}
684                 for a in max_res_pars + sum_res_pars:
685                     if a in dyn_rs:
686                         eval_req[a] = builder.do_eval(dyn_rs[a])
687                 rs.append(eval_req)
688             job_res_reqs = [get_overall_res_req(rs)]
689         else:
690             job_res_reqs = self.static_resource_req
691
692         with Perf(metrics, "subworkflow adjust"):
693             joborder_resolved = copy.deepcopy(joborder)
694             joborder_keepmount = copy.deepcopy(joborder)
695
696             reffiles = []
697             visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
698
699             mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
700                                    "/keep/%s",
701                                    "/keep/%s/%s")
702
703             # For containers API, we need to make sure any extra
704             # referenced files (i.e. referenced by the workflow but
705             # not in the inputs) are included in the mounts.
706             if self.wf_reffiles:
707                 runtimeContext = runtimeContext.copy()
708                 runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
709
710             def keepmount(obj):
711                 remove_redundant_fields(obj)
712                 with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
713                     if "location" not in obj:
714                         raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
715                 with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
716                     if obj["location"].startswith("keep:"):
717                         obj["location"] = mapper.mapper(obj["location"]).target
718                         if "listing" in obj:
719                             del obj["listing"]
720                     elif obj["location"].startswith("_:"):
721                         del obj["location"]
722                     else:
723                         raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
724
725             visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
726
727             def resolved(obj):
728                 if obj["location"].startswith("keep:"):
729                     obj["location"] = mapper.mapper(obj["location"]).resolved
730
731             visit_class(joborder_resolved, ("File", "Directory"), resolved)
732
733             if self.wf_pdh is None:
734                 adjustFileObjs(packed, keepmount)
735                 adjustDirObjs(packed, keepmount)
736                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
737
738         self.loadingContext = self.loadingContext.copy()
739         self.loadingContext.metadata = self.loadingContext.metadata.copy()
740         self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
741
742         if len(job_res_reqs) == 1:
743             # RAM request needs to be at least 128 MiB or the workflow
744             # runner itself won't run reliably.
745             if job_res_reqs[0].get("ramMin", 1024) < 128:
746                 job_res_reqs[0]["ramMin"] = 128
747
748         arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
749         if runtimeContext.debug:
750             arguments.insert(0, '--debug')
751
752         wf_runner = cmap({
753             "class": "CommandLineTool",
754             "baseCommand": "cwltool",
755             "inputs": self.tool["inputs"],
756             "outputs": self.tool["outputs"],
757             "stdout": "cwl.output.json",
758             "requirements": self.requirements+job_res_reqs+[
759                 {"class": "InlineJavascriptRequirement"},
760                 {
761                 "class": "InitialWorkDirRequirement",
762                 "listing": [{
763                         "entryname": "workflow.cwl",
764                         "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
765                     }, {
766                         "entryname": "cwl.input.yml",
767                         "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', r'\$(').replace('${', r'\${')
768                     }]
769             }],
770             "hints": self.hints,
771             "arguments": arguments,
772             "id": "#"
773         })
774         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
775
776     def make_workflow_step(self,
777                            toolpath_object,      # type: Dict[Text, Any]
778                            pos,                  # type: int
779                            loadingContext,       # type: LoadingContext
780                            *argc,
781                            **argv
782     ):
783         # (...) -> WorkflowStep
784         return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)