21023: Use known-good exponential backoff with jitter strategy.
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext, getdefault
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .arvcontainer import RunnerContainer
42 from .pathmapper import ArvPathMapper, trim_listing
43 from .arvtool import ArvadosCommandTool, set_cluster_target
44 from ._version import __version__
45 from .util import common_prefix
46 from .arvdocker import arv_docker_get_image
47
48 from .perf import Perf
49
# Main logger for this module, plus a separate logger that is handed to
# Perf() for timing measurements.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# ResourceRequirement fields that get_overall_res_req() combines by taking
# the maximum, and those it combines by summing.
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")

# Recognizes the built-in CWL type names, optionally followed by the "[]"
# (array) and "?" (optional) shorthand suffixes.  The pattern has no end
# anchor, so when used with .match() it accepts any string that merely
# starts with one of these names.
_basetype_re = re.compile(r'''(?:
Directory
|File
|array
|boolean
|double
|enum
|float
|int
|long
|null
|record
|string
)(?:\[\])?\??''', re.VERBOSE)
70
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Save a packed workflow to Keep and return a wrapper workflow as JSON text.

    The packed document is written as ``workflow.json`` into a new collection
    (saved to ``project_uuid`` only when no collection with the same portable
    data hash already exists there).  The returned string is a CWL v1.2
    document whose single step runs that ``workflow.json`` and whose inputs
    and outputs mirror those of ``main``.

    :param arvRunner: object providing ``api``, ``keep_client`` and ``num_retries``.
    :param main: the top-level process object (dict with "inputs"/"outputs").
    :param packed: the packed workflow document to store.
    :param project_uuid: owner for the stored collection.
    :param name: label put on the wrapper step.
    :param git_info: optional mapping of git metadata copied into the document.
    :param tool: loaded tool, used to derive the collection name.
    :returns: the wrapper document serialized with ``json.dumps``.
    """
    describe_key = "http://arvados.org/cwl#gitDescribe"

    # Store the packed workflow so the wrapper can refer to it by content hash.
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)
    with col.open("workflow.json", "wt") as out:
        json.dump(packed, out, sort_keys=True, indent=4, separators=(',',': '))
    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get(describe_key):
        toolname = "%s (%s)" % (toolname, git_info.get(describe_key))

    # Only save a new collection if the project does not already hold one
    # with identical content.
    same_content = arvRunner.api.collections().list(
        filters=[["portable_data_hash", "=", pdh],
                 ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if not same_content["items"]:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # Copy only the fields that are meaningful on a workflow-level input
    # parameter.  This leaves out things such as inputBinding (when wrapping
    # a CommandLineTool) and any extension fields.
    copied_fields = ("type", "label", "secondaryFiles", "streamable",
                     "doc", "id", "format", "loadContents",
                     "loadListing", "default")
    newinputs = [{fld: param[fld] for fld in copied_fields if fld in param}
                 for param in main["inputs"]]

    step = {
        "id": "#main/" + toolname,
        "in": [{"id": "#main/step/%s" % shortname(param["id"]),
                "source": param["id"]}
               for param in main["inputs"]],
        "out": [{"id": "#main/step/%s" % shortname(param["id"])}
                for param in main["outputs"]],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    requirements = [{"class": "SubworkflowFeatureRequirement"}]
    if main.get("requirements"):
        requirements.extend(main["requirements"])

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [{"outputSource": "#main/step/%s" % shortname(param["id"]),
                     "type": param["type"],
                     "id": param["id"]}
                    for param in main["outputs"]],
        "steps": [step],
        "requirements": requirements
    }
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
    if git_info:
        for key in git_info:
            doc[key] = git_info[key]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
148
149
def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
    """Rewrite a reference so it is valid inside the uploaded collection.

    Resolution order:

    1. ``keep:`` and ``arvwf:`` references are returned unchanged.
    2. ``s`` is expanded with ``urlexpander(s, baseuri)``; an expansion that
       starts with ``keep:`` is returned as is.
    3. ``merged_map`` is consulted under ``baseuri`` and under ``baseuri``
       without its fragment; a hit in that entry's ``resolved`` mapping wins.
    4. ``jobmapper`` is consulted; a hit returns the mapped ``target``.
    5. Otherwise the result is the path of the expanded URI relative to the
       directory of the base document.

    :param s: the reference string to rewrite.
    :param baseuri: URI of the document (optionally with fragment) containing ``s``.
    :param urlexpander: callable ``(ref, baseuri) -> absolute URI``.
    :param merged_map: mapping of document URI to an object with a
        ``resolved`` dict, or None to skip this lookup.
    :param jobmapper: object supporting ``in`` and ``mapper(uri).target``,
        or None to skip this lookup.
    :returns: the rewritten reference string.
    """
    if s.startswith(("keep:", "arvwf:")):
        return s

    uri = urlexpander(s, baseuri)

    if uri.startswith("keep:"):
        return uri

    fileuri = urllib.parse.urldefrag(baseuri)[0]

    # Callers such as upload_workflow() default merged_map and jobmapper to
    # None, so both lookups must tolerate a missing mapping.
    if merged_map is not None:
        for u in (baseuri, fileuri):
            if u in merged_map:
                replacements = merged_map[u].resolved
                if uri in replacements:
                    return replacements[uri]

    if jobmapper is not None and uri in jobmapper:
        return jobmapper.mapper(uri).target

    base_dir = os.path.dirname(uri_file_path(fileuri))
    target_path = uri_file_path(uri)

    rel_dir = os.path.relpath(os.path.dirname(target_path), base_dir)
    if rel_dir == ".":
        # Same directory: emit a bare file name rather than "./name".
        rel_dir = ""

    return os.path.join(rel_dir, os.path.basename(target_path))
179
def is_basetype(tp):
    """Return True when ``tp`` begins with a built-in CWL type name.

    The check uses ``_basetype_re.match``, which anchors only at the start
    of the string, so any text after the recognized name (and its optional
    ``[]`` / ``?`` suffixes) is ignored.
    """
    return bool(_basetype_re.match(tp))
182
def update_refs(api, d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
    """Recursively rewrite references inside a loaded CWL document, in place.

    Lists: when ``prefix`` is non-empty, string items starting with
    ``prefix`` have it replaced by ``replacePrefix``; other strings are left
    alone.  Non-string items (or all items when ``prefix`` is empty) are
    processed recursively.

    Mappings: blank-node ("_:") ``id``/``name`` values are removed, the base
    URI is re-scoped from ``id`` (or a string ``name``), DockerRequirement
    entries get an ``http://arvados.org/cwl#dockerCollectionPDH`` field from
    ``arv_docker_get_image``, and reference-bearing fields are rewritten with
    ``rel_ref``.  Remaining values are processed recursively.

    Anything that is neither a mutable sequence nor a mutable mapping is
    ignored.  Nothing is returned.
    """
    if isinstance(d, MutableSequence):
        for idx, item in enumerate(d):
            if not (prefix and isinstance(item, str)):
                update_refs(api, item, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
            elif item.startswith(prefix):
                d[idx] = replacePrefix+item[len(prefix):]
        return

    if not isinstance(d, MutableMapping):
        return

    for key in ("id", "name"):
        current = d.get(key)
        if isinstance(current, str) and current.startswith("_:"):
            # blank node reference, was added in automatically, can get rid of it.
            del d[key]

    if "id" in d:
        baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
    elif "name" in d and isinstance(d["name"], str):
        baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

    if d.get("class") == "DockerRequirement":
        d["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, d, False,
                                                                               runtimeContext)

    for field in d:
        is_text = isinstance(d[field], str)

        if is_text and field in ("location", "run", "name"):
            d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
            continue

        if is_text and field in ("$include", "$import"):
            # These are resolved without consulting merged_map.
            d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
            continue

        if field in ("type", "items") and is_text and not is_basetype(d[field]):
            # Deliberately no "continue": processing falls through to the
            # generic recursion at the bottom of the loop.
            d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)

        if field == "inputs" and isinstance(d["inputs"], MutableMapping):
            inputs = d["inputs"]
            for inp in inputs:
                if isinstance(inputs[inp], str) and not is_basetype(inputs[inp]):
                    inputs[inp] = rel_ref(inputs[inp], baseuri, urlexpander, merged_map, jobmapper)
                if isinstance(inputs[inp], MutableMapping):
                    update_refs(api, inputs[inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
            continue

        if field in ("requirements", "hints") and isinstance(d[field], MutableMapping):
            # Map-style requirements/hints: the DockerRequirement is found by key.
            docker_req = d[field].get("DockerRequirement")
            if docker_req:
                docker_req["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, docker_req, False,
                                                                                                runtimeContext)

        if field == "$schemas":
            schemas = d["$schemas"]
            for n, _ in enumerate(schemas):
                schemas[n] = rel_ref(schemas[n], baseuri, urlexpander, merged_map, jobmapper)
            continue

        update_refs(api, d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
242
243
def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
    """Register keep: replacements for the types of a SchemaDefRequirement.

    For every entry in ``req["types"]`` the type name is converted to a
    relative reference with ``rel_ref`` and then recorded, as
    ``keep:<pdh>/<relative ref>``, in the ``resolved`` mapping of every
    entry of ``merged_map``.  An entry for the type's document (its name
    without fragment) is created in ``merged_map`` if missing.

    :returns: a deep copy of ``req``; the caller's object is not modified.
    """
    req = copy.deepcopy(req)

    for typedef in req["types"]:
        typename = typedef["name"]
        docpath = urllib.parse.urldefrag(typename)[0]
        # Compute the relative reference before touching merged_map, since
        # rel_ref itself reads merged_map.
        relpath = rel_ref(typename, baseuri, urlexpander, merged_map, jobmapper)
        merged_map.setdefault(docpath, FileUpdates({}, {}))
        keepref = "keep:%s/%s" %(pdh, relpath)
        for updates in merged_map.values():
            updates.resolved[typename] = keepref
    return req
256
257
def drop_ids(d):
    """Recursively delete ``id`` fields whose value is a ``file:`` URI.

    Walks nested mutable sequences and mappings in place.  An ``id`` is
    removed only when it is a string starting with ``file:``; ids of any
    other type or value are kept.  The type check matters because the
    structure can contain arbitrary user data (for example input defaults)
    where ``id`` need not be a string.

    :param d: the structure to clean; other value types are ignored.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            drop_ids(item)
    elif isinstance(d, MutableMapping):
        ident = d.get("id")
        if isinstance(ident, str) and ident.startswith("file:"):
            del d["id"]

        for field in d:
            drop_ids(d[field])
