[arvados.git] / sdk/cwl/arvados_cwl/arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .pathmapper import ArvPathMapper, trim_listing
42 from .arvtool import ArvadosCommandTool, set_cluster_target
43 from ._version import __version__
44 from .util import common_prefix
45
46 from .perf import Perf
47
48 logger = logging.getLogger('arvados.cwl-runner')
49 metrics = logging.getLogger('arvados.cwl-runner.metrics')
50
51 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
52 sum_res_pars = ("outdirMin", "outdirMax")
53
54 _basetype_re = re.compile(r'''(?:
55 Directory
56 |File
57 |array
58 |boolean
59 |double
60 |enum
61 |float
62 |int
63 |long
64 |null
65 |record
66 |string
67 )(?:\[\])?\??''', re.VERBOSE)
68
69 def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
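    """Store the packed workflow as workflow.json in a Keep collection and
    return a wrapper document as a JSON string.

    An existing collection in project_uuid with the same portable data hash
    is reused if present.  The wrapper is a minimal Workflow whose single
    step runs the stored workflow from Keep and which copies the main
    workflow's inputs, outputs, requirements and hints.
    """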
70     col = arvados.collection.Collection(api_client=arvRunner.api,
71                                         keep_client=arvRunner.keep_client)
72
73     with col.open("workflow.json", "wt") as f:
74         json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))
75
76     pdh = col.portable_data_hash()
77
78     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
79     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
80         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
81
82     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
83     if len(existing["items"]) == 0:
84         col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)
85
86     # now construct the wrapper
87
88     step = {
89         "id": "#main/" + toolname,
90         "in": [],
91         "out": [],
92         "run": "keep:%s/workflow.json#main" % pdh,
93         "label": name
94     }
95
96     newinputs = []
97     for i in main["inputs"]:
98         inp = {}
99         # Make sure to only copy known fields that are meaningful at
100         # the workflow level. In practice this ensures that if we're
101         # wrapping a CommandLineTool we don't grab inputBinding.
102         # Right now this also excludes extension fields, which is fine;
103         # Arvados doesn't currently look for any extension fields on
104         # input parameters.
105         for f in ("type", "label", "secondaryFiles", "streamable",
106                   "doc", "id", "format", "loadContents",
107                   "loadListing", "default"):
108             if f in i:
109                 inp[f] = i[f]
110         newinputs.append(inp)
111
112     wrapper = {
113         "class": "Workflow",
114         "id": "#main",
115         "inputs": newinputs,
116         "outputs": [],
117         "steps": [step]
118     }
119
120     for i in main["inputs"]:
121         step["in"].append({
122             "id": "#main/step/%s" % shortname(i["id"]),
123             "source": i["id"]
124         })
125
126     for i in main["outputs"]:
127         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
128         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
129                                    "type": i["type"],
130                                    "id": i["id"]})
131
132     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
133
134     if main.get("requirements"):
135         wrapper["requirements"].extend(main["requirements"])
136     if main.get("hints"):
137         wrapper["hints"] = main["hints"]
138
139     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
140
141     if git_info:
142         for g in git_info:
143             doc[g] = git_info[g]
144
145     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
146
147
148 def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
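    """Turn a reference into one relative to baseuri.

    keep: references are returned unchanged, references already resolved in
    merged_map or jobmapper use those targets, and anything else becomes a
    filesystem-relative path.  For example (illustrative values only), a
    reference resolving to file:///work/tools/sort.cwl against a baseuri of
    file:///work/main.cwl comes back as "tools/sort.cwl".
    """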
149     if s.startswith("keep:"):
150         return s
151
152     uri = urlexpander(s, baseuri)
153
154     if uri.startswith("keep:"):
155         return uri
156
157     fileuri = urllib.parse.urldefrag(baseuri)[0]
158
159     for u in (baseuri, fileuri):
160         if u in merged_map:
161             replacements = merged_map[u].resolved
162             if uri in replacements:
163                 return replacements[uri]
164
165     if uri in jobmapper:
166         return jobmapper.mapper(uri).target
167
168     p1 = os.path.dirname(uri_file_path(fileuri))
169     p2 = os.path.dirname(uri_file_path(uri))
170     p3 = os.path.basename(uri_file_path(uri))
171
172     r = os.path.relpath(p2, p1)
173     if r == ".":
174         r = ""
175
176     return os.path.join(r, p3)
177
178 def is_basetype(tp):
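    """Return True if tp names a CWL base type, optionally as an array or
    optional variant; e.g. "File[]" is a base type, "#MySchema/MyRecord" is not."""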
179     return _basetype_re.match(tp) is not None
180
181 def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
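    """Rewrite references in a parsed CWL document in place.

    String list items that start with prefix are rewritten to use
    replacePrefix; "location", "run", "name", "$include", "$import",
    "$schemas" and non-base-type type names are made relative (or keep:
    references) via rel_ref(); automatically added blank-node ids are
    dropped; and DockerRequirements are annotated with the cached Docker
    collection PDH.
    """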
182     if isinstance(d, MutableSequence):
183         for i, s in enumerate(d):
184             if prefix and isinstance(s, str):
185                 if s.startswith(prefix):
186                     d[i] = replacePrefix+s[len(prefix):]
187             else:
188                 update_refs(s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
189     elif isinstance(d, MutableMapping):
190         for field in ("id", "name"):
191             if isinstance(d.get(field), str) and d[field].startswith("_:"):
192                 # Blank node reference that was added in automatically; we can get rid of it.
193                 del d[field]
194
195         if "id" in d:
196             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
197         elif "name" in d and isinstance(d["name"], str):
198             baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
199
200         if d.get("class") == "DockerRequirement":
201             dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
202             d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
203
204         for field in d:
205             if field in ("location", "run", "name") and isinstance(d[field], str):
206                 d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
207                 continue
208
209             if field in ("$include", "$import") and isinstance(d[field], str):
210                 d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
211                 continue
212
213             for t in ("type", "items"):
214                 if (field == t and
215                     isinstance(d[t], str) and
216                     not is_basetype(d[t])):
217                     d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
218                     continue
219
220             if field == "inputs" and isinstance(d["inputs"], MutableMapping):
221                 for inp in d["inputs"]:
222                     if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
223                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
224                     if isinstance(d["inputs"][inp], MutableMapping):
225                         update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
226                 continue
227
228             if field == "$schemas":
229                 for n, s in enumerate(d["$schemas"]):
230                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
231                 continue
232
233             update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
234
235
236 def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
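    """Register keep: renames for the types defined by a SchemaDefRequirement.

    For each type name, a rename pointing into the uploaded collection (pdh)
    is recorded in every merged_map entry so that later update_refs() calls
    resolve those references; a copy of the requirement is returned.
    """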
237     req = copy.deepcopy(req)
238
239     for f in req["types"]:
240         r = f["name"]
241         path, frag = urllib.parse.urldefrag(r)
242         rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
243         merged_map.setdefault(path, FileUpdates({}, {}))
244         rename = "keep:%s/%s" %(pdh, rel)
245         for mm in merged_map:
246             merged_map[mm].resolved[r] = rename
247     return req
248
249
250 def drop_ids(d):
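    """Recursively remove "id" fields whose value is a file: URI."""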
251     if isinstance(d, MutableSequence):
252         for i, s in enumerate(d):
253             drop_ids(s)
254     elif isinstance(d, MutableMapping):
255         if "id" in d and d["id"].startswith("file:"):
256             del d["id"]
257
258         for field in d:
259             drop_ids(d[field])
260
261
262 def upload_workflow(arvRunner, tool, job_order, project_uuid,
263                         runtimeContext,
264                         uuid=None,
265                         submit_runner_ram=0, name=None, merged_map=None,
266                         submit_runner_image=None,
267                         git_info=None,
268                         set_defaults=False,
269                         jobmapper=None):
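    """Upload a workflow and its file dependencies to a Keep collection.

    Gathers every file the document loader touched (the workflow itself,
    subworkflows, tools, $import and $include files), writes them to a single
    collection preserving their relative layout (with verbatim copies under
    "original/"), rewrites cross-file references, and returns a wrapper
    workflow document (cwlVersion v1.2) whose single step runs the uploaded
    workflow from Keep.
    """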
270
271     firstfile = None
272     workflow_files = set()
273     import_files = set()
274     include_files = set()
275
276     # The document loader index will have entries for all the files
277     # that were loaded in the process of parsing the entire workflow
278     # (including subworkflows, tools, imports, etc).  We use this to
279     # compose a list of the workflow file dependencies.
280     for w in tool.doc_loader.idx:
281         if w.startswith("file://"):
282             workflow_files.add(urllib.parse.urldefrag(w)[0])
283             if firstfile is None:
284                 firstfile = urllib.parse.urldefrag(w)[0]
285         if w.startswith("import:file://"):
286             import_files.add(urllib.parse.urldefrag(w[7:])[0])
287         if w.startswith("include:file://"):
288             include_files.add(urllib.parse.urldefrag(w[8:])[0])
289
290     all_files = workflow_files | import_files | include_files
291
292     # Find the longest common prefix among all the file names.  We'll
293     # use this to recreate the directory structure in a keep
294     # collection with correct relative references.
295     prefix = common_prefix(firstfile, all_files)
296
297     col = arvados.collection.Collection(api_client=arvRunner.api)
298
299     # Now go through all the files and update references to other
300     # files.  We previously scanned for file dependencies; these
301     # are passed in as merged_map.
302     #
303     # note about merged_map: we upload dependencies of each process
304     # object (CommandLineTool/Workflow) to a separate collection.
305     # That way, when the user edits something, this limits collection
306     # PDH changes to just that tool, and minimizes situations where
307     # small changes break container reuse for the whole workflow.
308     #
309     for w in workflow_files | import_files:
310         # 1. Load the YAML file
311
312         text = tool.doc_loader.fetch_text(w)
313         if isinstance(text, bytes):
314             textIO = StringIO(text.decode('utf-8'))
315         else:
316             textIO = StringIO(text)
317
318         yamlloader = schema_salad.utils.yaml_no_ts()
319         result = yamlloader.load(textIO)
320
321         # If the whole document is in "flow style" it is probably JSON
322         # formatted.  We'll re-export it as JSON because the
323         # ruamel.yaml round-trip mode is a lie and only preserves
324         # "block style" formatting and not "flow style" formatting.
325         export_as_json = result.fa.flow_style()
326
327         # 2. find $import, $include, $schema, run, location
328         # 3. update field value
329         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
330
331         # Write the updated file to the collection.
332         with col.open(w[len(prefix):], "wt") as f:
333             if export_as_json:
334                 json.dump(result, f, indent=4, separators=(',',': '))
335             else:
336                 yamlloader.dump(result, stream=f)
337
338         # Also store a verbatim copy of the original files
339         with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
340             f.write(text)
341
342
343     # Upload files referenced by $include directives; these are used
344     # unchanged and don't need to be updated.
345     for w in include_files:
346         with col.open(w[len(prefix):], "wb") as f1:
347             with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
348                 with open(uri_file_path(w), "rb") as f2:
349                     dat = f2.read(65536)
350                     while dat:
351                         f1.write(dat)
352                         f3.write(dat)
353                         dat = f2.read(65536)
354
355     # Now collect metadata: the collection name and git properties.
356
357     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
358     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
359         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
360
361     toolfile = tool.tool["id"][len(prefix):]
362
363     properties = {
364         "type": "workflow",
365         "arv:workflowMain": toolfile,
366     }
367
368     if git_info:
369         for g in git_info:
370             p = g.split("#", 1)[1]
371             properties["arv:"+p] = git_info[g]
372
373     # Check if a collection with the same content already exists in the target project.  If so, just use that one.
374     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
375                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
376
377     if len(existing["items"]) == 0:
378         toolname = toolname.replace("/", " ")
379         col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
380         logger.info("Workflow uploaded to %s", col.manifest_locator())
381     else:
382         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
383
384     # Now that we've updated the workflow and saved it to a
385     # collection, we're going to construct a minimal "wrapper"
386     # workflow which consists only of input and output parameters
387     # connected to a single step that runs the real workflow.
388
389     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
390
391     step = {
392         "id": "#main/" + toolname,
393         "in": [],
394         "out": [],
395         "run": runfile,
396         "label": name
397     }
398
399     main = tool.tool
400
401     wf_runner_resources = None
402
403     hints = main.get("hints", [])
404     found = False
405     for h in hints:
406         if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
407             wf_runner_resources = h
408             found = True
409             break
410     if not found:
411         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
412         hints.append(wf_runner_resources)
413
414     wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
415                                                                   submit_runner_image or "arvados/jobs:"+__version__,
416                                                                   runtimeContext)
417
418     if submit_runner_ram:
419         wf_runner_resources["ramMin"] = submit_runner_ram
420
421     # Remove a few redundant fields from the "job order" (aka input
422     # object or input parameters).  In the situation where we're
423     # creating or updating a workflow record, any values in the job
424     # order get copied over as default values for input parameters.
425     adjustDirObjs(job_order, trim_listing)
426     adjustFileObjs(job_order, trim_anonymous_location)
427     adjustDirObjs(job_order, trim_anonymous_location)
428
429     newinputs = []
430     for i in main["inputs"]:
431         inp = {}
432         # Make sure to only copy known fields that are meaningful at
433         # the workflow level. In practice this ensures that if we're
434         # wrapping a CommandLineTool we don't grab inputBinding.
435         # Right now this also excludes extension fields, which is fine;
436         # Arvados doesn't currently look for any extension fields on
437         # input parameters.
438         for f in ("type", "label", "secondaryFiles", "streamable",
439                   "doc", "format", "loadContents",
440                   "loadListing", "default"):
441             if f in i:
442                 inp[f] = i[f]
443
444         if set_defaults:
445             sn = shortname(i["id"])
446             if sn in job_order:
447                 inp["default"] = job_order[sn]
448
449         inp["id"] = "#main/%s" % shortname(i["id"])
450         newinputs.append(inp)
451
452     wrapper = {
453         "class": "Workflow",
454         "id": "#main",
455         "inputs": newinputs,
456         "outputs": [],
457         "steps": [step]
458     }
459
460     for i in main["inputs"]:
461         step["in"].append({
462             "id": "#main/step/%s" % shortname(i["id"]),
463             "source": "#main/%s" % shortname(i["id"])
464         })
465
466     for i in main["outputs"]:
467         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
468         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
469                                    "type": i["type"],
470                                    "id": "#main/%s" % shortname(i["id"])})
471
472     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
473
474     if main.get("requirements"):
475         wrapper["requirements"].extend(main["requirements"])
476     if hints:
477         wrapper["hints"] = hints
478
479     # Schema definitions (this lets you define things like record
480     # types) require special handling.
481
482     for i, r in enumerate(wrapper["requirements"]):
483         if r["class"] == "SchemaDefRequirement":
484             wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
485
486     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
487
488     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
489
490     if git_info:
491         for g in git_info:
492             doc[g] = git_info[g]
493
494     # Remove any lingering file references.
495     drop_ids(wrapper)
496
497     return doc
498
499
500 def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
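    """Create or update an Arvados workflow record holding the wrapper
    document and return its UUID."""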
501
502     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
503
504     body = {
505         "workflow": {
506             "name": name,
507             "description": tool.tool.get("doc", ""),
508             "definition": wrappertext
509         }}
510     if project_uuid:
511         body["workflow"]["owner_uuid"] = project_uuid
512
513     if update_uuid:
514         call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
515     else:
516         call = arvRunner.api.workflows().create(body=body)
517     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
518
519
520 def dedup_reqs(reqs):
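    """Deduplicate requirements by class, keeping the last occurrence of each,
    dropping Arvados extension classes, and returning the result sorted by
    class name."""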
521     dedup = {}
522     for r in reversed(reqs):
523         if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
524             dedup[r["class"]] = r
525     return [dedup[r] for r in sorted(dedup.keys())]
526
527 def get_overall_res_req(res_reqs):
528     """Take the overall resource request of a list of ResourceRequirements,
529     i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
530     and the sum of outdirMin, outdirMax."""
531
532     all_res_req = {}
533     exception_msgs = []
534     for a in max_res_pars + sum_res_pars:
535         all_res_req[a] = []
536         for res_req in res_reqs:
537             if a in res_req:
538                 if isinstance(res_req[a], int): # integer check
539                     all_res_req[a].append(res_req[a])
540                 else:
541                     msg = SourceLine(res_req, a).makeError(
542                         "Non-top-level ResourceRequirement in single container cannot have expressions")
543                     exception_msgs.append(msg)
544     if exception_msgs:
545         raise WorkflowException("\n".join(exception_msgs))
546     else:
547         overall_res_req = {}
548         for a in all_res_req:
549             if all_res_req[a]:
550                 if a in max_res_pars:
551                     overall_res_req[a] = max(all_res_req[a])
552                 elif a in sum_res_pars:
553                     overall_res_req[a] = sum(all_res_req[a])
554         if overall_res_req:
555             overall_res_req["class"] = "ResourceRequirement"
556         return cmap(overall_res_req)
557
558 class ArvadosWorkflowStep(WorkflowStep):
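    """WorkflowStep that sets the Arvados cluster target for the step and,
    when fast_submit is enabled, skips the normal step resolution and
    validation."""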
559     def __init__(self,
560                  toolpath_object,      # type: Dict[Text, Any]
561                  pos,                  # type: int
562                  loadingContext,       # type: LoadingContext
563                  arvrunner,
564                  *argc,
565                  **argv
566                 ):  # type: (...) -> None
567
568         if arvrunner.fast_submit:
569             self.tool = toolpath_object
570             self.tool["inputs"] = []
571             self.tool["outputs"] = []
572         else:
573             super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
574             self.tool["class"] = "WorkflowStep"
575         self.arvrunner = arvrunner
576
577     def job(self, joborder, output_callback, runtimeContext):
578         runtimeContext = runtimeContext.copy()
579         runtimeContext.toplevel = True  # Preserve behavior for #13365
580
581         builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
582                                runtimeContext, self.metadata)
583         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
584         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
585
586
587 class ArvadosWorkflow(Workflow):
588     """Wrap cwltool Workflow to override selected methods."""
589
590     def __init__(self, arvrunner, toolpath_object, loadingContext):
591         self.arvrunner = arvrunner
592         self.wf_pdh = None
593         self.dynamic_resource_req = []
594         self.static_resource_req = []
595         self.wf_reffiles = []
596         self.loadingContext = loadingContext
597         super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
598         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
599
600     def job(self, joborder, output_callback, runtimeContext):
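        """Run the workflow.

        With the http://arvados.org/cwl#RunInSingleContainer requirement the
        workflow is packed, uploaded to Keep, and submitted as a single
        cwltool container; otherwise this defers to the standard cwltool
        Workflow behavior.
        """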
601
602         builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
603         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
604
605         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
606         if not req:
607             return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
608
609         # RunInSingleContainer is true
610
611         with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
612             if "id" not in self.tool:
613                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
614
615         discover_secondary_files(self.arvrunner.fs_access, builder,
616                                  self.tool["inputs"], joborder)
617
618         normalizeFilesDirs(joborder)
619
620         with Perf(metrics, "subworkflow upload_deps"):
621             upload_dependencies(self.arvrunner,
622                                 os.path.basename(joborder.get("id", "#")),
623                                 self.doc_loader,
624                                 joborder,
625                                 joborder.get("id", "#"),
626                                 runtimeContext)
627
628             if self.wf_pdh is None:
629                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
630
631                 for p in packed["$graph"]:
632                     if p["id"] == "#main":
633                         p["requirements"] = dedup_reqs(self.requirements)
634                         p["hints"] = dedup_reqs(self.hints)
635
636                 def visit(item):
637                     if "requirements" in item:
638                         item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
639                     for t in ("hints", "requirements"):
640                         if t not in item:
641                             continue
642                         for req in item[t]:
643                             if req["class"] == "ResourceRequirement":
644                                 dyn = False
645                                 for k in max_res_pars + sum_res_pars:
646                                     if k in req:
647                                         if isinstance(req[k], basestring):
648                                             if item["id"] == "#main":
649                                                 # only the top-level requirements/hints may contain expressions
650                                                 self.dynamic_resource_req.append(req)
651                                                 dyn = True
652                                                 break
653                                             else:
654                                                 with SourceLine(req, k, WorkflowException):
655                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
656                                 if not dyn:
657                                     self.static_resource_req.append(req)
658
659                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
660
661                 if self.static_resource_req:
662                     self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
663
664                 upload_dependencies(self.arvrunner,
665                                     runtimeContext.name,
666                                     self.doc_loader,
667                                     packed,
668                                     self.tool["id"],
669                                     runtimeContext)
670
671                 # Discover files/directories referenced by the
672                 # workflow (mainly "default" values)
673                 visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
674
675
676         if self.dynamic_resource_req:
677             # Evaluate dynamic resource requirements using current builder
678             rs = copy.copy(self.static_resource_req)
679             for dyn_rs in self.dynamic_resource_req:
680                 eval_req = {"class": "ResourceRequirement"}
681                 for a in max_res_pars + sum_res_pars:
682                     if a in dyn_rs:
683                         eval_req[a] = builder.do_eval(dyn_rs[a])
684                 rs.append(eval_req)
685             job_res_reqs = [get_overall_res_req(rs)]
686         else:
687             job_res_reqs = self.static_resource_req
688
689         with Perf(metrics, "subworkflow adjust"):
690             joborder_resolved = copy.deepcopy(joborder)
691             joborder_keepmount = copy.deepcopy(joborder)
692
693             reffiles = []
694             visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
695
696             mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
697                                    "/keep/%s",
698                                    "/keep/%s/%s")
699
700             # For the containers API, we need to make sure any extra
701             # referenced files (i.e. referenced by the workflow but
702             # not in the inputs) are included in the mounts.
703             if self.wf_reffiles:
704                 runtimeContext = runtimeContext.copy()
705                 runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
706
707             def keepmount(obj):
708                 remove_redundant_fields(obj)
709                 with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
710                     if "location" not in obj:
711                         raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
712                 with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
713                     if obj["location"].startswith("keep:"):
714                         obj["location"] = mapper.mapper(obj["location"]).target
715                         if "listing" in obj:
716                             del obj["listing"]
717                     elif obj["location"].startswith("_:"):
718                         del obj["location"]
719                     else:
720                         raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
721
722             visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
723
724             def resolved(obj):
725                 if obj["location"].startswith("keep:"):
726                     obj["location"] = mapper.mapper(obj["location"]).resolved
727
728             visit_class(joborder_resolved, ("File", "Directory"), resolved)
729
730             if self.wf_pdh is None:
731                 adjustFileObjs(packed, keepmount)
732                 adjustDirObjs(packed, keepmount)
733                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
734
735         self.loadingContext = self.loadingContext.copy()
736         self.loadingContext.metadata = self.loadingContext.metadata.copy()
737         self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
738
739         if len(job_res_reqs) == 1:
740             # RAM request needs to be at least 128 MiB or the workflow
741             # runner itself won't run reliably.
742             if job_res_reqs[0].get("ramMin", 1024) < 128:
743                 job_res_reqs[0]["ramMin"] = 128
744
745         arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
746         if runtimeContext.debug:
747             arguments.insert(0, '--debug')
748
749         wf_runner = cmap({
750             "class": "CommandLineTool",
751             "baseCommand": "cwltool",
752             "inputs": self.tool["inputs"],
753             "outputs": self.tool["outputs"],
754             "stdout": "cwl.output.json",
755             "requirements": self.requirements+job_res_reqs+[
756                 {"class": "InlineJavascriptRequirement"},
757                 {
758                 "class": "InitialWorkDirRequirement",
759                 "listing": [{
760                         "entryname": "workflow.cwl",
761                         "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
762                     }, {
763                         "entryname": "cwl.input.yml",
764                         "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
765                     }]
766             }],
767             "hints": self.hints,
768             "arguments": arguments,
769             "id": "#"
770         })
771         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
772
773     def make_workflow_step(self,
774                            toolpath_object,      # type: Dict[Text, Any]
775                            pos,                  # type: int
776                            loadingContext,       # type: LoadingContext
777                            *argc,
778                            **argv
779     ):
780         # (...) -> WorkflowStep
781         return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)