19385: Reference rewriting fixups
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15
16 from typing import (MutableSequence, MutableMapping)
17
18 from ruamel.yaml import YAML
19 from ruamel.yaml.comments import CommentedMap, CommentedSeq
20
21 from schema_salad.sourceline import SourceLine, cmap
22 import schema_salad.ref_resolver
23
24 import arvados.collection
25
26 from cwltool.pack import pack
27 from cwltool.load_tool import fetch_document, resolve_and_validate_document
28 from cwltool.process import shortname
29 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
30 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
31 from cwltool.context import LoadingContext
32
33 from schema_salad.ref_resolver import file_uri, uri_file_path
34
35 import ruamel.yaml as yaml
36
37 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
38                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
39                      make_builder, arvados_jobs_image)
40 from .pathmapper import ArvPathMapper, trim_listing
41 from .arvtool import ArvadosCommandTool, set_cluster_target
42 from ._version import __version__
43
44 from .perf import Perf
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48
49 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
50 sum_res_pars = ("outdirMin", "outdirMax")
51
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Store the packed workflow in a Keep collection and return the JSON
    text of a one-step wrapper workflow that runs it from Keep."""

    collection = arvados.collection.Collection(api_client=arvRunner.api,
                                               keep_client=arvRunner.keep_client)

    with collection.open("workflow.json", "wt") as out:
        json.dump(packed, out, sort_keys=True, indent=4, separators=(',',': '))

    pdh = collection.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    # Save the collection only when the project doesn't already hold an
    # identical one (matched on portable data hash).
    existing = arvRunner.api.collections().list(
        filters=[["portable_data_hash", "=", pdh],
                 ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if not existing["items"]:
        collection.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # Now construct the wrapper: a Workflow whose single step runs the
    # packed document out of Keep.
    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    # Make sure to only copy known fields that are meaningful at the
    # workflow level.  In practice this ensures that if we're wrapping a
    # CommandLineTool we don't grab inputBinding.  It also excludes
    # extension fields, which is fine; Arvados doesn't currently look
    # for any extension fields on input parameters.
    passthrough = ("type", "label", "secondaryFiles", "streamable",
                   "doc", "id", "format", "loadContents",
                   "loadListing", "default")
    newinputs = []
    for param in main["inputs"]:
        trimmed = {}
        for field in passthrough:
            if field in param:
                trimmed[field] = param[field]
        newinputs.append(trimmed)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire each wrapper input straight into the inner step.
    for param in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(param["id"]),
            "source": param["id"]
        })

    # Surface each step output as a wrapper output.
    for param in main["outputs"]:
        sn = shortname(param["id"])
        step["out"].append({"id": "#main/step/%s" % sn})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % sn,
                                   "type": param["type"],
                                   "id": param["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for key in git_info:
            doc[key] = git_info[key]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
129
def rel_ref(s, baseuri, urlexpander, merged_map):
    """Rewrite reference `s` (found in a document at `baseuri`).

    Keep references pass through untouched.  If upload recorded a
    replacement for the expanded URI in `merged_map`, that replacement is
    returned; otherwise the result is a path relative to `baseuri`'s
    directory.
    """
    if s.startswith("keep:"):
        return s

    expanded = urlexpander(s, baseuri)
    fileuri = urllib.parse.urldefrag(baseuri)[0]

    # Prefer a replacement recorded for either the fragment-bearing base
    # URI or the bare file URI.
    for candidate in (baseuri, fileuri):
        if candidate in merged_map:
            replacements = merged_map[candidate].resolved
            if expanded in replacements:
                return replacements[expanded]

    base_dir = os.path.dirname(uri_file_path(fileuri))
    target_dir = os.path.dirname(uri_file_path(expanded))
    target_name = os.path.basename(uri_file_path(expanded))

    rel = os.path.relpath(target_dir, base_dir)
    if rel == ".":
        rel = ""

    return os.path.join(rel, target_name)
160
161
def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix):
    """Recursively walk a parsed CWL document and rewrite references.

    Rewrites location/run/name, $include/$import, named `type`s, shorthand
    `inputs` types and $schemas entries via rel_ref, replaces string
    prefixes `prefix` -> `replacePrefix` inside sequences, and annotates
    DockerRequirements with the cached collection PDH.  Mutates `d` in
    place.
    """
    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
        d.fa.set_block_style()

    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            if prefix and isinstance(s, str):
                if s.startswith(prefix):
                    d[i] = replacePrefix+s[len(prefix):]
            else:
                update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        if "id" in d:
            # The id establishes a new base for resolving scoped references.
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)

        # Rewrite each reference-bearing field exactly once.  (Previously
        # this work was nested inside the key loop below, so it ran once
        # per key of the mapping, relying on rel_ref being idempotent.)
        for field in ("location", "run", "name"):
            if field in d and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)

        # $include/$import are resolved without the merged_map replacements.
        for field in ("$include", "$import"):
            if field in d and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, {})

        basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory")

        # A non-primitive string `type` names a schema defined elsewhere.
        if ("type" in d and
            isinstance(d["type"], str) and
            d["type"] not in basetypes):
            d["type"] = rel_ref(d["type"], baseuri, urlexpander, merged_map)

        # Shorthand inputs map: values may be named (non-primitive) types.
        if "inputs" in d and isinstance(d["inputs"], MutableMapping):
            for inp in d["inputs"]:
                if isinstance(d["inputs"][inp], str) and d["inputs"][inp] not in basetypes:
                    d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map)

        if "$schemas" in d:
            for n, s in enumerate(d["$schemas"]):
                d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)

        # Recurse into every value.
        for s in d:
            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
207
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False):
    """Upload the workflow's source files into a collection, preserving
    their relative directory layout, and return a wrapper workflow
    document whose single step runs the main file from Keep.

    If set_defaults is true, values from job_order become input defaults
    on the wrapper.  Returns the wrapper document (a dict, not JSON text).
    NOTE(review): assumes merged_map is not None despite the default --
    confirm all callers pass it.
    """

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # Collect every local file referenced by the loaded document: the
    # workflow files themselves plus anything pulled in by $import/$include.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])   # strip "import:"
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])  # strip "include:"

    all_files = workflow_files | import_files | include_files

    # Find the longest common prefix position `n` shared by every file
    # URI, then back up to a "/" so that w[n+1:] below yields paths
    # relative to the common directory.
    n = 7  # len("file://") - 1; skip past the scheme before comparing
    allmatch = True
    if firstfile:
        while allmatch:
            n += 1
            for f in all_files:
                if len(f)-1 < n:
                    n -= 1
                    allmatch = False
                    break
                if f[n] != firstfile[n]:
                    allmatch = False
                    break

        while firstfile[n] != "/":
            n -= 1

    col = arvados.collection.Collection(api_client=arvRunner.api)

    #print(merged_map)

    # Re-serialize each CWL source file into the collection with its
    # references rewritten to be relative (or keep: URIs).
    for w in workflow_files | import_files:
        # 1. load YAML

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        # Round-trip loader so comments/formatting survive the rewrite.
        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        set_block_style = False
        if result.fa.flow_style():
            set_block_style = True

        # 2. find $import, $include, $schema, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext, "", "")

        with col.open(w[n+1:], "wt") as f:
            #print(yamlloader.dump(result, stream=sys.stdout))
            yamlloader.dump(result, stream=f)

    # $include files are copied into the collection verbatim, in chunks.
    for w in include_files:
        with col.open(w[n+1:], "wb") as f1:
            with open(uri_file_path(w), "rb") as f2:
                dat = f2.read(65536)
                while dat:
                    f1.write(dat)
                    dat = f2.read(65536)

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    # Path of the entry-point file within the collection.
    toolfile = tool.tool["id"][n+1:]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    # Record git metadata as collection properties ("arv:" + fragment).
    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)

    # Strip fields that shouldn't be persisted with stored defaults.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    # Reuse an existing WorkflowRunnerResources hint if present, else add one.
    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        # Optionally bake submitted values in as defaults.
        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire each wrapper input into the step.
    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    # Surface each step output as a wrapper output.
    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    #print("MMM", main["id"])
    #print(yamlloader.dump(wrapper, stream=sys.stdout))

    # Rewrite any remaining references in the wrapper itself, replacing
    # the main document's fragment prefix with "#main/".
    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"]+"#", "#main/")

    #print(yamlloader.dump(wrapper, stream=sys.stdout))

    return doc
404
405
def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
    """Create (or, when update_uuid is given, update) an Arvados workflow
    record holding `doc` as its JSON definition.  Returns the record uuid."""

    definition = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    record = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": definition,
    }
    if project_uuid:
        record["owner_uuid"] = project_uuid
    body = {"workflow": record}

    if update_uuid:
        request = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        request = arvRunner.api.workflows().create(body=body)
    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
424
425
def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None):
    """Pack the workflow into a single document, upload its dependencies,
    wrap it, and create or update an Arvados workflow record.  Returns the
    record's uuid."""

    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)

    # Strip fields that shouldn't be persisted with stored defaults.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    main = [part for part in packed["$graph"] if part["id"] == "#main"][0]

    # Bake the supplied job order values in as input defaults.
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"],
                        runtimeContext)

    # Reuse an existing WorkflowRunnerResources hint if present, otherwise
    # add one, so the runner image (and optionally RAM) can be recorded.
    hints = main.get("hints", [])
    wf_runner_resources = next(
        (h for h in hints
         if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources"),
        None)
    if wf_runner_resources is None:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    main["hints"] = hints

    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)

    record = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": wrapper,
    }
    if project_uuid:
        record["owner_uuid"] = project_uuid
    body = {"workflow": record}

    if uuid:
        request = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        request = arvRunner.api.workflows().create(body=body)
    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
