19385: more WIP
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15
16 from typing import (MutableSequence, MutableMapping)
17
18 from ruamel.yaml import YAML
19 from ruamel.yaml.comments import CommentedMap, CommentedSeq
20
21 from schema_salad.sourceline import SourceLine, cmap
22 import schema_salad.ref_resolver
23
24 import arvados.collection
25
26 from cwltool.pack import pack
27 from cwltool.load_tool import fetch_document, resolve_and_validate_document
28 from cwltool.process import shortname, uniquename
29 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
30 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
31 from cwltool.context import LoadingContext
32
33 from schema_salad.ref_resolver import file_uri, uri_file_path
34
35 import ruamel.yaml as yaml
36
37 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
38                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
39                      make_builder, arvados_jobs_image, FileUpdates)
40 from .pathmapper import ArvPathMapper, trim_listing
41 from .arvtool import ArvadosCommandTool, set_cluster_target
42 from ._version import __version__
43
44 from .perf import Perf
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48
49 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
50 sum_res_pars = ("outdirMin", "outdirMax")
51
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Save the packed workflow to Keep and return a wrapper workflow document.

    The packed workflow is written as "workflow.json" into a new collection
    (unless a collection with the same portable data hash already exists in
    the target project).  The return value is a JSON string containing a
    minimal "wrapper" Workflow whose single step runs the packed workflow
    via its keep: reference.
    """
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    # Only save a new collection when an identical one (same portable data
    # hash) isn't already present in the target project.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire every wrapper input through to the single step.
    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    # Expose every output of the wrapped workflow as a wrapper output.
    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    # Carry git provenance metadata (if any) into the wrapper document.
    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
129
130
def rel_ref(s, baseuri, urlexpander, merged_map):
    """Rewrite reference ``s`` as a keep: URI or a path relative to ``baseuri``.

    Resolution order:
      1. keep: references (before or after expansion) pass through unchanged;
      2. if the expanded URI has a known uploaded replacement recorded in
         ``merged_map`` (keyed by ``baseuri`` or its defragged file URI),
         return that replacement;
      3. otherwise return the expanded URI as a filesystem path relative to
         the directory of ``baseuri``'s file.
    """
    if s.startswith("keep:"):
        return s

    uri = urlexpander(s, baseuri)

    if uri.startswith("keep:"):
        return uri

    fileuri = urllib.parse.urldefrag(baseuri)[0]

    for u in (baseuri, fileuri):
        if u in merged_map:
            replacements = merged_map[u].resolved
            if uri in replacements:
                return replacements[uri]

    p1 = os.path.dirname(uri_file_path(fileuri))
    p2 = os.path.dirname(uri_file_path(uri))
    p3 = os.path.basename(uri_file_path(uri))

    r = os.path.relpath(p2, p1)
    if r == ".":
        r = ""

    return os.path.join(r, p3)
168
169
def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix):
    """Recursively rewrite references inside a parsed CWL document tree.

    Walks ``d`` in place, rewriting "location"/"run"/"name", "$include",
    "$import", "$schemas", non-builtin "type" references and string-valued
    shorthand "inputs" entries through rel_ref().  In sequences, strings
    starting with ``prefix`` are rewritten to start with ``replacePrefix``.
    Also annotates DockerRequirement with the cached collection PDH of its
    image, if any.
    """
    # Preserve the original block style of the YAML source when re-serializing.
    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
        d.fa.set_block_style()

    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            if prefix and isinstance(s, str):
                if s.startswith(prefix):
                    d[i] = replacePrefix+s[len(prefix):]
            else:
                update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        # An "id" (or "name") field establishes a new base URI for the
        # references nested below this node.
        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
        elif "name" in d:
            baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            # NOTE(review): .get() yields None when the image was never looked
            # up -- presumably filled in elsewhere; verify against callers.
            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)

        for field in d:
            if field in ("location", "run", "name") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
                continue

            # $include/$import targets are not in merged_map; resolve them
            # purely relative to the base URI.
            if field in ("$include", "$import") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, {})
                continue

            basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")

            # A string "type" that isn't a CWL builtin is a reference to a
            # SchemaDef-declared type and needs rewriting too.
            if (field == "type" and
                isinstance(d["type"], str) and
                d["type"] not in basetypes):
                d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map)
                continue

            # Shorthand "inputs" map form: values may be type name references.
            if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                for inp in d["inputs"]:
                    if isinstance(d["inputs"][inp], str) and d["inputs"][inp] not in basetypes:
                        d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map)
                continue

            if field == "$schemas":
                for n, s in enumerate(d["$schemas"]):
                    d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
                continue

            update_refs(d[field], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
223
224
def fix_schemadef(req, baseuri, urlexpander, merged_map, pdh):
    """Return a copy of a SchemaDefRequirement whose type names have been
    recorded in ``merged_map`` as keep: references into collection ``pdh``.

    Every entry of ``merged_map`` learns the mapping from the original type
    name to "keep:<pdh>/<relative path>", so later reference rewriting
    resolves the type to the uploaded file.  ``merged_map`` is mutated.
    """
    req = copy.deepcopy(req)

    for typedef in req["types"]:
        name = typedef["name"]
        path, frag = urllib.parse.urldefrag(name)
        rel = rel_ref(name, baseuri, urlexpander, merged_map)
        merged_map.setdefault(path, FileUpdates({}, {}))
        keep_ref = "keep:%s/%s" %(pdh, rel)
        for entry in merged_map:
            merged_map[entry].resolved[name] = keep_ref
    return req
240
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False):
    """Upload the workflow's source files to Keep and build a wrapper document.

    Unlike upload_workflow() (which packs everything into one JSON document),
    this preserves the original file layout: each workflow/import file is
    rewritten with update_refs() so its references become relative paths or
    keep: URIs, then all files are saved into a single collection.  Returns
    the wrapper document (a dict with "cwlVersion" and "$graph") whose single
    step runs the main workflow file out of that collection.
    """

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # Collect every local file the document loader touched, sorted into
    # plain workflow files, $import targets and $include targets.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find "n", the length of the longest common directory prefix shared by
    # all files (starting just past "file://"), so paths stored in the
    # collection are relative to that common directory.
    n = 7
    allmatch = True
    if firstfile:
        while allmatch:
            n += 1
            for f in all_files:
                if len(f)-1 < n:
                    n -= 1
                    allmatch = False
                    break
                if f[n] != firstfile[n]:
                    allmatch = False
                    break

        # Back up to the last directory separator inside the common prefix.
        while firstfile[n] != "/":
            n -= 1

    col = arvados.collection.Collection(api_client=arvRunner.api)

    for w in workflow_files | import_files:
        # 1. load YAML
        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        # Preserve the source document's formatting style on round-trip.
        set_block_style = False
        if result.fa.flow_style():
            set_block_style = True

        # 2. find $import, $include, $schema, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext, "", "")

        with col.open(w[n+1:], "wt") as f:
            yamlloader.dump(result, stream=f)

    # $include files are copied into the collection verbatim.
    for w in include_files:
        with col.open(w[n+1:], "wb") as f1:
            with open(uri_file_path(w), "rb") as f2:
                dat = f2.read(65536)
                while dat:
                    f1.write(dat)
                    dat = f2.read(65536)

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][n+1:]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    # Record git provenance (if known) as collection properties.
    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # now construct the wrapper

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    # Make sure a WorkflowRunnerResources hint exists, carrying the runner
    # container image (and an optional RAM override).
    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        # Optionally fold the supplied job order values in as defaults.
        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire every wrapper input through to the single step.
    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    # Expose every output of the wrapped workflow as a wrapper output.
    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    # Carry git provenance metadata (if any) into the wrapper document.
    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # SchemaDef type names must be rewritten to point at the uploaded files.
    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, col.portable_data_hash())

    logger.debug("update_refs %s %s", main["id"], runfile)

    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"]+"#", "#main/")

    return doc
456
457
def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
    """Create or update an Arvados workflow record holding ``doc``.

    Serializes ``doc`` as the record's definition and returns the uuid of
    the created (or updated, when ``update_uuid`` is given) record.
    """

    definition = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    workflow = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": definition,
    }
    if project_uuid:
        workflow["owner_uuid"] = project_uuid
    body = {"workflow": workflow}

    if update_uuid:
        request = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        request = arvRunner.api.workflows().create(body=body)
    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
476
477
def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None):
    """Pack the workflow into one document, upload its dependencies, and
    create (or update) an Arvados workflow record wrapping it.

    Returns the uuid of the created/updated workflow record.
    """

    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # Fold supplied job order values in as input defaults on #main.
    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"],
                        runtimeContext)

    wf_runner_resources = None

    # Make sure a WorkflowRunnerResources hint exists, carrying the runner
    # container image (and an optional RAM override).
    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    main["hints"] = hints

    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrapper
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if uuid:
        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
541
def dedup_reqs(reqs):
    """Deduplicate a requirements/hints list by "class".

    When a class appears more than once, the last occurrence wins.  Entries
    whose class is an Arvados extension ("http://arvados.org/cwl#...") are
    dropped.  The result is ordered by class name.
    """
    seen = {}
    for entry in reversed(reqs):
        cls = entry["class"]
        if cls not in seen and not cls.startswith("http://arvados.org/cwl#"):
            seen[cls] = entry
    return [seen[cls] for cls in sorted(seen)]
548
def get_overall_res_req(res_reqs):
    """Combine a list of ResourceRequirement into a single overall one:
    the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax and
    the sum of outdirMin, outdirMax.

    Raises WorkflowException if any non-integer (expression) values are
    present, since nested requirements inside a single container cannot be
    evaluated.
    """

    collected = {}
    errors = []
    for param in max_res_pars + sum_res_pars:
        collected[param] = []
        for res_req in res_reqs:
            if param not in res_req:
                continue
            value = res_req[param]
            if isinstance(value, int):  # integer check
                collected[param].append(value)
            else:
                errors.append(SourceLine(res_req, param).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))

    if errors:
        raise WorkflowException("\n".join(errors))

    overall = {}
    for param, values in collected.items():
        if not values:
            continue
        if param in max_res_pars:
            overall[param] = max(values)
        elif param in sum_res_pars:
            overall[param] = sum(values)
    if overall:
        overall["class"] = "ResourceRequirement"
    return cmap(overall)
579
class ArvadosWorkflowStep(WorkflowStep):
    """WorkflowStep that sets the Arvados cluster target before scheduling."""

    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            # Fast-submit mode: skip WorkflowStep.__init__ (which loads and
            # validates the embedded tool); inputs/outputs aren't needed when
            # only submitting.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Resolve the target cluster for this step, then defer to cwltool."""
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        # Builder is only needed to evaluate expressions in ClusterTarget.
        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
607
608
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        # Portable data hash of the uploaded packed workflow; set lazily on
        # the first job() call with RunInSingleContainer.
        self.wf_pdh = None
        self.dynamic_resource_req = []
        self.static_resource_req = []
        # File/Directory objects referenced by the workflow itself (mainly
        # "default" values), discovered during packing.
        self.wf_reffiles = []
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
        """Schedule the workflow.

        Without the RunInSingleContainer requirement this defers entirely to
        cwltool's Workflow.job().  With it, the subworkflow is packed,
        uploaded to Keep, and wrapped in a generated CommandLineTool that
        runs "cwltool workflow.cwl cwl.input.yml" inside one container.
        """

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                # Hoist this workflow's requirements/hints onto the packed #main.
                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    # Everything runs in one container, so nested Docker
                    # requirements are dropped, and resource requirements are
                    # collected so an overall container size can be computed.
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (ie referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                # Rewrite keep: locations to their /keep/... in-container paths.
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        # Generated tool that runs the packed workflow with cwltool inside
        # the container; workflow and inputs are staged via
        # InitialWorkDirRequirement, and $( / ${ in the input JSON are
        # escaped so they are not treated as CWL expressions.
        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)