1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext, getdefault
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .pathmapper import ArvPathMapper, trim_listing
42 from .arvtool import ArvadosCommandTool, set_cluster_target
43 from ._version import __version__
44 from .util import common_prefix
45
46 from .perf import Perf
47
48 logger = logging.getLogger('arvados.cwl-runner')
49 metrics = logging.getLogger('arvados.cwl-runner.metrics')
50
51 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
52 sum_res_pars = ("outdirMin", "outdirMax")
53
54 _basetype_re = re.compile(r'''(?:
55 Directory
56 |File
57 |array
58 |boolean
59 |double
60 |enum
61 |float
62 |int
63 |long
64 |null
65 |record
66 |string
67 )(?:\[\])?\??''', re.VERBOSE)
68
69 def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
70     col = arvados.collection.Collection(api_client=arvRunner.api,
71                                         keep_client=arvRunner.keep_client)
72
73     with col.open("workflow.json", "wt") as f:
74         json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))
75
76     pdh = col.portable_data_hash()
77
78     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
79     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
80         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
81
82     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
83     if len(existing["items"]) == 0:
84         col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)
85
86     # now construct the wrapper
87
88     step = {
89         "id": "#main/" + toolname,
90         "in": [],
91         "out": [],
92         "run": "keep:%s/workflow.json#main" % pdh,
93         "label": name
94     }
95
96     newinputs = []
97     for i in main["inputs"]:
98         inp = {}
99         # Make sure to only copy known fields that are meaningful at
100         # the workflow level. In practice this ensures that if we're
101         # wrapping a CommandLineTool we don't grab inputBinding.
102         # Right now this also excludes extension fields, which is fine;
103         # Arvados doesn't currently look for any extension fields on
104         # input parameters.
105         for f in ("type", "label", "secondaryFiles", "streamable",
106                   "doc", "id", "format", "loadContents",
107                   "loadListing", "default"):
108             if f in i:
109                 inp[f] = i[f]
110         newinputs.append(inp)
111
112     wrapper = {
113         "class": "Workflow",
114         "id": "#main",
115         "inputs": newinputs,
116         "outputs": [],
117         "steps": [step]
118     }
119
120     for i in main["inputs"]:
121         step["in"].append({
122             "id": "#main/step/%s" % shortname(i["id"]),
123             "source": i["id"]
124         })
125
126     for i in main["outputs"]:
127         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
128         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
129                                    "type": i["type"],
130                                    "id": i["id"]})
131
132     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
133
134     if main.get("requirements"):
135         wrapper["requirements"].extend(main["requirements"])
136     if main.get("hints"):
137         wrapper["hints"] = main["hints"]
138
139     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
140
141     if git_info:
142         for g in git_info:
143             doc[g] = git_info[g]
144
145     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
146
147
148 def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
149     if s.startswith("keep:"):
150         return s
151
152     uri = urlexpander(s, baseuri)
153
154     if uri.startswith("keep:"):
155         return uri
156
157     fileuri = urllib.parse.urldefrag(baseuri)[0]
158
159     for u in (baseuri, fileuri):
160         if u in merged_map:
161             replacements = merged_map[u].resolved
162             if uri in replacements:
163                 return replacements[uri]
164
165     if uri in jobmapper:
166         return jobmapper.mapper(uri).target
167
168     p1 = os.path.dirname(uri_file_path(fileuri))
169     p2 = os.path.dirname(uri_file_path(uri))
170     p3 = os.path.basename(uri_file_path(uri))
171
172     r = os.path.relpath(p2, p1)
173     if r == ".":
174         r = ""
175
176     return os.path.join(r, p3)
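# A minimal sketch of the common case (paths are illustrative assumptions,
# not taken from a real workflow):
#
#     rel_ref("subdir/tool.cwl", "file:///home/user/wf/main.cwl",
#             urlexpander, {}, jobmapper)
#
# expands the reference to file:///home/user/wf/subdir/tool.cwl, finds no
# entry in merged_map or jobmapper, and returns the relative path
# "subdir/tool.cwl".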
177
178 def is_basetype(tp):
179     return _basetype_re.match(tp) is not None
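# For example (illustrative values, not from the original source):
#   is_basetype("File")       -> True
#   is_basetype("string[]?")  -> True   (optional array of a base type)
#   is_basetype("#main/Rec")  -> False  (user-defined type; needs rel_ref)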
180
181 def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
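    """Recursively rewrite references in d ("run", "location", "$import",
    "$include", "$schemas", and non-basetype type names) using rel_ref, and
    annotate DockerRequirements with cached dockerCollectionPDH lookups.
    String list items starting with prefix are rewritten to replacePrefix."""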
182     if isinstance(d, MutableSequence):
183         for i, s in enumerate(d):
184             if prefix and isinstance(s, str):
185                 if s.startswith(prefix):
186                     d[i] = replacePrefix+s[len(prefix):]
187             else:
188                 update_refs(s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
189     elif isinstance(d, MutableMapping):
190         for field in ("id", "name"):
191             if isinstance(d.get(field), str) and d[field].startswith("_:"):
192                 # Blank node reference that was added automatically; we can drop it.
193                 del d[field]
194
195         if "id" in d:
196             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
197         elif "name" in d and isinstance(d["name"], str):
198             baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
199
200         if d.get("class") == "DockerRequirement":
201             dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
202             d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
203
204         for field in d:
205             if field in ("location", "run", "name") and isinstance(d[field], str):
206                 d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
207                 continue
208
209             if field in ("$include", "$import") and isinstance(d[field], str):
210                 d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
211                 continue
212
213             for t in ("type", "items"):
214                 if (field == t and
215                     isinstance(d[t], str) and
216                     not is_basetype(d[t])):
217                     d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
218                     continue
219
220             if field == "inputs" and isinstance(d["inputs"], MutableMapping):
221                 for inp in d["inputs"]:
222                     if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
223                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
224                     if isinstance(d["inputs"][inp], MutableMapping):
225                         update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
226                 continue
227
228             if field == "$schemas":
229                 for n, s in enumerate(d["$schemas"]):
230                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
231                 continue
232
233             update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
234
235
236 def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
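    """Deep-copy a SchemaDefRequirement and record, in merged_map, that each
    of its type names should resolve to keep:<pdh>/<relative path> in the
    uploaded collection."""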
237     req = copy.deepcopy(req)
238
239     for f in req["types"]:
240         r = f["name"]
241         path, frag = urllib.parse.urldefrag(r)
242         rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
243         merged_map.setdefault(path, FileUpdates({}, {}))
244         rename = "keep:%s/%s" %(pdh, rel)
245         for mm in merged_map:
246             merged_map[mm].resolved[r] = rename
247     return req
248
249
250 def drop_ids(d):
251     if isinstance(d, MutableSequence):
252         for i, s in enumerate(d):
253             drop_ids(s)
254     elif isinstance(d, MutableMapping):
255         if "id" in d and d["id"].startswith("file:"):
256             del d["id"]
257
258         for field in d:
259             drop_ids(d[field])
260
261
262 def upload_workflow(arvRunner, tool, job_order, project_uuid,
263                         runtimeContext,
264                         uuid=None,
265                         submit_runner_ram=0, name=None, merged_map=None,
266                         submit_runner_image=None,
267                         git_info=None,
268                         set_defaults=False,
269                         jobmapper=None):
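    """Copy the workflow file and everything it references into a Keep
    collection, then return a wrapper workflow document ($graph with a
    single #main step) that runs the uploaded copy."""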
270
271     firstfile = None
272     workflow_files = set()
273     import_files = set()
274     include_files = set()
275
276     # The document loader index will have entries for all the files
277     # that were loaded in the process of parsing the entire workflow
278     # (including subworkflows, tools, imports, etc).  We use this to
279 # compose a list of the workflow's file dependencies.
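# Index keys typically look like the following (illustrative examples, not
# from a real run):
#   "file:///home/user/wf/main.cwl"           -- a parsed CWL document
#   "import:file:///home/user/wf/schema.yml"  -- target of an $import
#   "include:file:///home/user/wf/script.sh"  -- target of an $include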
280     for w in tool.doc_loader.idx:
281         if w.startswith("file://"):
282             workflow_files.add(urllib.parse.urldefrag(w)[0])
283             if firstfile is None:
284                 firstfile = urllib.parse.urldefrag(w)[0]
285         if w.startswith("import:file://"):
286             import_files.add(urllib.parse.urldefrag(w[7:])[0])
287         if w.startswith("include:file://"):
288             include_files.add(urllib.parse.urldefrag(w[8:])[0])
289
290     all_files = workflow_files | import_files | include_files
291
292     # Find the longest common prefix among all the file names.  We'll
293     # use this to recreate the directory structure in a keep
294     # collection with correct relative references.
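# For instance (illustrative paths; the exact behavior is defined in
# util.py): if the files are file:///home/user/wf/main.cwl and
# file:///home/user/wf/tools/a.cwl, the prefix is "file:///home/user/wf/" and
# each file is stored in the collection under its path with that prefix
# stripped.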
295     prefix = common_prefix(firstfile, all_files) if firstfile else ""
296
297
298     col = arvados.collection.Collection(api_client=arvRunner.api)
299
300     # Now go through all the files and update references to other
301 # files.  We previously scanned for file dependencies; these are
302 # passed in as merged_map.
303     #
304     # note about merged_map: we upload dependencies of each process
305     # object (CommandLineTool/Workflow) to a separate collection.
306     # That way, when the user edits something, this limits collection
307     # PDH changes to just that tool, and minimizes situations where
308     # small changes break container reuse for the whole workflow.
309     #
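# Roughly, merged_map maps each document URI to a FileUpdates whose
# "resolved" dict rewrites individual references, e.g. (an illustrative
# sketch with a made-up collection reference):
#   merged_map["file:///home/user/wf/step.cwl"].resolved == {
#       "file:///home/user/wf/script.py": "keep:<pdh>/script.py"
#   }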
310     for w in workflow_files | import_files:
311         # 1. load the YAML file
312
313         text = tool.doc_loader.fetch_text(w)
314         if isinstance(text, bytes):
315             textIO = StringIO(text.decode('utf-8'))
316         else:
317             textIO = StringIO(text)
318
319         yamlloader = schema_salad.utils.yaml_no_ts()
320         result = yamlloader.load(textIO)
321
322         # If the whole document is in "flow style" it is probably JSON
323         # formatted.  We'll re-export it as JSON because the
324         # ruamel.yaml round-trip mode is a lie and only preserves
325         # "block style" formatting and not "flow style" formatting.
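        # (Illustrative: a file whose entire body is a one-line
        # '{"class": "Workflow", ...}' loads as a flow-style mapping and is
        # re-exported with json.dump below.)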
326         export_as_json = result.fa.flow_style()
327
328         # 2. find $import, $include, $schema, run, location
329         # 3. update field value
330         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
331
332         # Write the updated file to the collection.
333         with col.open(w[len(prefix):], "wt") as f:
334             if export_as_json:
335                 json.dump(result, f, indent=4, separators=(',',': '))
336             else:
337                 yamlloader.dump(result, stream=f)
338
339         # Also store a verbatim copy of the original files
340         with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
341             f.write(text)
342
343
344     # Upload files referenced by $include directives; these are used
345     # unchanged and don't need to be updated.
346     for w in include_files:
347         with col.open(w[len(prefix):], "wb") as f1:
348             with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
349                 with open(uri_file_path(w), "rb") as f2:
350                     dat = f2.read(65536)
351                     while dat:
352                         f1.write(dat)
353                         f3.write(dat)
354                         dat = f2.read(65536)
355
356     # Now collect metadata: the collection name and git properties.
357
358     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
359     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
360         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
361
362     toolfile = tool.tool["id"][len(prefix):]
363
364     properties = {
365         "type": "workflow",
366         "arv:workflowMain": toolfile,
367     }
368
369     if git_info:
370         for g in git_info:
371             p = g.split("#", 1)[1]
372             properties["arv:"+p] = git_info[g]
373
374     # Check if a collection with the same content already exists in the target project.  If so, just use that one.
375     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
376                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
377
378     if len(existing["items"]) == 0:
379         toolname = toolname.replace("/", " ")
380         col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
381         logger.info("Workflow uploaded to %s", col.manifest_locator())
382     else:
383         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
384
385     # Now that we've updated the workflow and saved it to a
386     # collection, we're going to construct a minimal "wrapper"
387     # workflow which consists only of input and output parameters
388     # connected to a single step that runs the real workflow.
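    # The wrapper built below looks roughly like this (an illustrative
    # sketch; actual field values depend on the workflow being uploaded):
    #   {"class": "Workflow", "id": "#main",
    #    "inputs": [...copied from the real workflow...],
    #    "outputs": [...wired to the step outputs...],
    #    "steps": [{"run": "keep:<pdh>/main.cwl", "in": [...], "out": [...]}]}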
389
390     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
391
392     step = {
393         "id": "#main/" + toolname,
394         "in": [],
395         "out": [],
396         "run": runfile,
397         "label": name
398     }
399
400     main = tool.tool
401
402     wf_runner_resources = None
403
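    # Make sure the workflow carries a WorkflowRunnerResources hint so that
    # the runner container image (and, if requested, its RAM) is pinned in
    # the wrapper; add the hint if the original workflow didn't have one.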
404     hints = main.get("hints", [])
405     found = False
406     for h in hints:
407         if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
408             wf_runner_resources = h
409             found = True
410             break
411     if not found:
412         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
413         hints.append(wf_runner_resources)
414
415     if "acrContainerImage" not in wf_runner_resources:
416         wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
417                                                                       submit_runner_image or "arvados/jobs:"+__version__,
418                                                                       runtimeContext)
419
420     if submit_runner_ram:
421         wf_runner_resources["ramMin"] = submit_runner_ram
422
423     # Remove a few redundant fields from the "job order" (aka input
424     # object or input parameters).  In the situation where we're
425     # creating or updating a workflow record, any values in the job
426     # order get copied over as default values for input parameters.
427     adjustDirObjs(job_order, trim_listing)
428     adjustFileObjs(job_order, trim_anonymous_location)
429     adjustDirObjs(job_order, trim_anonymous_location)
430
431     newinputs = []
432     for i in main["inputs"]:
433         inp = {}
434         # Make sure to only copy known fields that are meaningful at
435         # the workflow level. In practice this ensures that if we're
436         # wrapping a CommandLineTool we don't grab inputBinding.
437         # Right now this also excludes extension fields, which is fine;
438         # Arvados doesn't currently look for any extension fields on
439         # input parameters.
440         for f in ("type", "label", "secondaryFiles", "streamable",
441                   "doc", "format", "loadContents",
442                   "loadListing", "default"):
443             if f in i:
444                 inp[f] = i[f]
445
446         if set_defaults:
447             sn = shortname(i["id"])
448             if sn in job_order:
449                 inp["default"] = job_order[sn]
450
451         inp["id"] = "#main/%s" % shortname(i["id"])
452         newinputs.append(inp)
453
454     wrapper = {
455         "class": "Workflow",
456         "id": "#main",
457         "inputs": newinputs,
458         "outputs": [],
459         "steps": [step]
460     }
461
462     for i in main["inputs"]:
463         step["in"].append({
464             "id": "#main/step/%s" % shortname(i["id"]),
465             "source": "#main/%s" % shortname(i["id"])
466         })
467
468     for i in main["outputs"]:
469         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
470         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
471                                    "type": i["type"],
472                                    "id": "#main/%s" % shortname(i["id"])})
473
474     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
475
476     if main.get("requirements"):
477         wrapper["requirements"].extend(main["requirements"])
478     if hints:
479         wrapper["hints"] = hints
480
481     # Schema definitions (these let you define things like record
482     # types) require special handling.
483
484     for i, r in enumerate(wrapper["requirements"]):
485         if r["class"] == "SchemaDefRequirement":
486             wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
487
488     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
489
490     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
491
492     if git_info:
493         for g in git_info:
494             doc[g] = git_info[g]
495
496     # Remove any lingering file references.
497     drop_ids(wrapper)
498
499     return doc
500
501
502 def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
503
504     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
505
506     body = {
507         "workflow": {
508             "name": name,
509             "description": tool.tool.get("doc", ""),
510             "definition": wrappertext
511         }}
512     if project_uuid:
513         body["workflow"]["owner_uuid"] = project_uuid
514
515     if update_uuid:
516         call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
517     else:
518         call = arvRunner.api.workflows().create(body=body)
519     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
520
521
522 def dedup_reqs(reqs):
523     dedup = {}
524     for r in reversed(reqs):
525         if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
526             dedup[r["class"]] = r
527     return [dedup[r] for r in sorted(dedup.keys())]
528
529 def get_overall_res_req(res_reqs):
530     """Compute the overall resource request from a list of ResourceRequirements,
531     i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
532     and the sum of outdirMin, outdirMax."""
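    # For example (illustrative input, not from the original tests):
    #   [{"coresMin": 1, "ramMin": 1024, "outdirMin": 2048},
    #    {"coresMin": 2, "ramMin": 512,  "outdirMin": 1024}]
    # yields
    #   {"coresMin": 2, "ramMin": 1024, "outdirMin": 3072,
    #    "class": "ResourceRequirement"}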
533
534     all_res_req = {}
535     exception_msgs = []
536     for a in max_res_pars + sum_res_pars:
537         all_res_req[a] = []
538         for res_req in res_reqs:
539             if a in res_req:
540                 if isinstance(res_req[a], int): # literal integer, not an expression
541                     all_res_req[a].append(res_req[a])
542                 else:
543                     msg = SourceLine(res_req, a).makeError(
544                     "Non-top-level ResourceRequirement in single container cannot have expressions")
545                     exception_msgs.append(msg)
546     if exception_msgs:
547         raise WorkflowException("\n".join(exception_msgs))
548     else:
549         overall_res_req = {}
550         for a in all_res_req:
551             if all_res_req[a]:
552                 if a in max_res_pars:
553                     overall_res_req[a] = max(all_res_req[a])
554                 elif a in sum_res_pars:
555                     overall_res_req[a] = sum(all_res_req[a])
556         if overall_res_req:
557             overall_res_req["class"] = "ResourceRequirement"
558         return cmap(overall_res_req)
559
560 class ArvadosWorkflowStep(WorkflowStep):
561     def __init__(self,
562                  toolpath_object,      # type: Dict[Text, Any]
563                  pos,                  # type: int
564                  loadingContext,       # type: LoadingContext
565                  arvrunner,
566                  *argc,
567                  **argv
568                 ):  # type: (...) -> None
569
570         if arvrunner.fast_submit:
571             self.tool = toolpath_object
572             self.tool["inputs"] = []
573             self.tool["outputs"] = []
574         else:
575             super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
576             self.tool["class"] = "WorkflowStep"
577         self.arvrunner = arvrunner
578
579     def job(self, joborder, output_callback, runtimeContext):
580         runtimeContext = runtimeContext.copy()
581         runtimeContext.toplevel = True  # Preserve behavior for #13365
582
583         builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
584                                runtimeContext, self.metadata)
585         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
586         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
587
588
589 class ArvadosWorkflow(Workflow):
590     """Wrap cwltool Workflow to override selected methods."""
591
592     def __init__(self, arvrunner, toolpath_object, loadingContext):
593         self.arvrunner = arvrunner
594         self.wf_pdh = None
595         self.dynamic_resource_req = []
596         self.static_resource_req = []
597         self.wf_reffiles = []
598         self.loadingContext = loadingContext.copy()
599
600         self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
601         tool_requirements = toolpath_object.get("requirements", [])
602         self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
603         tool_hints = toolpath_object.get("hints", [])
604
605         workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
606         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
607             self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")
608
609         super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
610         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
611
612     def job(self, joborder, output_callback, runtimeContext):
613
614         builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
615         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
616
617         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
618         if not req:
619             return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
620
621         # RunInSingleContainer is true
622
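        # The http://arvados.org/cwl#RunInSingleContainer hint was given, so
        # instead of scheduling each step as its own container we pack the
        # whole subworkflow and run it with "cwltool --no-container ..."
        # inside one container (see the wf_runner CommandLineTool built at
        # the end of this method).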
623         with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
624             if "id" not in self.tool:
625                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
626
627         discover_secondary_files(self.arvrunner.fs_access, builder,
628                                  self.tool["inputs"], joborder)
629
630         normalizeFilesDirs(joborder)
631
632         with Perf(metrics, "subworkflow upload_deps"):
633             upload_dependencies(self.arvrunner,
634                                 os.path.basename(joborder.get("id", "#")),
635                                 self.doc_loader,
636                                 joborder,
637                                 joborder.get("id", "#"),
638                                 runtimeContext)
639
640             if self.wf_pdh is None:
641                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
642
643                 for p in packed["$graph"]:
644                     if p["id"] == "#main":
645                         p["requirements"] = dedup_reqs(self.requirements)
646                         p["hints"] = dedup_reqs(self.hints)
647
648                 def visit(item):
649                     if "requirements" in item:
650                         item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
651                     for t in ("hints", "requirements"):
652                         if t not in item:
653                             continue
654                         for req in item[t]:
655                             if req["class"] == "ResourceRequirement":
656                                 dyn = False
657                                 for k in max_res_pars + sum_res_pars:
658                                     if k in req:
659                                         if isinstance(req[k], basestring):
660                                             if item["id"] == "#main":
661                                                 # only the top-level requirements/hints may contain expressions
662                                                 self.dynamic_resource_req.append(req)
663                                                 dyn = True
664                                                 break
665                                             else:
666                                                 with SourceLine(req, k, WorkflowException):
667                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
668                                 if not dyn:
669                                     self.static_resource_req.append(req)
670
671                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
672
673                 if self.static_resource_req:
674                     self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
675
676                 upload_dependencies(self.arvrunner,
677                                     runtimeContext.name,
678                                     self.doc_loader,
679                                     packed,
680                                     self.tool["id"],
681                                     runtimeContext)
682
683                 # Discover files/directories referenced by the
684                 # workflow (mainly "default" values)
685                 visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
686
687
688         if self.dynamic_resource_req:
689             # Evaluate dynamic resource requirements using current builder
690             rs = copy.copy(self.static_resource_req)
691             for dyn_rs in self.dynamic_resource_req:
692                 eval_req = {"class": "ResourceRequirement"}
693                 for a in max_res_pars + sum_res_pars:
694                     if a in dyn_rs:
695                         eval_req[a] = builder.do_eval(dyn_rs[a])
696                 rs.append(eval_req)
697             job_res_reqs = [get_overall_res_req(rs)]
698         else:
699             job_res_reqs = self.static_resource_req
700
701         with Perf(metrics, "subworkflow adjust"):
702             joborder_resolved = copy.deepcopy(joborder)
703             joborder_keepmount = copy.deepcopy(joborder)
704
705             reffiles = []
706             visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
707
708             mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
709                                    "/keep/%s",
710                                    "/keep/%s/%s")
711
712             # For containers API, we need to make sure any extra
713             # referenced files (i.e., referenced by the workflow but
714             # not in the inputs) are included in the mounts.
715             if self.wf_reffiles:
716                 runtimeContext = runtimeContext.copy()
717                 runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
718
719             def keepmount(obj):
720                 remove_redundant_fields(obj)
721                 with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
722                     if "location" not in obj:
723                         raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
724                 with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
725                     if obj["location"].startswith("keep:"):
726                         obj["location"] = mapper.mapper(obj["location"]).target
727                         if "listing" in obj:
728                             del obj["listing"]
729                     elif obj["location"].startswith("_:"):
730                         del obj["location"]
731                     else:
732                         raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
733
734             visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
735
736             def resolved(obj):
737                 if obj["location"].startswith("keep:"):
738                     obj["location"] = mapper.mapper(obj["location"]).resolved
739
740             visit_class(joborder_resolved, ("File", "Directory"), resolved)
741
742             if self.wf_pdh is None:
743                 adjustFileObjs(packed, keepmount)
744                 adjustDirObjs(packed, keepmount)
745                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
746
747         self.loadingContext = self.loadingContext.copy()
748         self.loadingContext.metadata = self.loadingContext.metadata.copy()
749         self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
750
751         if len(job_res_reqs) == 1:
752             # RAM request needs to be at least 128 MiB or the workflow
753             # runner itself won't run reliably.
754             if job_res_reqs[0].get("ramMin", 1024) < 128:
755                 job_res_reqs[0]["ramMin"] = 128
756
757         arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
758         if runtimeContext.debug:
759             arguments.insert(0, '--debug')
760
761         wf_runner = cmap({
762             "class": "CommandLineTool",
763             "baseCommand": "cwltool",
764             "inputs": self.tool["inputs"],
765             "outputs": self.tool["outputs"],
766             "stdout": "cwl.output.json",
767             "requirements": self.requirements+job_res_reqs+[
768                 {"class": "InlineJavascriptRequirement"},
769                 {
770                 "class": "InitialWorkDirRequirement",
771                 "listing": [{
772                         "entryname": "workflow.cwl",
773                         "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
774                     }, {
775                         "entryname": "cwl.input.yml",
776                         "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
777                     }]
778             }],
779             "hints": self.hints,
780             "arguments": arguments,
781             "id": "#"
782         })
783         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
784
785     def make_workflow_step(self,
786                            toolpath_object,      # type: Dict[Text, Any]
787                            pos,                  # type: int
788                            loadingContext,       # type: LoadingContext
789                            *argc,
790                            **argv
791     ):
792         # (...) -> WorkflowStep
793         return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)