489
def dedup_reqs(reqs):
    """Deduplicate requirements by class, keeping the last occurrence of
    each class in `reqs`, dropping Arvados extension classes, and
    returning the survivors sorted by class name."""
    survivors = {}
    # Walking in reverse means the first one kept per class is the last
    # occurrence in the original order.
    for req in reversed(reqs):
        cls = req["class"]
        if cls.startswith("http://arvados.org/cwl#"):
            continue
        if cls not in survivors:
            survivors[cls] = req
    return [survivors[cls] for cls in sorted(survivors)]
496
def get_overall_res_req(res_reqs):
    """Take the overall of a list of ResourceRequirement,
    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
    and the sum of outdirMin, outdirMax."""

    collected = {}
    errors = []
    for param in max_res_pars + sum_res_pars:
        collected[param] = []
        for res_req in res_reqs:
            if param not in res_req:
                continue
            value = res_req[param]
            if isinstance(value, int):  # integer check
                collected[param].append(value)
            else:
                # Expressions are only allowed at the top level; flag them.
                errors.append(SourceLine(res_req, param).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))

    if errors:
        raise WorkflowException("\n".join(errors))

    overall = {}
    for param, values in collected.items():
        if not values:
            continue
        if param in max_res_pars:
            overall[param] = max(values)
        elif param in sum_res_pars:
            overall[param] = sum(values)
    if overall:
        overall["class"] = "ResourceRequirement"
    return cmap(overall)
