12684: Support num_retries in PySDK client constructors
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .pathmapper import ArvPathMapper, trim_listing
42 from .arvtool import ArvadosCommandTool, set_cluster_target
43 from ._version import __version__
44 from .util import common_prefix
45
46 from .perf import Perf
47
# Module logger, registered under the name 'arvados.cwl-runner'.
logger = logging.getLogger('arvados.cwl-runner')
# Second logger under 'arvados.cwl-runner.metrics'; this module hands it to Perf().
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# ResourceRequirement fields that get_overall_res_req() combines by taking the maximum.
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
# ResourceRequirement fields that get_overall_res_req() combines by summing.
sum_res_pars = ("outdirMin", "outdirMax")
53
def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
    """Store a packed workflow in a collection and return a wrapper document.

    The packed workflow is written as ``workflow.json`` into a new
    collection, which is saved to ``project_uuid`` unless a collection
    with the same portable data hash already exists there.  The return
    value is the JSON text of a single-step workflow whose step runs
    ``keep:<pdh>/workflow.json#main``.

    :param arvRunner: object providing ``api``, ``keep_client`` and ``num_retries``.
    :param main: the main process object (reads "inputs", "outputs",
        "requirements" and "hints").
    :param packed: the packed workflow document to serialize.
    :param project_uuid: owner used for the lookup and for saving.
    :param name: label placed on the wrapper's single step.
    :param git_info: optional mapping of git metadata copied into the
        top level of the wrapper document.
    :param tool: loaded tool; its label or id provides the collection name.
    :returns: the wrapper workflow serialized as a JSON string.
    """
    describe_key = "http://arvados.org/cwl#gitDescribe"
    # Only fields that are meaningful at the workflow level are copied
    # from the inputs.  In practice this keeps inputBinding out when
    # the thing being wrapped is a CommandLineTool.  Extension fields
    # are excluded as well.
    copied_fields = ("type", "label", "secondaryFiles", "streamable",
                     "doc", "id", "format", "loadContents",
                     "loadListing", "default")

    collection = arvados.collection.Collection(api_client=arvRunner.api,
                                               keep_client=arvRunner.keep_client)
    with collection.open("workflow.json", "wt") as out:
        json.dump(packed, out, sort_keys=True, indent=4, separators=(',',': '))
    pdh = collection.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get(describe_key):
        toolname = "%s (%s)" % (toolname, git_info.get(describe_key))

    # Only save the collection when the project doesn't already hold
    # one with identical content.
    matches = arvRunner.api.collections().list(
        filters=[["portable_data_hash", "=", pdh],
                 ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if not matches["items"]:
        collection.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # Now construct the wrapper.
    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = [{field: param[field] for field in copied_fields if field in param}
                 for param in main["inputs"]]

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    step["in"].extend({"id": "#main/step/%s" % shortname(param["id"]),
                       "source": param["id"]}
                      for param in main["inputs"])

    for out_param in main["outputs"]:
        step_output_id = "#main/step/%s" % shortname(out_param["id"])
        step["out"].append({"id": step_output_id})
        wrapper["outputs"].append({"outputSource": step_output_id,
                                   "type": out_param["type"],
                                   "id": out_param["id"]})

    requirements = [{"class": "SubworkflowFeatureRequirement"}]
    if main.get("requirements"):
        requirements.extend(main["requirements"])
    wrapper["requirements"] = requirements

    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
    if git_info:
        for key in git_info:
            doc[key] = git_info[key]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
131
132
def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
    """Translate reference ``s`` into the form stored in the uploaded workflow.

    Resolution order:

    1. ``s`` already starts with "keep:": returned unchanged.
    2. ``urlexpander(s, baseuri)`` starts with "keep:": that value.
    3. ``merged_map`` has an entry for ``baseuri`` (or ``baseuri``
       without its fragment) whose ``resolved`` mapping contains the
       expanded URI: the mapped value.
    4. The expanded URI is in ``jobmapper``: the mapper's ``target``.
    5. Otherwise: the path of the expanded URI relative to the
       directory of the base file.
    """
    if s.startswith("keep:"):
        return s

    expanded = urlexpander(s, baseuri)
    if expanded.startswith("keep:"):
        return expanded

    base_file, _ = urllib.parse.urldefrag(baseuri)

    for candidate in (baseuri, base_file):
        if candidate not in merged_map:
            continue
        resolved = merged_map[candidate].resolved
        if expanded in resolved:
            return resolved[expanded]

    if expanded in jobmapper:
        return jobmapper.mapper(expanded).target

    # Fall back to a plain relative path between the two files.
    base_dir = os.path.dirname(uri_file_path(base_file))
    target_path = uri_file_path(expanded)
    rel_dir = os.path.relpath(os.path.dirname(target_path), base_dir)
    if rel_dir == ".":
        rel_dir = ""

    return os.path.join(rel_dir, os.path.basename(target_path))
162
def is_basetype(tp):
    """Return True if ``tp`` is a built-in type name, False otherwise.

    A built-in name may be followed by any number of "[]" array
    suffixes and an optional trailing "?".  The whole string has to
    match: a reference that merely begins with a built-in name (for
    example "stringlist.yml#StringList") is not a base type.
    """
    basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")
    # Raw string avoids invalid-escape warnings; fullmatch anchors both ends.
    pattern = r"(?:%s)(?:\[\])*\??" % "|".join(basetypes)
    return re.fullmatch(pattern, tp) is not None
169
170
def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
    """Recursively rewrite references inside document ``d`` in place.

    Lists: when ``prefix`` is non-empty, string items starting with
    ``prefix`` get that prefix swapped for ``replacePrefix``; all other
    items are processed recursively.

    Mappings:

    * blank-node "id"/"name" values (starting with "_:") are removed;
    * a remaining "id" (or string "name") becomes the base URI for
      everything nested below;
    * a DockerRequirement gets an
      "http://arvados.org/cwl#dockerCollectionPDH" entry looked up in
      ``runtimeContext.cached_docker_lookups``;
    * "location", "run", "name", "$include", "$import", "$schemas",
      non-base "type"/"items" strings and the string values of an
      "inputs" mapping are passed through rel_ref();
    * every other value is processed recursively.

    Anything that is neither a mutable sequence nor a mutable mapping
    is left alone.
    """
    if isinstance(d, MutableSequence):
        for idx, item in enumerate(d):
            if not (prefix and isinstance(item, str)):
                update_refs(item, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
            elif item.startswith(prefix):
                d[idx] = replacePrefix+item[len(prefix):]
        return

    if not isinstance(d, MutableMapping):
        return

    for ident in ("id", "name"):
        if isinstance(d.get(ident), str) and d[ident].startswith("_:"):
            # blank node reference, was added in automatically, can get rid of it.
            del d[ident]

    if "id" in d:
        baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
    elif "name" in d and isinstance(d["name"], str):
        baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

    if d.get("class") == "DockerRequirement":
        image = d.get("dockerImageId") or d.get("dockerPull")
        d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(image)

    for key in d:
        value = d[key]

        if key in ("location", "run", "name") and isinstance(value, str):
            d[key] = rel_ref(value, baseuri, urlexpander, merged_map, jobmapper)

        elif key in ("$include", "$import") and isinstance(value, str):
            # These are resolved without consulting merged_map.
            d[key] = rel_ref(value, baseuri, urlexpander, {}, jobmapper)

        elif key == "inputs" and isinstance(value, MutableMapping):
            for param in value:
                if isinstance(value[param], str) and not is_basetype(value[param]):
                    value[param] = rel_ref(value[param], baseuri, urlexpander, merged_map, jobmapper)
                if isinstance(value[param], MutableMapping):
                    update_refs(value[param], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)

        elif key == "$schemas":
            schemas = d["$schemas"]
            for n, entry in enumerate(schemas):
                schemas[n] = rel_ref(entry, baseuri, urlexpander, merged_map, jobmapper)

        else:
            if key in ("type", "items") and isinstance(value, str) and not is_basetype(value):
                d[key] = rel_ref(value, baseuri, urlexpander, merged_map, jobmapper)
            # The (possibly rewritten) value still goes through the
            # generic recursion below.
            update_refs(d[key], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
224
225
def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
    """Register keep: renames for every type named in a SchemaDefRequirement.

    For each entry in ``req["types"]`` the type name is resolved with
    rel_ref() and recorded as ``keep:<pdh>/<relative ref>`` in the
    ``resolved`` mapping of *every* entry of ``merged_map`` (an entry
    for the type's own file is created first if missing).

    :returns: a deep copy of ``req``; the copy itself is not modified,
        the effect of this function is the update of ``merged_map``.
    """
    fixed = copy.deepcopy(req)

    for typedef in fixed["types"]:
        type_name = typedef["name"]
        source_file = urllib.parse.urldefrag(type_name)[0]
        relative = rel_ref(type_name, baseuri, urlexpander, merged_map, jobmapper)
        if source_file not in merged_map:
            merged_map[source_file] = FileUpdates({}, {})
        keep_ref = "keep:%s/%s" % (pdh, relative)
        for updates in merged_map.values():
            updates.resolved[type_name] = keep_ref

    return fixed
238
239
def drop_ids(d):
    """Recursively delete "id" entries whose value starts with "file:".

    Walks nested mutable sequences and mappings and modifies them in
    place.  Only string ids are examined, so an "id" key holding a
    non-string value (for instance inside user-supplied default data)
    is left untouched instead of raising AttributeError.
    """
    if isinstance(d, MutableSequence):
        for item in d:
            drop_ids(item)
    elif isinstance(d, MutableMapping):
        ident = d.get("id")
        if isinstance(ident, str) and ident.startswith("file:"):
            del d["id"]

        for value in d.values():
            drop_ids(value)
250
251
def upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False,
                        jobmapper=None):
    """Upload a workflow's source files to a collection and build a wrapper.

    Every file found in the document loader index is copied into a new
    collection (with references rewritten by update_refs(), plus a
    verbatim copy under "original/").  The collection is saved to
    ``arvRunner.project_uuid`` unless one with the same content already
    exists there.  A single-step wrapper workflow that runs the
    uploaded main file is then constructed and returned.

    :param arvRunner: runner providing ``api``, ``project_uuid`` and
        ``num_retries``.
    :param tool: loaded tool (uses ``tool.tool``, ``tool.metadata`` and
        ``tool.doc_loader``).
    :param job_order: input object; passed through adjustDirObjs /
        adjustFileObjs and, with ``set_defaults``, used to fill input
        defaults.
    :param project_uuid: accepted but not referenced in this function;
        ``arvRunner.project_uuid`` is what is actually used.
    :param runtimeContext: passed on to update_refs() and
        arvados_jobs_image().
    :param uuid: accepted but not referenced in this function.
    :param submit_runner_ram: if truthy, stored as ``ramMin`` in the
        WorkflowRunnerResources hint.
    :param name: label for the wrapper's single step.
    :param merged_map: per-file reference replacements; read by
        rel_ref() and updated by fix_schemadef().
    :param submit_runner_image: runner image; defaults to
        "arvados/jobs:<__version__>".
    :param git_info: optional mapping of git metadata; each key must
        contain "#", the part after it becomes an "arv:" collection
        property, and the full entries are copied into the wrapper.
    :param set_defaults: when true, values present in ``job_order``
        become the defaults of the wrapper inputs.
    :param jobmapper: path mapper consulted by rel_ref().
    :returns: the wrapper document (a dict with "cwlVersion" and "$graph").

    Note that when ``tool.tool`` already has a "hints" list and it lacks
    a WorkflowRunnerResources entry, one is appended to that list in
    place.
    """

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # The document loader index will have entries for all the files
    # that were loaded in the process of parsing the entire workflow
    # (including subworkflows, tools, imports, etc).  We use this to
    # compose a list of the workflow file dependencies.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find the longest common prefix among all the file names.  We'll
    # use this to recreate the directory structure in a keep
    # collection with correct relative references.
    prefix = common_prefix(firstfile, all_files)

    col = arvados.collection.Collection(api_client=arvRunner.api)

    # Now go through all the files and update references to other
    # files.  We previously scanned for file dependencies, these are
    # passed in as merged_map.
    #
    # note about merged_map: we upload dependencies of each process
    # object (CommandLineTool/Workflow) to a separate collection.
    # That way, when the user edits something, this limits collection
    # PDH changes to just that tool, and minimizes situations where
    # small changes break container reuse for the whole workflow.
    #
    for w in workflow_files | import_files:
        # 1. load the YAML  file

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            # Decode once so that both the parser and the verbatim copy
            # written below (to a text-mode file) receive str.
            text = text.decode('utf-8')
        textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        # If the whole document is in "flow style" it is probably JSON
        # formatted.  We'll re-export it as JSON because the
        # ruamel.yaml round-trip mode is a lie and only preserves
        # "block style" formatting and not "flow style" formatting.
        export_as_json = result.fa.flow_style()

        # 2. find $import, $include, $schema, run, location
        # 3. update field value
        update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")

        # Write the updated file to the collection.
        with col.open(w[len(prefix):], "wt") as f:
            if export_as_json:
                json.dump(result, f, indent=4, separators=(',',': '))
            else:
                yamlloader.dump(result, stream=f)

        # Also store a verbatim copy of the original files
        with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
            f.write(text)


    # Upload files referenced by $include directives, these are used
    # unchanged and don't need to be updated.
    for w in include_files:
        with col.open(w[len(prefix):], "wb") as f1:
            with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
                with open(uri_file_path(w), "rb") as f2:
                    dat = f2.read(65536)
                    while dat:
                        f1.write(dat)
                        f3.write(dat)
                        dat = f2.read(65536)

    # Now collect metadata: the collection name and git properties.

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][len(prefix):]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    # Check if a collection with the same content already exists in the target project.  If so, just use that one.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)

    if len(existing["items"]) == 0:
        toolname = toolname.replace("/", " ")
        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
        logger.info("Workflow uploaded to %s", col.manifest_locator())
    else:
        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])

    # Now that we've updated the workflow and saved it to a
    # collection, we're going to construct a minimal "wrapper"
    # workflow which consists only of input and output parameters
    # connected to a single step that runs the real workflow.

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    # Reuse an existing WorkflowRunnerResources hint if there is one,
    # otherwise add a fresh one to the hints list.
    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                  submit_runner_image or "arvados/jobs:"+__version__,
                                                                  runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    # Remove a few redundant fields from the "job order" (aka input
    # object or input parameters).  In the situation where we're
    # creating or updating a workflow record, any values in the job
    # order get copied over as default values for input parameters.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # Right now also excludes extension fields, which is fine,
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    # Schema definitions (this lets you define things like record
    # types) require a special handling.

    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())

    update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Remove any lingering file references.
    drop_ids(wrapper)

    return doc