268
269
def upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False,
                        jobmapper=None):
    """Upload a workflow's source files to a collection and build a wrapper.

    Every ``file://`` document recorded in the tool's document loader index
    (workflows, ``$import`` and ``$include`` targets) is copied into a new
    collection, laid out relative to their common prefix.  Workflow and
    import files have their references rewritten with ``update_refs``; a
    verbatim copy of each file is also stored under ``original/``.  The
    collection is saved to ``arvRunner.project_uuid`` unless one with the
    same portable data hash already exists there.

    A minimal wrapper Workflow is then built whose single step runs the
    uploaded main file and whose inputs/outputs mirror the tool's.

    :param arvRunner: runner providing ``api``, ``project_uuid`` and ``num_retries``.
    :param tool: the loaded top-level tool.
    :param job_order: input object; trimmed in place, and used for input
        defaults when ``set_defaults`` is true.
    :param project_uuid: not referenced by this function's body.
    :param runtimeContext: passed through to image lookup helpers.
    :param uuid: not referenced by this function's body.
    :param submit_runner_ram: if truthy, stored as ``ramMin`` of the
        WorkflowRunnerResources hint.
    :param name: label for the wrapper step.
    :param merged_map: mapping of document URI to file updates; None is
        treated as an empty mapping.
    :param submit_runner_image: runner image; defaults to
        ``arvados/jobs:<__version__>``.
    :param git_info: optional mapping of git metadata, copied to the
        collection properties and to the returned document.
    :param set_defaults: copy ``job_order`` values into input defaults.
    :param jobmapper: path mapper consulted when rewriting references.
        It is passed on to ``update_refs``/``fix_schemadef``, so callers
        should supply one.
    :returns: dict with ``cwlVersion`` and a ``$graph`` holding the wrapper.
    """
    # merged_map is consulted while rewriting references and is extended by
    # fix_schemadef(), so the None default must become a real mapping.
    if merged_map is None:
        merged_map = {}

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # The document loader index will have entries for all the files
    # that were loaded in the process of parsing the entire workflow
    # (including subworkflows, tools, imports, etc).  We use this to
    # compose a list of the workflow file dependencies.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            # w[7:] strips the leading "import:"
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            # w[8:] strips the leading "include:"
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find the longest common prefix among all the file names.  We'll
    # use this to recreate the directory structure in a keep
    # collection with correct relative references.
    prefix = common_prefix(firstfile, all_files) if firstfile else ""


    col = arvados.collection.Collection(api_client=arvRunner.api)

    # Now go through all the files and update references to other
    # files.  We previously scanned for file dependencies, these
    # are passed in as merged_map.
    #
    # note about merged_map: we upload dependencies of each process
    # object (CommandLineTool/Workflow) to a separate collection.
    # That way, when the user edits something, this limits collection
    # PDH changes to just that tool, and minimizes situations where
    # small changes break container reuse for the whole workflow.
    #
    for w in workflow_files | import_files:
        # 1. load the YAML  file

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            # Normalize to str once; both the parser input and the verbatim
            # copy below are written through text-mode streams.
            text = text.decode('utf-8')
        textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        # If the whole document is in "flow style" it is probably JSON
        # formatted.  We'll re-export it as JSON because the
        # ruamel.yaml round-trip mode is a lie and only preserves
        # "block style" formatting and not "flow style" formatting.
        export_as_json = result.fa.flow_style()

        # 2. find $import, $include, $schema, run, location
        # 3. update field value
        update_refs(arvRunner.api, result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")

        # Write the updated file to the collection.
        with col.open(w[len(prefix):], "wt") as f:
            if export_as_json:
                json.dump(result, f, indent=4, separators=(',',': '))
            else:
                yamlloader.dump(result, stream=f)

        # Also store a verbatim copy of the original files
        with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
            f.write(text)


    # Upload files referenced by $include directives, these are used
    # unchanged and don't need to be updated.
    for w in include_files:
        with col.open(w[len(prefix):], "wb") as f1:
            with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
                with open(uri_file_path(w), "rb") as f2:
                    dat = f2.read(65536)
                    while dat:
                        f1.write(dat)
                        f3.write(dat)
                        dat = f2.read(65536)

    # Now collect metadata: the collection name and git properties.

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][len(prefix):]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    if git_info:
        for g in git_info:
            # Property name is the fragment part of the git_info key.
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    # Check if a collection with the same content already exists in the target project.  If so, just use that one.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)

    if len(existing["items"]) == 0:
        toolname = toolname.replace("/", " ")
        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
        logger.info("Workflow uploaded to %s", col.manifest_locator())
    else:
        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])

    # Now that we've updated the workflow and saved it to a
    # collection, we're going to construct a minimal "wrapper"
    # workflow which consists only of input and output parameters
    # connected to a single step that runs the real workflow.

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    # Reuse an existing WorkflowRunnerResources hint, or add a new one.
    hints = main.get("hints", [])
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            break
    else:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    if "acrContainerImage" not in wf_runner_resources:
        wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                      submit_runner_image or "arvados/jobs:"+__version__,
                                                                      runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    # Remove a few redundant fields from the "job order" (aka input
    # object or input parameters).  In the situation where we're
    # creating or updating a workflow record, any values in the job
    # order get copied over as default values for input parameters.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    # Schema definitions (this lets you define things like record
    # types) require special handling.

    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())

    update_refs(arvRunner.api, wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Remove any lingering file references.
    drop_ids(wrapper)

    return doc
508
509
def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
    """Create or update an Arvados workflow record and return its UUID.

    :param arvRunner: runner providing ``api`` and ``num_retries``.
    :param doc: wrapper document; stored as sorted, indented JSON text.
    :param name: name of the workflow record.
    :param tool: loaded tool; its "doc" field becomes the description.
    :param project_uuid: if truthy, set as the record's owner.
    :param update_uuid: if truthy, update this record instead of creating one.
    :returns: the "uuid" field of the API response.
    """
    definition = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    record = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": definition
    }
    if project_uuid:
        record["owner_uuid"] = project_uuid
    body = {"workflow": record}

    if not update_uuid:
        request = arvRunner.api.workflows().create(body=body)
    else:
        request = arvRunner.api.workflows().update(uuid=update_uuid, body=body)

    response = request.execute(num_retries=arvRunner.num_retries)
    return response["uuid"]