527
class ArvadosWorkflowStep(WorkflowStep):
    """cwltool WorkflowStep specialized for the Arvados runner."""

    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None
        if arvrunner.fast_submit:
            # Fast-submit path: take the step definition as-is and skip
            # cwltool's WorkflowStep initialization/validation.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Determine the cluster target for this step, then delegate to cwltool."""
        ctx = runtimeContext.copy()
        ctx.toplevel = True  # Preserve behavior for #13365
        by_shortname = {shortname(k): v for k, v in viewitems(joborder)}
        builder = make_builder(by_shortname, self.hints, self.requirements,
                               ctx, self.metadata)
        ctx = set_cluster_target(self.tool, self.arvrunner, builder, ctx)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, ctx)
555
556
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        self.wf_pdh = None  # portable data hash of uploaded workflow; filled in lazily by job()
        self.dynamic_resource_req = []  # ResourceRequirements containing expressions (top level only)
        self.static_resource_req = []   # ResourceRequirements with literal integer values
        self.wf_reffiles = []           # File/Directory objects referenced by the workflow itself
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
        """Create the job(s) for this workflow.

        Without the RunInSingleContainer requirement this defers entirely
        to cwltool's Workflow.job().  With it, the subworkflow is packed,
        uploaded to Keep, and rewrapped as a CommandLineTool that runs
        `cwltool` over the packed document inside a single container.
        """

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            # Upload anything referenced by the input object.
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                # First call: pack the workflow and classify its resource
                # requirements as static or dynamic (expression-valued).
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    # Docker is handled by the outer container, so drop it
                    # from the inner workflow; classify ResourceRequirements.
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    # Collapse the static requirements into a single overall one.
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            # Two copies of the input object: one with locations rewritten
            # to in-container /keep paths, one resolved for the caller.
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (ie referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                # Rewrite a File/Directory location to its /keep mount path;
                # literals ("_:") lose their location entirely.
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                # 'packed' is defined: same wf_pdh guard held earlier in this call.
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        # Synthesize a CommandLineTool that runs cwltool over the uploaded
        # workflow inside one container; inputs are staged via
        # InitialWorkDirRequirement, with CWL expression markers escaped.
        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)