488
489
def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
    """Create or update an Arvados workflow record and return its UUID.

    ``doc`` is serialized to JSON and stored as the record's
    definition; the description comes from the tool's "doc" field
    (empty string if absent).  When ``project_uuid`` is truthy it is
    set as the owner.  When ``update_uuid`` is truthy that record is
    updated, otherwise a new record is created.
    """
    definition = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    attributes = {
        "name": name,
        "description": tool.tool.get("doc", ""),
        "definition": definition
    }
    if project_uuid:
        attributes["owner_uuid"] = project_uuid
    body = {"workflow": attributes}

    if not update_uuid:
        request = arvRunner.api.workflows().create(body=body)
    else:
        request = arvRunner.api.workflows().update(uuid=update_uuid, body=body)

    response = request.execute(num_retries=arvRunner.num_retries)
    return response["uuid"]
508
509
def dedup_reqs(reqs):
    """Return one requirement per class, ordered by class name.

    When a class occurs more than once the entry closest to the end of
    ``reqs`` is kept.  Requirements whose class starts with
    "http://arvados.org/cwl#" are left out entirely.
    """
    chosen = {}
    for req in reversed(reqs):
        cls = req["class"]
        if cls in chosen:
            continue
        if cls.startswith("http://arvados.org/cwl#"):
            continue
        chosen[cls] = req
    return [chosen[cls] for cls in sorted(chosen)]
