20462: Refactor "common prefix" function & fix fencepost errors
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from past.builtins import basestring
6 from future.utils import viewitems
7
8 import os
9 import json
10 import copy
11 import logging
12 import urllib
13 from io import StringIO
14 import sys
15 import re
16
17 from typing import (MutableSequence, MutableMapping)
18
19 from ruamel.yaml import YAML
20 from ruamel.yaml.comments import CommentedMap, CommentedSeq
21
22 from schema_salad.sourceline import SourceLine, cmap
23 import schema_salad.ref_resolver
24
25 import arvados.collection
26
27 from cwltool.pack import pack
28 from cwltool.load_tool import fetch_document, resolve_and_validate_document
29 from cwltool.process import shortname, uniquename
30 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
31 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
32 from cwltool.context import LoadingContext
33
34 from schema_salad.ref_resolver import file_uri, uri_file_path
35
36 import ruamel.yaml as yaml
37
38 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
39                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
40                      make_builder, arvados_jobs_image, FileUpdates)
41 from .pathmapper import ArvPathMapper, trim_listing
42 from .arvtool import ArvadosCommandTool, set_cluster_target
43 from ._version import __version__
44
45 from .perf import Perf
46
47 logger = logging.getLogger('arvados.cwl-runner')
48 metrics = logging.getLogger('arvados.cwl-runner.metrics')
49
50 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
51 sum_res_pars = ("outdirMin", "outdirMax")
52
53 def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
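    """Save the packed workflow to a collection and return a wrapper workflow.

    The packed workflow is written to workflow.json in a collection (an
    existing collection with the same portable data hash in the project
    is reused if present), then a minimal wrapper Workflow is built whose
    single step runs "keep:<pdh>/workflow.json#main".  The wrapper
    document is returned as a JSON string.
    """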
54     col = arvados.collection.Collection(api_client=arvRunner.api,
55                                         keep_client=arvRunner.keep_client)
56
57     with col.open("workflow.json", "wt") as f:
58         json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))
59
60     pdh = col.portable_data_hash()
61
62     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
63     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
64         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
65
66     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
67     if len(existing["items"]) == 0:
68         col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)
69
70     # now construct the wrapper
71
72     step = {
73         "id": "#main/" + toolname,
74         "in": [],
75         "out": [],
76         "run": "keep:%s/workflow.json#main" % pdh,
77         "label": name
78     }
79
80     newinputs = []
81     for i in main["inputs"]:
82         inp = {}
83         # Make sure to only copy known fields that are meaningful at
84         # the workflow level. In practice this ensures that if we're
85         # wrapping a CommandLineTool we don't grab inputBinding.
86         # Right now this also excludes extension fields, which is fine;
87         # Arvados doesn't currently look for any extension fields on
88         # input parameters.
89         for f in ("type", "label", "secondaryFiles", "streamable",
90                   "doc", "id", "format", "loadContents",
91                   "loadListing", "default"):
92             if f in i:
93                 inp[f] = i[f]
94         newinputs.append(inp)
95
96     wrapper = {
97         "class": "Workflow",
98         "id": "#main",
99         "inputs": newinputs,
100         "outputs": [],
101         "steps": [step]
102     }
103
104     for i in main["inputs"]:
105         step["in"].append({
106             "id": "#main/step/%s" % shortname(i["id"]),
107             "source": i["id"]
108         })
109
110     for i in main["outputs"]:
111         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
112         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
113                                    "type": i["type"],
114                                    "id": i["id"]})
115
116     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
117
118     if main.get("requirements"):
119         wrapper["requirements"].extend(main["requirements"])
120     if main.get("hints"):
121         wrapper["hints"] = main["hints"]
122
123     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
124
125     if git_info:
126         for g in git_info:
127             doc[g] = git_info[g]
128
129     return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
130
131
132 def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
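    """Turn an absolute reference into a relative (or keep:) reference.

    keep: references are returned unchanged, and references already
    resolved in merged_map or jobmapper are returned from there.
    Otherwise the path is made relative to the file containing baseuri.
    Illustrative example with hypothetical paths: given baseuri
    "file:///wf/main.cwl" and s "file:///wf/tools/sort.cwl" (no
    merged_map/jobmapper entries, urlexpander returning the absolute
    file URI unchanged), the result is "tools/sort.cwl".
    """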
133     if s.startswith("keep:"):
134         return s
135
136     uri = urlexpander(s, baseuri)
137
138     if uri.startswith("keep:"):
139         return uri
140
141     fileuri = urllib.parse.urldefrag(baseuri)[0]
142
143     for u in (baseuri, fileuri):
144         if u in merged_map:
145             replacements = merged_map[u].resolved
146             if uri in replacements:
147                 return replacements[uri]
148
149     if uri in jobmapper:
150         return jobmapper.mapper(uri).target
151
152     p1 = os.path.dirname(uri_file_path(fileuri))
153     p2 = os.path.dirname(uri_file_path(uri))
154     p3 = os.path.basename(uri_file_path(uri))
155
156     r = os.path.relpath(p2, p1)
157     if r == ".":
158         r = ""
159
160     return os.path.join(r, p3)
161
162 def is_basetype(tp):
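    """Return True if tp is a CWL base type, optionally with "[]" (array) or "?" (optional) suffixes.

    For example "string", "File[]" and "int?" are base types, while a
    schema reference such as "#schema.yml/MyRecord" (hypothetical name)
    is not.
    """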
163     basetypes = ("null", "boolean", "int", "long", "float", "double", "string", "File", "Directory", "record", "array", "enum")
164     for b in basetypes:
165         if re.match(b+r"(\[\])?\??", tp):
166             return True
167     return False
168
169
170 def update_refs(d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
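    """Recursively rewrite reference fields in a parsed CWL document.

    String values of "location", "run", "name", "$import", "$include",
    "$schemas", non-base "type"/"items" fields and workflow step
    "inputs" entries are rewritten via rel_ref().  List items that are
    strings starting with prefix have it replaced with replacePrefix,
    and DockerRequirements are annotated with the cached
    dockerCollectionPDH lookup.
    """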
171     if isinstance(d, MutableSequence):
172         for i, s in enumerate(d):
173             if prefix and isinstance(s, str):
174                 if s.startswith(prefix):
175                     d[i] = replacePrefix+s[len(prefix):]
176             else:
177                 update_refs(s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
178     elif isinstance(d, MutableMapping):
179         for field in ("id", "name"):
180             if isinstance(d.get(field), str) and d[field].startswith("_:"):
181                 # Blank node reference that was added automatically; we can drop it.
182                 del d[field]
183
184         if "id" in d:
185             baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
186         elif "name" in d and isinstance(d["name"], str):
187             baseuri = urlexpander(d["name"], baseuri, scoped_id=True)
188
189         if d.get("class") == "DockerRequirement":
190             dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
191             d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
192
193         for field in d:
194             if field in ("location", "run", "name") and isinstance(d[field], str):
195                 d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
196                 continue
197
198             if field in ("$include", "$import") and isinstance(d[field], str):
199                 d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
200                 continue
201
202             for t in ("type", "items"):
203                 if (field == t and
204                     isinstance(d[t], str) and
205                     not is_basetype(d[t])):
206                     d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
207                     continue
208
209             if field == "inputs" and isinstance(d["inputs"], MutableMapping):
210                 for inp in d["inputs"]:
211                     if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
212                         d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
213                     if isinstance(d["inputs"][inp], MutableMapping):
214                         update_refs(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
215                 continue
216
217             if field == "$schemas":
218                 for n, s in enumerate(d["$schemas"]):
219                     d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
220                 continue
221
222             update_refs(d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
223
224
225 def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
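    """Record keep: locations for the types in a SchemaDefRequirement.

    For each type, the resolved location "keep:<pdh>/<relative path>" is
    added to every merged_map entry so that a later update_refs() pass
    rewrites references to it.  A copy of req is returned.
    """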
226     req = copy.deepcopy(req)
227
228     for f in req["types"]:
229         r = f["name"]
230         path, frag = urllib.parse.urldefrag(r)
231         rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
232         merged_map.setdefault(path, FileUpdates({}, {}))
233         rename = "keep:%s/%s" %(pdh, rel)
234         for mm in merged_map:
235             merged_map[mm].resolved[r] = rename
236     return req
237
238
239 def drop_ids(d):
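    """Recursively remove "id" fields that still point at local "file:" URIs."""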
240     if isinstance(d, MutableSequence):
241         for i, s in enumerate(d):
242             drop_ids(s)
243     elif isinstance(d, MutableMapping):
244         if "id" in d and d["id"].startswith("file:"):
245             del d["id"]
246
247         for field in d:
248             drop_ids(d[field])
249
250
251 def common_prefix(firstfile, all_files):
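    """Return the longest directory prefix shared by firstfile and every path in all_files.

    The result ends at a "/" boundary so it can be stripped off to
    recreate relative paths.  Illustrative example with hypothetical
    URIs: firstfile "file:///home/user/wf/main.cwl" and all_files
    {"file:///home/user/wf/main.cwl", "file:///home/user/wf/tools/sort.cwl"}
    give "file:///home/user/wf/".
    """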
252     n = 0
253     allmatch = True
254     if not firstfile:
255         return ""
256
257     while allmatch and n < len(firstfile)-1:
258         n += 1
259         for f in all_files:
260             if len(f)-1 < n:
261                 n -= 1
262                 allmatch = False
263                 break
264             if f[n] != firstfile[n]:
265                 allmatch = False
266                 break
267
268     while n > 0 and firstfile[n] != "/":
269         n -= 1
270
271     if firstfile[n] == "/":
272         n += 1
273
274     return firstfile[:n]
275
276
277 def upload_workflow(arvRunner, tool, job_order, project_uuid,
278                         runtimeContext,
279                         uuid=None,
280                         submit_runner_ram=0, name=None, merged_map=None,
281                         submit_runner_image=None,
282                         git_info=None,
283                         set_defaults=False,
284                         jobmapper=None):
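    """Upload the workflow source files to a Keep collection and build a wrapper workflow.

    The workflow file and the tools, imports and includes it references
    are written into a single collection, preserving their relative
    directory layout, with internal references rewritten so they stay
    valid.  The return value is a CWL v1.2 document containing a
    wrapper Workflow whose single step runs the uploaded copy from Keep.
    """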
285
286     firstfile = None
287     workflow_files = set()
288     import_files = set()
289     include_files = set()
290
291     # The document loader index will have entries for all the files
292     # that were loaded in the process of parsing the entire workflow
293     # (including subworkflows, tools, imports, etc).  We use this to
294     # compose a list of the workflow file dependencies.
295     for w in tool.doc_loader.idx:
296         if w.startswith("file://"):
297             workflow_files.add(urllib.parse.urldefrag(w)[0])
298             if firstfile is None:
299                 firstfile = urllib.parse.urldefrag(w)[0]
300         if w.startswith("import:file://"):
301             import_files.add(urllib.parse.urldefrag(w[7:])[0])
302         if w.startswith("include:file://"):
303             include_files.add(urllib.parse.urldefrag(w[8:])[0])
304
305     all_files = workflow_files | import_files | include_files
306
307     # Find the longest common prefix among all the file names.  We'll
308     # use this to recreate the directory structure in a keep
309     # collection with correct relative references.
310     prefix = common_prefix(firstfile, all_files)
311
312     col = arvados.collection.Collection(api_client=arvRunner.api)
313
314     # Now go through all the files and update references to other
315     # files.  We previously scanned for file dependencies, which are
316     # passed in as merged_map.
317     #
318     # note about merged_map: we upload dependencies of each process
319     # object (CommandLineTool/Workflow) to a separate collection.
320     # That way, when the user edits something, this limits collection
321     # PDH changes to just that tool, and minimizes situations where
322     # small changes break container reuse for the whole workflow.
323     #
324     for w in workflow_files | import_files:
325         # 1. load the YAML  file
326
327         text = tool.doc_loader.fetch_text(w)
328         if isinstance(text, bytes):
329             textIO = StringIO(text.decode('utf-8'))
330         else:
331             textIO = StringIO(text)
332
333         yamlloader = schema_salad.utils.yaml_no_ts()
334         result = yamlloader.load(textIO)
335
336         # If the whole document is in "flow style" it is probably JSON
337         # formatted.  We'll re-export it as JSON because the
338         # ruamel.yaml round-trip mode is a lie and only preserves
339         # "block style" formatting and not "flow style" formatting.
340         export_as_json = result.fa.flow_style()
341
342         # 2. find $import, $include, $schema, run, location
343         # 3. update field value
344         update_refs(result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")
345
346         # Write the updated file to the collection.
347         with col.open(w[len(prefix):], "wt") as f:
348             if export_as_json:
349                 json.dump(result, f, indent=4, separators=(',',': '))
350             else:
351                 yamlloader.dump(result, stream=f)
352
353         # Also store a verbatim copy of the original files
354         with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
355             f.write(text)
356
357
358     # Upload files referenced by $include directives; these are used
359     # unchanged and don't need to be updated.
360     for w in include_files:
361         with col.open(w[len(prefix):], "wb") as f1:
362             with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
363                 with open(uri_file_path(w), "rb") as f2:
364                     dat = f2.read(65536)
365                     while dat:
366                         f1.write(dat)
367                         f3.write(dat)
368                         dat = f2.read(65536)
369
370     # Now collect metadata: the collection name and git properties.
371
372     toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
373     if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
374         toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
375
376     toolfile = tool.tool["id"][len(prefix):]
377
378     properties = {
379         "type": "workflow",
380         "arv:workflowMain": toolfile,
381     }
382
383     if git_info:
384         for g in git_info:
385             p = g.split("#", 1)[1]
386             properties["arv:"+p] = git_info[g]
387
388     # Check if a collection with the same content already exists in the target project.  If so, just use that one.
389     existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
390                                                          ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)
391
392     if len(existing["items"]) == 0:
393         toolname = toolname.replace("/", " ")
394         col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
395         logger.info("Workflow uploaded to %s", col.manifest_locator())
396     else:
397         logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])
398
399     # Now that we've updated the workflow and saved it to a
400     # collection, we're going to construct a minimal "wrapper"
401     # workflow which consists only of input and output parameters
402     # connected to a single step that runs the real workflow.
403
404     runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)
405
406     step = {
407         "id": "#main/" + toolname,
408         "in": [],
409         "out": [],
410         "run": runfile,
411         "label": name
412     }
413
414     main = tool.tool
415
416     wf_runner_resources = None
417
418     hints = main.get("hints", [])
419     found = False
420     for h in hints:
421         if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
422             wf_runner_resources = h
423             found = True
424             break
425     if not found:
426         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
427         hints.append(wf_runner_resources)
428
429     wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
430                                                                   submit_runner_image or "arvados/jobs:"+__version__,
431                                                                   runtimeContext)
432
433     if submit_runner_ram:
434         wf_runner_resources["ramMin"] = submit_runner_ram
435
436     # Remove a few redundant fields from the "job order" (aka input
437     # object or input parameters).  In the situation where we're
438     # creating or updating a workflow record, any values in the job
439     # order get copied over as default values for input parameters.
440     adjustDirObjs(job_order, trim_listing)
441     adjustFileObjs(job_order, trim_anonymous_location)
442     adjustDirObjs(job_order, trim_anonymous_location)
443
444     newinputs = []
445     for i in main["inputs"]:
446         inp = {}
447         # Make sure to only copy known fields that are meaningful at
448         # the workflow level. In practice this ensures that if we're
449         # wrapping a CommandLineTool we don't grab inputBinding.
450         # Right now this also excludes extension fields, which is fine;
451         # Arvados doesn't currently look for any extension fields on
452         # input parameters.
453         for f in ("type", "label", "secondaryFiles", "streamable",
454                   "doc", "format", "loadContents",
455                   "loadListing", "default"):
456             if f in i:
457                 inp[f] = i[f]
458
459         if set_defaults:
460             sn = shortname(i["id"])
461             if sn in job_order:
462                 inp["default"] = job_order[sn]
463
464         inp["id"] = "#main/%s" % shortname(i["id"])
465         newinputs.append(inp)
466
467     wrapper = {
468         "class": "Workflow",
469         "id": "#main",
470         "inputs": newinputs,
471         "outputs": [],
472         "steps": [step]
473     }
474
475     for i in main["inputs"]:
476         step["in"].append({
477             "id": "#main/step/%s" % shortname(i["id"]),
478             "source": "#main/%s" % shortname(i["id"])
479         })
480
481     for i in main["outputs"]:
482         step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
483         wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
484                                    "type": i["type"],
485                                    "id": "#main/%s" % shortname(i["id"])})
486
487     wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
488
489     if main.get("requirements"):
490         wrapper["requirements"].extend(main["requirements"])
491     if hints:
492         wrapper["hints"] = hints
493
494     # Schema definitions (this lets you define things like record
495     # types) require special handling.
496
497     for i, r in enumerate(wrapper["requirements"]):
498         if r["class"] == "SchemaDefRequirement":
499             wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())
500
501     update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")
502
503     doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
504
505     if git_info:
506         for g in git_info:
507             doc[g] = git_info[g]
508
509     # Remove any lingering file references.
510     drop_ids(wrapper)
511
512     return doc
513
514
515 def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
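    """Create or update an Arvados workflow record from the wrapper document.

    Returns the uuid of the created or updated workflow record.
    """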
516
517     wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
518
519     body = {
520         "workflow": {
521             "name": name,
522             "description": tool.tool.get("doc", ""),
523             "definition": wrappertext
524         }}
525     if project_uuid:
526         body["workflow"]["owner_uuid"] = project_uuid
527
528     if update_uuid:
529         call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
530     else:
531         call = arvRunner.api.workflows().create(body=body)
532     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
533
534
535 def dedup_reqs(reqs):
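    """Deduplicate requirements by class name.

    The last occurrence of each class wins, Arvados extension classes
    (http://arvados.org/cwl#...) are dropped, and the survivors are
    returned sorted by class name.
    """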
536     dedup = {}
537     for r in reversed(reqs):
538         if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
539             dedup[r["class"]] = r
540     return [dedup[r] for r in sorted(dedup.keys())]
541
542 def get_overall_res_req(res_reqs):
543     """Take the overall of a list of ResourceRequirement,
544     i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
545     and the sum of outdirMin, outdirMax."""
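    # Illustrative example (hypothetical values): combining
    # {"coresMin": 2, "outdirMin": 1024} and {"coresMin": 4, "outdirMin": 2048}
    # yields {"class": "ResourceRequirement", "coresMin": 4, "outdirMin": 3072}.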
546
547     all_res_req = {}
548     exception_msgs = []
549     for a in max_res_pars + sum_res_pars:
550         all_res_req[a] = []
551         for res_req in res_reqs:
552             if a in res_req:
553                 if isinstance(res_req[a], int): # integer check
554                     all_res_req[a].append(res_req[a])
555                 else:
556                     msg = SourceLine(res_req, a).makeError(
557                         "Non-top-level ResourceRequirement in single container cannot have expressions")
558                     exception_msgs.append(msg)
559     if exception_msgs:
560         raise WorkflowException("\n".join(exception_msgs))
561     else:
562         overall_res_req = {}
563         for a in all_res_req:
564             if all_res_req[a]:
565                 if a in max_res_pars:
566                     overall_res_req[a] = max(all_res_req[a])
567                 elif a in sum_res_pars:
568                     overall_res_req[a] = sum(all_res_req[a])
569         if overall_res_req:
570             overall_res_req["class"] = "ResourceRequirement"
571         return cmap(overall_res_req)
572
573 class ArvadosWorkflowStep(WorkflowStep):
574     def __init__(self,
575                  toolpath_object,      # type: Dict[Text, Any]
576                  pos,                  # type: int
577                  loadingContext,       # type: LoadingContext
578                  arvrunner,
579                  *argc,
580                  **argv
581                 ):  # type: (...) -> None
582
583         if arvrunner.fast_submit:
584             self.tool = toolpath_object
585             self.tool["inputs"] = []
586             self.tool["outputs"] = []
587         else:
588             super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
589             self.tool["class"] = "WorkflowStep"
590         self.arvrunner = arvrunner
591
592     def job(self, joborder, output_callback, runtimeContext):
593         runtimeContext = runtimeContext.copy()
594         runtimeContext.toplevel = True  # Preserve behavior for #13365
595
596         builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
597                                runtimeContext, self.metadata)
598         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
599         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
600
601
602 class ArvadosWorkflow(Workflow):
603     """Wrap cwltool Workflow to override selected methods."""
604
605     def __init__(self, arvrunner, toolpath_object, loadingContext):
606         self.arvrunner = arvrunner
607         self.wf_pdh = None
608         self.dynamic_resource_req = []
609         self.static_resource_req = []
610         self.wf_reffiles = []
611         self.loadingContext = loadingContext
612         super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
613         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
614
615     def job(self, joborder, output_callback, runtimeContext):
616
617         builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
618         runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
619
620         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
621         if not req:
622             return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
623
624         # RunInSingleContainer is true
625
626         with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
627             if "id" not in self.tool:
628                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
629
630         discover_secondary_files(self.arvrunner.fs_access, builder,
631                                  self.tool["inputs"], joborder)
632
633         normalizeFilesDirs(joborder)
634
635         with Perf(metrics, "subworkflow upload_deps"):
636             upload_dependencies(self.arvrunner,
637                                 os.path.basename(joborder.get("id", "#")),
638                                 self.doc_loader,
639                                 joborder,
640                                 joborder.get("id", "#"),
641                                 runtimeContext)
642
643             if self.wf_pdh is None:
644                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
645
646                 for p in packed["$graph"]:
647                     if p["id"] == "#main":
648                         p["requirements"] = dedup_reqs(self.requirements)
649                         p["hints"] = dedup_reqs(self.hints)
650
651                 def visit(item):
652                     if "requirements" in item:
653                         item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
654                     for t in ("hints", "requirements"):
655                         if t not in item:
656                             continue
657                         for req in item[t]:
658                             if req["class"] == "ResourceRequirement":
659                                 dyn = False
660                                 for k in max_res_pars + sum_res_pars:
661                                     if k in req:
662                                         if isinstance(req[k], basestring):
663                                             if item["id"] == "#main":
664                                                 # only the top-level requirements/hints may contain expressions
665                                                 self.dynamic_resource_req.append(req)
666                                                 dyn = True
667                                                 break
668                                             else:
669                                                 with SourceLine(req, k, WorkflowException):
670                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
671                                 if not dyn:
672                                     self.static_resource_req.append(req)
673
674                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
675
676                 if self.static_resource_req:
677                     self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
678
679                 upload_dependencies(self.arvrunner,
680                                     runtimeContext.name,
681                                     self.doc_loader,
682                                     packed,
683                                     self.tool["id"],
684                                     runtimeContext)
685
686                 # Discover files/directories referenced by the
687                 # workflow (mainly "default" values)
688                 visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
689
690
691         if self.dynamic_resource_req:
692             # Evaluate dynamic resource requirements using current builder
693             rs = copy.copy(self.static_resource_req)
694             for dyn_rs in self.dynamic_resource_req:
695                 eval_req = {"class": "ResourceRequirement"}
696                 for a in max_res_pars + sum_res_pars:
697                     if a in dyn_rs:
698                         eval_req[a] = builder.do_eval(dyn_rs[a])
699                 rs.append(eval_req)
700             job_res_reqs = [get_overall_res_req(rs)]
701         else:
702             job_res_reqs = self.static_resource_req
703
704         with Perf(metrics, "subworkflow adjust"):
705             joborder_resolved = copy.deepcopy(joborder)
706             joborder_keepmount = copy.deepcopy(joborder)
707
708             reffiles = []
709             visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
710
711             mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
712                                    "/keep/%s",
713                                    "/keep/%s/%s")
714
715             # For containers API, we need to make sure any extra
716             # referenced files (ie referenced by the workflow but
717             # not in the inputs) are included in the mounts.
718             if self.wf_reffiles:
719                 runtimeContext = runtimeContext.copy()
720                 runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
721
722             def keepmount(obj):
723                 remove_redundant_fields(obj)
724                 with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
725                     if "location" not in obj:
726                         raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
727                 with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
728                     if obj["location"].startswith("keep:"):
729                         obj["location"] = mapper.mapper(obj["location"]).target
730                         if "listing" in obj:
731                             del obj["listing"]
732                     elif obj["location"].startswith("_:"):
733                         del obj["location"]
734                     else:
735                         raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
736
737             visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
738
739             def resolved(obj):
740                 if obj["location"].startswith("keep:"):
741                     obj["location"] = mapper.mapper(obj["location"]).resolved
742
743             visit_class(joborder_resolved, ("File", "Directory"), resolved)
744
745             if self.wf_pdh is None:
746                 adjustFileObjs(packed, keepmount)
747                 adjustDirObjs(packed, keepmount)
748                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
749
750         self.loadingContext = self.loadingContext.copy()
751         self.loadingContext.metadata = self.loadingContext.metadata.copy()
752         self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
753
754         if len(job_res_reqs) == 1:
755             # RAM request needs to be at least 128 MiB or the workflow
756             # runner itself won't run reliably.
757             if job_res_reqs[0].get("ramMin", 1024) < 128:
758                 job_res_reqs[0]["ramMin"] = 128
759
760         arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
761         if runtimeContext.debug:
762             arguments.insert(0, '--debug')
763
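        # Build a synthetic CommandLineTool that runs the packed
        # subworkflow with cwltool inside a single container, staging
        # workflow.cwl from Keep and the resolved inputs as cwl.input.yml.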
764         wf_runner = cmap({
765             "class": "CommandLineTool",
766             "baseCommand": "cwltool",
767             "inputs": self.tool["inputs"],
768             "outputs": self.tool["outputs"],
769             "stdout": "cwl.output.json",
770             "requirements": self.requirements+job_res_reqs+[
771                 {"class": "InlineJavascriptRequirement"},
772                 {
773                 "class": "InitialWorkDirRequirement",
774                 "listing": [{
775                         "entryname": "workflow.cwl",
776                         "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
777                     }, {
778                         "entryname": "cwl.input.yml",
779                         "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', r'\$(').replace('${', r'\${')
780                     }]
781             }],
782             "hints": self.hints,
783             "arguments": arguments,
784             "id": "#"
785         })
786         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
787
788     def make_workflow_step(self,
789                            toolpath_object,      # type: Dict[Text, Any]
790                            pos,                  # type: int
791                            loadingContext,       # type: LoadingContext
792                            *argc,
793                            **argv
794     ):
795         # (...) -> WorkflowStep
796         return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)