528
529
def dedup_reqs(reqs):
    """Return one requirement per class, sorted by class name.

    When a class appears more than once, the entry closest to the end of
    ``reqs`` is kept.  Requirements whose class starts with
    ``http://arvados.org/cwl#`` are dropped.
    """
    latest = {}
    # Walk backwards so the first entry seen for a class is the last one listed.
    for req in reversed(reqs):
        cls = req["class"]
        if cls in latest or cls.startswith("http://arvados.org/cwl#"):
            continue
        latest[cls] = req
    return [latest[cls] for cls in sorted(latest)]
536
def get_overall_res_req(res_reqs):
    """Combine several ResourceRequirement objects into a single one.

    Fields listed in ``max_res_pars`` are combined by taking the maximum,
    fields listed in ``sum_res_pars`` by summing.  Fields absent from every
    input are absent from the result.

    :param res_reqs: iterable of ResourceRequirement mappings.
    :returns: ``cmap`` of the combined requirement (with "class" set), or
        ``cmap`` of an empty dict when no field had a value.
    :raises WorkflowException: if any considered field holds a non-integer
        value (for example an expression); all offenders are reported.
    """
    collected = {}
    problems = []
    for par in max_res_pars + sum_res_pars:
        values = collected[par] = []
        for req in res_reqs:
            if par not in req:
                continue
            if isinstance(req[par], int): # integer check
                values.append(req[par])
            else:
                problems.append(SourceLine(req, par).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))

    if problems:
        raise WorkflowException("\n".join(problems))

    overall_res_req = {}
    for par, values in collected.items():
        if not values:
            continue
        if par in max_res_pars:
            overall_res_req[par] = max(values)
        elif par in sum_res_pars:
            overall_res_req[par] = sum(values)

    if overall_res_req:
        overall_res_req["class"] = "ResourceRequirement"
    return cmap(overall_res_req)
