# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

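"""Manage and submit CWL workflows as Arvados workflow records.

This module uploads a CWL workflow (and the files it references) to Keep,
then wraps it in a small single-step "wrapper" Workflow so it can be
registered as an Arvados workflow record and launched later.
"""
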
from past.builtins import basestring
from future.utils import viewitems

import os
import json
import copy
import logging
import urllib
from io import StringIO
import sys

from typing import (MutableSequence, MutableMapping)

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver
import schema_salad.utils

import arvados.collection

from cwltool.pack import pack
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext

from schema_salad.ref_resolver import file_uri, uri_file_path

import ruamel.yaml as yaml

from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
                     make_builder, arvados_jobs_image)
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
from ._version import __version__

from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")

def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
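    """Save the packed workflow in a collection and return wrapper JSON.

    Stores `packed` as workflow.json in a new collection, unless a
    collection with the same portable data hash already exists in the
    project.  Returns (as a JSON string) a single-step wrapper Workflow
    whose "run" field points at "keep:<pdh>/workflow.json#main", roughly:

        {"cwlVersion": "v1.2",
         "$graph": [{"class": "Workflow",
                     "id": "#main",
                     "steps": [{"run": "keep:.../workflow.json#main", ...}],
                     ...}]}
    """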
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))


def rel_ref(s, baseuri, urlexpander, merged_map):
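    """Return a relative reference to `s` with respect to `baseuri`.

    References rewritten during upload (recorded in merged_map) and
    keep: references are returned unchanged; anything else becomes a
    path relative to the directory of baseuri.
    """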
    uri = urlexpander(s, baseuri)
    fileuri = urllib.parse.urldefrag(baseuri)[0]
    if fileuri in merged_map:
        replacements = merged_map[fileuri].resolved
        if uri in replacements:
            return replacements[uri]

    if s.startswith("keep:"):
        return s

    p1 = os.path.dirname(uri_file_path(baseuri))
    p2 = os.path.dirname(uri_file_path(uri))
    p3 = os.path.basename(uri_file_path(uri))
    r = os.path.relpath(p2, p1)
    if r == ".":
        r = ""
    return os.path.join(r, p3)


def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
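    """Recursively rewrite references in a parsed workflow document.

    Rewrites $include, $import, location, run and $schemas references to
    be relative (via rel_ref) and annotates DockerRequirements with the
    cached http://arvados.org/cwl#dockerCollectionPDH lookup for their
    image.
    """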
    if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
        d.fa.set_block_style()

    if isinstance(d, MutableSequence):
        for s in d:
            update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
    elif isinstance(d, MutableMapping):
        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
            d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)

        for field in ("$include", "$import", "location", "run"):
            if field in d and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)

        if "$schemas" in d:
            for n, s in enumerate(d["$schemas"]):
                d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)

        for s in d:
            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)


def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False):
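    """Upload workflow files to a collection and return a wrapper document.

    Unlike upload_workflow(), this keeps the workflow as separate,
    unpacked files: it computes the common path prefix of every file
    referenced by the workflow, copies them into a new collection
    preserving their relative layout (rewriting references via
    update_refs), and returns a wrapper Workflow document whose single
    step runs the main file out of that collection.
    """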

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            # strip the "import:" prefix to recover the file URI
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            # strip the "include:" prefix to recover the file URI
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find the length of the longest common prefix of all file URIs,
    # starting just past "file://" (7 characters), then back up to the
    # nearest "/" so the prefix ends on a directory boundary.
    n = 7
    allmatch = True
    while allmatch:
        n += 1
        for f in all_files:
            if len(f)-1 < n:
                n -= 1
                allmatch = False
                break
            if f[n] != firstfile[n]:
                allmatch = False
                break

    while firstfile[n] != "/":
        n -= 1

    prefix = firstfile[:n+1]
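    # "prefix" is now the common leading directory of every referenced
    # file; e.g. (hypothetically) file:///home/user/wf/main.cwl and
    # file:///home/user/wf/tools/a.cwl share the prefix
    # "file:///home/user/wf/".  Files are stored in the collection at
    # their path relative to that prefix (w[n+1:] below).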

    col = arvados.collection.Collection()

    for w in workflow_files | import_files:
        # 1. load YAML

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        set_block_style = False
        if result.fa.flow_style():
            set_block_style = True

        # 2. find $import, $include, $schemas, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)

        with col.open(w[n+1:], "wt") as f:
            yamlloader.dump(result, stream=f)
    for w in include_files:
        # Copy $include files through without parsing.
        with col.open(w[n+1:], "wb") as f1:
            with open(uri_file_path(w), "rb") as f2:
                dat = f2.read(65536)
                while dat:
                    f1.write(dat)
                    dat = f2.read(65536)

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][n+1:]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)

    return doc


def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
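    """Create or update an Arvados workflow record holding `doc`.

    Returns the uuid of the new or updated record.
    """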

    wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrappertext
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if update_uuid:
        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def upload_workflow(arvRunner, tool, job_order, project_uuid,
                    runtimeContext, uuid=None,
                    submit_runner_ram=0, name=None, merged_map=None,
                    submit_runner_image=None,
                    git_info=None):
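    """Upload the packed workflow and register it as a workflow record.

    Packs the workflow into a single document, uploads its dependencies,
    copies defaults from the job order, ensures a WorkflowRunnerResources
    hint naming the runner image, then stores the wrapper produced by
    make_wrapper_workflow() as a new or updated Arvados workflow record
    and returns its uuid.
    """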

    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)

    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
    for inp in main["inputs"]:
        sn = shortname(inp["id"])
        if sn in job_order:
            inp["default"] = job_order[sn]

    if not name:
        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))

    upload_dependencies(arvRunner, name, tool.doc_loader,
                        packed, tool.tool["id"],
                        runtimeContext)

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    main["hints"] = hints

    wrapper = make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool)

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrapper
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if uuid:
        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def dedup_reqs(reqs):
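    """Deduplicate requirements by class.

    The last occurrence of each class wins; Arvados extension classes
    (http://arvados.org/cwl#...) are dropped.  Returns the surviving
    requirements sorted by class name.
    """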
    dedup = {}
    for r in reversed(reqs):
        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
            dedup[r["class"]] = r
    return [dedup[r] for r in sorted(dedup.keys())]


def get_overall_res_req(res_reqs):
    """Take the overall of a list of ResourceRequirement,
    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
    and the sum of outdirMin, outdirMax."""

    all_res_req = {}
    exception_msgs = []
    for a in max_res_pars + sum_res_pars:
        all_res_req[a] = []
        for res_req in res_reqs:
            if a in res_req:
                if isinstance(res_req[a], int):
                    # concrete value; CWL expressions appear as strings
                    all_res_req[a].append(res_req[a])
                else:
                    msg = SourceLine(res_req, a).makeError(
                        "Non-top-level ResourceRequirement in single container cannot have expressions")
                    exception_msgs.append(msg)
    if exception_msgs:
        raise WorkflowException("\n".join(exception_msgs))
    else:
        overall_res_req = {}
        for a in all_res_req:
            if all_res_req[a]:
                if a in max_res_pars:
                    overall_res_req[a] = max(all_res_req[a])
                elif a in sum_res_pars:
                    overall_res_req[a] = sum(all_res_req[a])
        if overall_res_req:
            overall_res_req["class"] = "ResourceRequirement"
        return cmap(overall_res_req)


class ArvadosWorkflowStep(WorkflowStep):
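    """Wrap cwltool WorkflowStep to set the Arvados cluster target.

    When fast_submit is enabled the step is recorded without the
    (expensive) base class validation.
    """
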
    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            # Skip the base-class validation; just record the step with
            # empty inputs/outputs.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        builder = make_builder({shortname(k): v for k, v in viewitems(joborder)}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)


class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        self.wf_pdh = None
        self.dynamic_resource_req = []
        self.static_resource_req = []
        self.wf_reffiles = []
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is in effect: pack the workflow and run
        # the whole thing with cwltool inside one container.

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)
        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)

        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (i.e. referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

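        # Synthesize a CommandLineTool that runs cwltool over the packed
        # workflow inside a single container: workflow.cwl is staged from
        # Keep and the keep-mounted job order is written as cwl.input.yml,
        # with "$(" and "${" escaped so cwltool does not re-evaluate the
        # input values as expressions.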
        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements + job_res_reqs + [
                {"class": "InlineJavascriptRequirement"},
                {
                    "class": "InitialWorkDirRequirement",
                    "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
                    }]
                }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
                          ):  # type: (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)