# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from past.builtins import basestring
from future.utils import viewitems

import os
import json
import copy
import logging
import urllib.parse
from io import StringIO

from typing import (MutableSequence, MutableMapping)

from ruamel.yaml.comments import CommentedMap, CommentedSeq

from schema_salad.sourceline import SourceLine, cmap
from schema_salad.ref_resolver import uri_file_path
import schema_salad.ref_resolver
import schema_salad.utils

import arvados.collection

from cwltool.pack import pack
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext

from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
                     make_builder, arvados_jobs_image, FileUpdates)
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
from ._version import __version__

from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")

def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
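    """Build a wrapper workflow that runs the packed workflow from Keep.

    Saves the packed workflow as workflow.json in a new collection
    (skipped if a collection with the same portable data hash already
    exists in the project), then returns, as a JSON string, a one-step
    wrapper Workflow whose single step runs #main from that collection.
    """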
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',', ': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now this also excludes extension fields, which is fine;
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',', ': '))


def rel_ref(s, baseuri, urlexpander, merged_map):
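    """Turn a reference into a keep: URI or a path relative to the base document.

    References that are already in Keep, or that were previously
    uploaded and recorded in merged_map, are returned as keep: URIs.
    Anything else becomes a relative file path.  For example
    (illustrative), with baseuri "file:///work/main.cwl" and
    s "file:///work/lib/tool.cwl", this returns "lib/tool.cwl".
    """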
    if s.startswith("keep:"):
        return s

    uri = urlexpander(s, baseuri)

    if uri.startswith("keep:"):
        return uri

    fileuri = urllib.parse.urldefrag(baseuri)[0]

    for u in (baseuri, fileuri):
        if u in merged_map:
            replacements = merged_map[u].resolved
            if uri in replacements:
                return replacements[uri]

    p1 = os.path.dirname(uri_file_path(fileuri))
    p2 = os.path.dirname(uri_file_path(uri))
    p3 = os.path.basename(uri_file_path(uri))

    r = os.path.relpath(p2, p1)
    if r == ".":
        r = ""

    return os.path.join(r, p3)


def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix):
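    """Recursively rewrite references in a parsed CWL document.

    Relativizes "location", "run", "name", "$include", "$import",
    "$schemas" and named type references via rel_ref(), rewrites list
    items starting with prefix to start with replacePrefix instead,
    and records the cached Docker image lookup on any DockerRequirement
    encountered.
    """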
    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
        d.fa.set_block_style()

    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            if prefix and isinstance(s, str):
                if s.startswith(prefix):
                    d[i] = replacePrefix + s[len(prefix):]
            else:
                update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
        elif "name" in d:
            baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)

        basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")

        for field in d:
            if field in ("location", "run", "name") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
                continue

            if field in ("$include", "$import") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, {})
                continue

            if (field == "type" and
                isinstance(d["type"], str) and
                d["type"] not in basetypes):
                d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map)
                continue

            if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                for inp in d["inputs"]:
                    if isinstance(d["inputs"][inp], str) and d["inputs"][inp] not in basetypes:
                        d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map)
                continue

            if field == "$schemas":
                for n, s in enumerate(d["$schemas"]):
                    d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
                continue

            update_refs(d[field], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)


def fix_schemadef(req, baseuri, urlexpander, merged_map, pdh):
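    """Return a copy of a SchemaDefRequirement with type names rewritten.

    Each type definition is renamed to a keep: reference into the
    uploaded collection identified by pdh, and the rename is recorded
    in merged_map so that later update_refs() passes resolve the old
    names.
    """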
    req = copy.deepcopy(req)

    for f in req["types"]:
        r = f["name"]
        path, frag = urllib.parse.urldefrag(r)
        rel = rel_ref(r, baseuri, urlexpander, merged_map)
        merged_map.setdefault(path, FileUpdates({}, {}))
        rename = "keep:%s/%s" % (pdh, rel)
        for mm in merged_map:
            merged_map[mm].resolved[r] = rename
    return req


def drop_ids(d):
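    """Recursively remove "id" and "name" fields that still hold file: URIs."""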
    if isinstance(d, MutableSequence):
        for s in d:
            drop_ids(s)
    elif isinstance(d, MutableMapping):
        for fixup in ("id", "name"):
            if fixup in d and d[fixup].startswith("file:"):
                del d[fixup]

        for field in d:
            drop_ids(d[field])


def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False):
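    """Upload the workflow as a collection of its original source files.

    In contrast to upload_workflow(), which stores one packed JSON
    document, this preserves the workflow's file layout: every
    referenced source file is copied into a single collection under a
    common directory prefix, and the returned dict is a small wrapper
    Workflow whose single step runs the main tool from that collection
    by keep: reference.
    """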

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            # strip the leading "import:" to get the file URI
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            # strip the leading "include:" to get the file URI
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

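    # Find the longest common directory prefix shared by all the file
    # URIs, so files can be stored in the collection with their
    # original layout but without the absolute path.  n starts at 7 to
    # skip over "file://"; after the scan it is backed up to the last
    # "/" of the common prefix.  For example (illustrative), given
    # file:///home/user/wf/main.cwl and file:///home/user/wf/lib/tool.cwl,
    # the common prefix is file:///home/user/wf and the files are
    # stored as main.cwl and lib/tool.cwl.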
    n = 7
    allmatch = True
    if firstfile:
        while allmatch:
            n += 1
            for f in all_files:
                if len(f)-1 < n:
                    n -= 1
                    allmatch = False
                    break
                if f[n] != firstfile[n]:
                    allmatch = False
                    break

        while firstfile[n] != "/":
            n -= 1

    col = arvados.collection.Collection(api_client=arvRunner.api)

    for w in workflow_files | import_files:
        # 1. load YAML

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        set_block_style = bool(result.fa.flow_style())

        # 2. find $import, $include, $schemas, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext, "", "")

        with col.open(w[n+1:], "wt") as f:
            yamlloader.dump(result, stream=f)

    for w in include_files:
        with col.open(w[n+1:], "wb") as f1:
            with open(uri_file_path(w), "rb") as f2:
                dat = f2.read(65536)
                while dat:
                    f1.write(dat)
                    dat = f2.read(65536)

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][n+1:]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # now construct the wrapper

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now this also excludes extension fields, which is fine;
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Rewrite any SchemaDefRequirement type names to point into the
    # uploaded collection, recording the renames in merged_map.
    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, col.portable_data_hash())

    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"]+"#", "#main/")

    # Remove any lingering file references.
    drop_ids(wrapper)

    return doc


def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
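    """Create or update an Arvados workflow record for a wrapper document.

    Returns the UUID of the created or updated record.
    """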
    wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',', ': '))

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrappertext
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if update_uuid:
        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None):
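    """Upload a workflow as a single packed document and register it.

    Packs the workflow, uploads its dependencies to Keep, copies
    matching defaults from job_order into the packed inputs, ensures a
    http://arvados.org/cwl#WorkflowRunnerResources hint is present,
    then creates (or, when uuid is given, updates) an Arvados workflow
    record whose definition is the wrapper from make_wrapper_workflow().
    Returns the workflow record's UUID.
    """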

    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"],
                        runtimeContext)

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    main["hints"] = hints

    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrapper
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if uuid:
        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def dedup_reqs(reqs):
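    """Deduplicate requirements by class, letting later entries win.

    Arvados extension requirements (http://arvados.org/cwl#...) are
    dropped; the result is sorted by class name.
    """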
    dedup = {}
    for r in reversed(reqs):
        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
            dedup[r["class"]] = r
    return [dedup[r] for r in sorted(dedup.keys())]


def get_overall_res_req(res_reqs):
    """Take the overall resource request from a list of ResourceRequirements,
    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
    and the sum of outdirMin, outdirMax."""

    all_res_req = {}
    exception_msgs = []
    for a in max_res_pars + sum_res_pars:
        all_res_req[a] = []
        for res_req in res_reqs:
            if a in res_req:
                if isinstance(res_req[a], int):
                    # concrete value, can be combined
                    all_res_req[a].append(res_req[a])
                else:
                    # must be an expression, which can't be evaluated here
                    msg = SourceLine(res_req, a).makeError(
                        "Non-top-level ResourceRequirement in single container cannot have expressions")
                    exception_msgs.append(msg)
    if exception_msgs:
        raise WorkflowException("\n".join(exception_msgs))
    else:
        overall_res_req = {}
        for a in all_res_req:
            if all_res_req[a]:
                if a in max_res_pars:
                    overall_res_req[a] = max(all_res_req[a])
                elif a in sum_res_pars:
                    overall_res_req[a] = sum(all_res_req[a])
        if overall_res_req:
            overall_res_req["class"] = "ResourceRequirement"
        return cmap(overall_res_req)


class ArvadosWorkflowStep(WorkflowStep):
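    """WorkflowStep that directs each step at the right Arvados cluster.

    When arvrunner.fast_submit is set, skips cwltool's normal step
    validation and records the step with empty inputs and outputs.
    """
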
    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        builder = make_builder({shortname(k): v for k, v in viewitems(joborder)}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)


class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        self.wf_pdh = None
        self.dynamic_resource_req = []
        self.static_resource_req = []
        self.wf_reffiles = []
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
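        """Run the workflow, handling http://arvados.org/cwl#RunInSingleContainer.

        Without the RunInSingleContainer requirement this defers to
        cwltool's normal Workflow.job().  With it, the subworkflow is
        packed into a single document, its resource requirements are
        consolidated with get_overall_res_req(), and the packed
        workflow is run as one container that invokes cwltool on
        workflow.cwl with cwl.input.yml.
        """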

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)

        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles + self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (i.e. referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements + job_res_reqs + [
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',', ': ')).replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)