567
class ArvadosWorkflowStep(WorkflowStep):
    """WorkflowStep variant that keeps a reference to the Arvados runner.

    When the runner has ``fast_submit`` set, the parent constructor is not
    called; the step only records the raw tool object with emptied
    inputs/outputs.
    """

    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if not arvrunner.fast_submit:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        else:
            # Fast submit: skip the parent initialization entirely.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Run the step's job with a context adjusted by set_cluster_target."""
        step_context = runtimeContext.copy()
        step_context.toplevel = True  # Preserve behavior for #13365

        # The builder is given the inputs keyed by their short names.
        short_inputs = {shortname(key): value for key, value in viewitems(joborder)}
        builder = make_builder(short_inputs, self.hints, self.requirements,
                               step_context, self.metadata)
        step_context = set_cluster_target(self.tool, self.arvrunner, builder, step_context)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, step_context)
595
596
597 class ArvadosWorkflow(Workflow):
598     """Wrap cwltool Workflow to override selected methods."""
599
600     def __init__(self, arvrunner, toolpath_object, loadingContext):
601         self.arvrunner = arvrunner
602         self.wf_pdh = None
603         self.dynamic_resource_req = []
604         self.static_resource_req = []
605         self.wf_reffiles = []
606         self.loadingContext = loadingContext.copy()
607
608         self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
609         tool_requirements = toolpath_object.get("requirements", [])
610         self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
611         tool_hints = toolpath_object.get("hints", [])
612
613         workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
614         if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
615             self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")
616
617         super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
618         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
619
620
621     def runInSingleContainer(self, joborder, output_callback, runtimeContext, builder):
622         with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
623             if "id" not in self.tool:
624                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
625
626         discover_secondary_files(self.arvrunner.fs_access, builder,
627                                  self.tool["inputs"], joborder)
628
629         normalizeFilesDirs(joborder)
630
631         with Perf(metrics, "subworkflow upload_deps"):
632             upload_dependencies(self.arvrunner,
633                                 os.path.basename(joborder.get("id", "#")),
634                                 self.doc_loader,
635                                 joborder,
636                                 joborder.get("id", "#"),
637                                 runtimeContext)
638
639             if self.wf_pdh is None:
640                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
641
642                 for p in packed["$graph"]:
643                     if p["id"] == "#main":
644                         p["requirements"] = dedup_reqs(self.requirements)
645                         p["hints"] = dedup_reqs(self.hints)
646
647                 def visit(item):
648                     if "requirements" in item:
649                         item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
650                     for t in ("hints", "requirements"):
651                         if t not in item:
652                             continue
653                         for req in item[t]:
654                             if req["class"] == "ResourceRequirement":
655                                 dyn = False
656                                 for k in max_res_pars + sum_res_pars:
657                                     if k in req:
658                                         if isinstance(req[k], basestring):
659                                             if item["id"] == "#main":
660                                                 # only the top-level requirements/hints may contain expressions
661                                                 self.dynamic_resource_req.append(req)
662                                                 dyn = True
663                                                 break
664                                             else:
665                                                 with SourceLine(req, k, WorkflowException):
666                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
667                                 if not dyn:
668                                     self.static_resource_req.append(req)
669
670                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
671
672                 if self.static_resource_req:
673                     self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
674
675                 upload_dependencies(self.arvrunner,
676                                     runtimeContext.name,
677                                     self.doc_loader,
678                                     packed,
679                                     self.tool["id"],
680                                     runtimeContext)
681
682                 # Discover files/directories referenced by the
683                 # workflow (mainly "default" values)
684                 visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
685
686
687         if self.dynamic_resource_req:
688             # Evaluate dynamic resource requirements using current builder
689             rs = copy.copy(self.static_resource_req)
690             for dyn_rs in self.dynamic_resource_req:
691                 eval_req = {"class": "ResourceRequirement"}
692                 for a in max_res_pars + sum_res_pars:
693                     if a in dyn_rs:
694                         eval_req[a] = builder.do_eval(dyn_rs[a])
695                 rs.append(eval_req)
696             job_res_reqs = [get_overall_res_req(rs)]
697         else:
698             job_res_reqs = self.static_resource_req
699
700         with Perf(metrics, "subworkflow adjust"):
701             joborder_resolved = copy.deepcopy(joborder)
702             joborder_keepmount = copy.deepcopy(joborder)
703
704             reffiles = []
705             visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
706
707             mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
708                                    "/keep/%s",
709                                    "/keep/%s/%s")
710
711             # For containers API, we need to make sure any extra
712             # referenced files (ie referenced by the workflow but
713             # not in the inputs) are included in the mounts.
714             if self.wf_reffiles:
715                 runtimeContext = runtimeContext.copy()
716                 runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
717
718             def keepmount(obj):
719                 remove_redundant_fields(obj)
720                 with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
721                     if "location" not in obj:
722                         raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
723                 with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
724                     if obj["location"].startswith("keep:"):
725                         obj["location"] = mapper.mapper(obj["location"]).target
726                         if "listing" in obj:
727                             del obj["listing"]
728                     elif obj["location"].startswith("_:"):
729                         del obj["location"]
730                     else:
731                         raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
732
733             visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
734
735             def resolved(obj):
736                 if obj["location"].startswith("keep:"):
737                     obj["location"] = mapper.mapper(obj["location"]).resolved
738
739             visit_class(joborder_resolved, ("File", "Directory"), resolved)
740
741             if self.wf_pdh is None:
742                 adjustFileObjs(packed, keepmount)
743                 adjustDirObjs(packed, keepmount)
744                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
745
746         self.loadingContext = self.loadingContext.copy()
747         self.loadingContext.metadata = self.loadingContext.metadata.copy()
748         self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
749
750         if len(job_res_reqs) == 1:
751             # RAM request needs to be at least 128 MiB or the workflow
752             # runner itself won't run reliably.
753             if job_res_reqs[0].get("ramMin", 1024) < 128:
754                 job_res_reqs[0]["ramMin"] = 128
755
756         arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
757         if runtimeContext.debug:
758             arguments.insert(0, '--debug')
759
760         wf_runner = cmap({
761             "class": "CommandLineTool",
762             "baseCommand": "cwltool",
763             "inputs": self.tool["inputs"],
764             "outputs": self.tool["outputs"],
765             "stdout": "cwl.output.json",
766             "requirements": self.requirements+job_res_reqs+[
767                 {"class": "InlineJavascriptRequirement"},
768                 {
769                 "class": "InitialWorkDirRequirement",
770                 "listing": [{
771                         "entryname": "workflow.cwl",
772                         "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
773                     }, {
774                         "entryname": "cwl.input.yml",
775                         "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
776                     }]
777             }],
778             "hints": self.hints,
779             "arguments": arguments,
780             "id": "#"
781         })
782         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
783
784
785     def separateRunner(self, joborder, output_callback, runtimeContext, req, builder):
786
787         name = runtimeContext.name
788
789         rpn = req.get("runnerProcessName")
790         if rpn:
791             name = builder.do_eval(rpn)
792
793         return RunnerContainer(self.arvrunner,
794                                self,
795                                self.loadingContext,
796                                runtimeContext.enable_reuse,
797                                None,
798                                None,
799                                submit_runner_ram=runtimeContext.submit_runner_ram,
800                                name=name,
801                                on_error=runtimeContext.on_error,
802                                submit_runner_image=runtimeContext.submit_runner_image,
803                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
804                                merged_map=None,
805                                priority=runtimeContext.priority,
806                                secret_store=self.arvrunner.secret_store,
807                                collection_cache_size=runtimeContext.collection_cache_size,
808                                collection_cache_is_default=self.arvrunner.should_estimate_cache_size,
809                                git_info=runtimeContext.git_info,
810                                reuse_runner=True).job(joborder, output_callback, runtimeContext)
811
812
813     def job(self, joborder, output_callback, runtimeContext):
814
815         builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
816         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
817
818         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
819         if req:
820             return self.runInSingleContainer(joborder, output_callback, runtimeContext, builder)
821
822         req, _ = self.get_requirement("http://arvados.org/cwl#SeparateRunner")
823         if req:
824             return self.separateRunner(joborder, output_callback, runtimeContext, req, builder)
825
826         return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
827
828
829     def make_workflow_step(self,
830                            toolpath_object,      # type: Dict[Text, Any]
831                            pos,                  # type: int
832                            loadingContext,       # type: LoadingContext
833                            *argc,
834                            **argv
835     ):
836         # (...) -> WorkflowStep
837         return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)