516
def get_overall_res_req(res_reqs):
    """Merge several ResourceRequirement objects into a single one.

    Fields listed in ``max_res_pars`` are merged with max(), fields
    listed in ``sum_res_pars`` with sum().  Fields that appear in none
    of the inputs are omitted from the result.

    :param res_reqs: sequence of ResourceRequirement mappings.
    :returns: ``cmap()`` of the merged requirement; it carries
        "class": "ResourceRequirement" only if at least one field was
        merged.
    :raises WorkflowException: if any of the examined fields holds a
        value that is not an ``int``; one message line is reported per
        offending value.
    """
    problems = []
    collected = {}
    for field in max_res_pars + sum_res_pars:
        values = []
        collected[field] = values
        for req in res_reqs:
            if field not in req:
                continue
            value = req[field]
            if isinstance(value, int): # integer check
                values.append(value)
            else:
                problems.append(SourceLine(req, field).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions"))

    if problems:
        raise WorkflowException("\n".join(problems))

    merged = {}
    for field, values in collected.items():
        if not values:
            continue
        if field in max_res_pars:
            merged[field] = max(values)
        elif field in sum_res_pars:
            merged[field] = sum(values)
    if merged:
        merged["class"] = "ResourceRequirement"
    return cmap(merged)
547
class ArvadosWorkflowStep(WorkflowStep):
    """WorkflowStep that applies Arvados cluster targeting when run.

    When the runner is in ``fast_submit`` mode the cwltool base
    initializer is skipped and the step keeps only the raw tool
    object with empty inputs and outputs.
    """

    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if not arvrunner.fast_submit:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        else:
            # Fast path: no base-class initialization at all.
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Run the step with a copied, cluster-targeted runtime context."""
        step_context = runtimeContext.copy()
        step_context.toplevel = True  # Preserve behavior for #13365

        # The builder gets the job order keyed by short names.
        short_order = {shortname(key): value for key, value in viewitems(joborder)}
        builder = make_builder(short_order, self.hints, self.requirements,
                               step_context, self.metadata)
        step_context = set_cluster_target(self.tool, self.arvrunner, builder, step_context)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, step_context)
575
576
class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods.

    Adds cluster targeting to every run and, when the workflow carries
    the "http://arvados.org/cwl#RunInSingleContainer" requirement, runs
    the whole workflow through a generated CommandLineTool that invokes
    ``cwltool`` instead of scheduling the steps individually.
    """

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        # Portable data hash of the uploaded packed workflow; set on
        # the first single-container run and reused afterwards.
        self.wf_pdh = None
        # ResourceRequirements split by whether they contain string
        # (expression) values.
        self.dynamic_resource_req = []
        self.static_resource_req = []
        # File/Directory objects found in the packed workflow itself.
        self.wf_reffiles = []
        self.loadingContext = loadingContext
        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
        """Return the job(s) for this workflow.

        Without the RunInSingleContainer requirement this defers to
        ``Workflow.job`` (after cluster targeting).  With it, the packed
        workflow is uploaded once (cached in ``self.wf_pdh``), resource
        requirements are merged, input locations are mapped to /keep
        paths, and the job of an ArvadosCommandTool that runs
        ``cwltool workflow.cwl cwl.input.yml`` is returned.

        :raises WorkflowException: if the tool has no "id", if a
            File/Directory lacks "location" or has a location that is
            neither a keep reference nor a literal, or if a
            non-top-level ResourceRequirement contains an expression.
        """

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            # Packing and scanning the workflow only happens until the
            # packed workflow has been uploaded (wf_pdh gets set below).
            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    # Strip DockerRequirement from requirements and sort
                    # ResourceRequirements into static vs. dynamic.
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            # Two copies of the inputs: one rewritten with the mapper's
            # "resolved" values, one rewritten with its "target" values
            # (the latter is serialized into cwl.input.yml below).
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For containers API, we need to make sure any extra
            # referenced files (ie referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                # Replace keep: locations by the mapper target, drop
                # literal ("_:") locations, reject everything else.
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            # `packed` is only bound when wf_pdh was None above, which
            # is the same condition tested here.
            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        # Backslashes are doubled and "$(" / "${" get a backslash in
        # front, presumably so the serialized inputs are taken as
        # literal text rather than as CWL expressions.
        input_yml = json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': '))
        input_yml = input_yml.replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')

        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": input_yml
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        """Build steps as ArvadosWorkflowStep, passing along this workflow's runner."""
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)