19385: Fix submit secrets test
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15
16 from typing import (MutableSequence, MutableMapping)
17
18 from ruamel.yaml import YAML
19 from ruamel.yaml.comments import CommentedMap, CommentedSeq
20
21 from schema_salad.sourceline import SourceLine, cmap
22 import schema_salad.ref_resolver
23
24 import arvados.collection
25
26 from cwltool.pack import pack
27 from cwltool.load_tool import fetch_document, resolve_and_validate_document
28 from cwltool.process import shortname
29 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
30 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
31 from cwltool.context import LoadingContext
32
33 from schema_salad.ref_resolver import file_uri, uri_file_path
34
35 import ruamel.yaml as yaml
36
37 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
38                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
39                      make_builder, arvados_jobs_image)
40 from .pathmapper import ArvPathMapper, trim_listing
41 from .arvtool import ArvadosCommandTool, set_cluster_target
42 from ._version import __version__
43
44 from .perf import Perf
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48
49 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
50 sum_res_pars = ("outdirMin", "outdirMax")
51
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Store a packed workflow in Keep and build a one-step wrapper for it.

    The packed workflow is written to a new collection as "workflow.json";
    the returned value is a JSON string containing a CWL v1.2 Workflow
    whose single step runs keep:<pdh>/workflow.json#main, forwarding all
    of main's inputs and outputs.

    :param main: the "#main" process object out of *packed*
    :param packed: the packed ($graph) workflow document
    :param git_info: optional mapping of arvados.org/cwl#git* metadata,
        copied onto the wrapper document
    :returns: the wrapper document serialized as a JSON string
    """
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    # Only save a new collection if one with identical content doesn't
    # already exist in the target project.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire each wrapper input straight into the single step.
    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    # Expose each of the packed workflow's outputs as a wrapper output.
    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    # Carry over the inner workflow's requirements/hints so the wrapper
    # advertises the same needs.
    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
129
def rel_ref(s, baseuri, urlexpander, merged_map):
    """Rewrite reference *s* for use relative to *baseuri*.

    Precedence: a replacement recorded in merged_map for the expanded
    URI wins; an existing keep: reference passes through untouched;
    otherwise the expanded URI is turned into a filesystem-relative
    path from baseuri's directory.
    """
    expanded = urlexpander(s, baseuri)

    # merged_map is keyed by the defragged base document URI.
    basefile = urllib.parse.urldefrag(baseuri)[0]
    if basefile in merged_map:
        replacements = merged_map[basefile].resolved
        if expanded in replacements:
            return replacements[expanded]

    if s.startswith("keep:"):
        return s

    base_dir = os.path.dirname(uri_file_path(baseuri))
    target_dir = os.path.dirname(uri_file_path(expanded))
    target_name = os.path.basename(uri_file_path(expanded))

    rel = os.path.relpath(target_dir, base_dir)
    if rel == ".":
        rel = ""
    return os.path.join(rel, target_name)
148
149
def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix):
    """Recursively rewrite references in a parsed CWL document, in place.

    For lists: strings starting with *prefix* get *replacePrefix*
    substituted (the separator character after the prefix is dropped);
    other items are recursed into.  For mappings: "id" narrows the base
    URI, DockerRequirement gets an arv#dockerCollectionPDH annotation
    from the runtime context's cache, and the reference-bearing fields
    ($include, $import, location, run, $schemas) are rewritten with
    rel_ref() before recursing into every value.

    When set_block_style is true, ruamel round-trip nodes are forced to
    block style so the re-serialized YAML stays readable.
    """
    if set_block_style and isinstance(d, (CommentedSeq, CommentedMap)):
        d.fa.set_block_style()

    if isinstance(d, MutableSequence):
        for i, entry in enumerate(d):
            if prefix and isinstance(entry, str):
                if entry.startswith(prefix):
                    # +1 also drops the separator following the prefix.
                    d[i] = replacePrefix + entry[len(prefix)+1:]
            else:
                update_refs(entry, baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            # May record None if the image was never looked up.
            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)

        # Rewrite reference fields exactly once.  (Previously this was
        # nested inside the per-key loop below, redoing the same
        # rel_ref() calls once for every key of the mapping.)
        for field in ("$include", "$import", "location", "run"):
            if field in d and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)

        if "$schemas" in d:
            for i in range(len(d["$schemas"])):
                d["$schemas"][i] = rel_ref(d["$schemas"][i], baseuri, urlexpander, merged_map)

        for key in d:
            update_refs(d[key], baseuri, urlexpander, merged_map, set_block_style, runtimeContext, prefix, replacePrefix)
179
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False):
    """Upload the workflow's source files to Keep and build a wrapper.

    Unlike upload_workflow(), this uploads the individual source files
    (preserving their relative directory layout below their common path
    prefix) instead of a packed document, then returns a CWL v1.2
    wrapper Workflow (a dict, not yet serialized) whose single step runs
    the main file out of the new collection.

    :param set_defaults: when true, values from *job_order* are folded
        into the wrapper's inputs as defaults
    :returns: the wrapper document as a dict
    """

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # Scan the loader index for every local file the workflow touches,
    # separating direct workflow files from $import/$include targets.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])   # strip "import:"
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])  # strip "include:"

    all_files = workflow_files | import_files | include_files

    # Find the length n of the longest common prefix of all file URIs,
    # starting past "file://" (7 chars), so files keep their relative
    # layout inside the collection.
    n = 7
    allmatch = True
    while allmatch:
        n += 1
        for f in all_files:
            if len(f)-1 < n:
                n -= 1
                allmatch = False
                break
            if f[n] != firstfile[n]:
                allmatch = False
                break

    # Back up to a directory boundary so we never split a path segment.
    while firstfile[n] != "/":
        n -= 1

    prefix = firstfile[:n+1]

    col = arvados.collection.Collection(api_client=arvRunner.api)

    for w in workflow_files | import_files:
        # 1. load YAML

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        # Round-trip loader preserves comments and formatting.
        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        set_block_style = False
        if result.fa.flow_style():
            set_block_style = True

        # 2. find $import, $include, $schema, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext, "", "")

        with col.open(w[n+1:], "wt") as f:
            yamlloader.dump(result, stream=f)

    # Copy $include files (opaque content) into the collection verbatim.
    for w in include_files:
        with col.open(w[n+1:], "wb") as f1:
            with open(uri_file_path(w), "rb") as f2:
                dat = f2.read(65536)
                while dat:
                    f1.write(dat)
                    dat = f2.read(65536)

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    # Path of the entry-point file within the collection.
    toolfile = tool.tool["id"][n+1:]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    # Record git metadata (branch, commit, describe, ...) as collection
    # properties under the arv: prefix.
    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)

    # Strip fields that shouldn't leak into input defaults.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    # Find (or add) the WorkflowRunnerResources hint so we can record
    # the runner image and RAM request on the wrapper.
    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    # Pin the workflow-runner (arvados/jobs) container image.
    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    # Wire each wrapper input straight into the single step.
    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    # Expose each of the tool's outputs as a wrapper output.
    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Rebase references in the wrapper itself, replacing the tool's own
    # id prefix with "#main/".
    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext, main["id"], "#main/")

    return doc
371
372
def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
    """Create or update an Arvados workflow record holding *doc*.

    The document is serialized to JSON and stored as the record's
    definition.  Returns the UUID of the created/updated record.
    """
    definition = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    workflow = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": definition,
    }
    if project_uuid:
        workflow["owner_uuid"] = project_uuid
    body = {"workflow": workflow}

    if update_uuid:
        request = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        request = arvRunner.api.workflows().create(body=body)
    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
391
392
def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None):
    """Pack the workflow, upload its dependencies, and register it as an
    Arvados workflow record wrapped by make_wrapper_workflow().

    Values from *job_order* become input defaults on the packed "#main"
    process.  Returns the UUID of the created (or, when *uuid* is given,
    updated) workflow record.
    """

    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)

    # Strip fields that shouldn't be baked into defaults.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # Fold supplied job order values in as input defaults on #main.
    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"],
                        runtimeContext)

    # Find (or add) the WorkflowRunnerResources hint so we can record
    # the runner image and RAM request.
    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    # Pin the workflow-runner (arvados/jobs) container image.
    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    main["hints"] = hints

    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrapper
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if uuid:
        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
456
def dedup_reqs(reqs):
    """Deduplicate a requirements list by class name.

    Arvados extension requirements (http://arvados.org/cwl#...) are
    dropped entirely.  For duplicate classes, the occurrence nearest the
    end of *reqs* wins.  The result is sorted by class name.
    """
    winners = {}
    for req in reversed(reqs):
        cls = req["class"]
        if cls.startswith("http://arvados.org/cwl#"):
            continue
        winners.setdefault(cls, req)
    return [winners[cls] for cls in sorted(winners)]
463
def get_overall_res_req(res_reqs):
    """Take the overall of a list of ResourceRequirement,
    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
    and the sum of outdirMin, outdirMax.

    Raises WorkflowException if any of these values is not a plain
    integer (expressions are not allowed here)."""

    collected = {}
    errors = []
    for param in max_res_pars + sum_res_pars:
        values = []
        for res_req in res_reqs:
            if param not in res_req:
                continue
            if isinstance(res_req[param], int): # integer check
                values.append(res_req[param])
            else:
                errors.append(SourceLine(res_req, param).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))
        collected[param] = values

    if errors:
        raise WorkflowException("\n".join(errors))

    overall = {}
    for param, values in collected.items():
        if not values:
            continue
        overall[param] = max(values) if param in max_res_pars else sum(values)
    if overall:
        overall["class"] = "ResourceRequirement"
    return cmap(overall)
494
class ArvadosWorkflowStep(WorkflowStep):
    """cwltool WorkflowStep subclass aware of the Arvados runner."""

    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            # Fast-submit mode: skip cwltool's full validation/embedding
            # and just record the tool with empty input/output lists.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Resolve the step's cluster target, then delegate to cwltool."""
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
522
523
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        # PDH of the uploaded packed workflow; set lazily on first job().
        self.wf_pdh = None
        # ResourceRequirements containing expressions (top level only).
        self.dynamic_resource_req = []
        # ResourceRequirements with literal integer values.
        self.static_resource_req = []
        # Files/Directories referenced by the workflow itself (defaults etc.).
        self.wf_reffiles = []
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
        """Run the workflow.

        Without the arv:RunInSingleContainer requirement this defers
        entirely to cwltool.  With it, the workflow is packed, uploaded
        to Keep, and a synthetic CommandLineTool is submitted that runs
        cwltool over the packed workflow inside one container.
        """

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            # Upload anything referenced from the job order itself.
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                # First run: pack the workflow and classify its resource
                # requirements as static (literals) or dynamic (expressions).
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    # Docker is handled by the outer container; drop it.
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                # Collapse all static requirements into a single overall one.
                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            # Two copies of the job order: one with keep: references
            # resolved for submission, one rewritten to /keep/ mount
            # paths for use inside the container.
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (ie referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                # Rewrite a File/Directory's keep: location to the /keep/
                # path where it will be mounted inside the container.
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                # `packed` was built above (wf_pdh was also None there).
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        # Synthesize a CommandLineTool that runs cwltool on the packed
        # workflow (fetched from Keep) with the keepmount job order.
        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)