# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from past.builtins import basestring
from future.utils import viewitems

import os
import json
import copy
import logging
import urllib
from io import StringIO
import sys
import re

from typing import (MutableSequence, MutableMapping)

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver

import arvados.collection

from cwltool.pack import pack
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname, uniquename
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext, getdefault

from schema_salad.ref_resolver import file_uri, uri_file_path

import ruamel.yaml as yaml

from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
                     make_builder, arvados_jobs_image, FileUpdates)
from .pathmapper import ArvPathMapper, trim_listing
from .arvtool import ArvadosCommandTool, set_cluster_target
from ._version import __version__
from .util import common_prefix
from .arvdocker import arv_docker_get_image

from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")

_basetype_re = re.compile(r'''(?:
Directory
|File
|array
|boolean
|double
|enum
|float
|int
|long
|null
|record
|string
)(?:\[\])?\??''', re.VERBOSE)

def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info, tool):
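    """Save the packed workflow to a collection as workflow.json and
    build a wrapper Workflow whose single step runs it from Keep.

    Returns the wrapper document as a JSON string.
    """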
    col = arvados.collection.Collection(api_client=arvRunner.api,
                                        keep_client=arvRunner.keep_client)

    with col.open("workflow.json", "wt") as f:
        json.dump(packed, f, sort_keys=True, indent=4, separators=(',',': '))

    pdh = col.portable_data_hash()

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", pdh], ["owner_uuid", "=", project_uuid]]).execute(num_retries=arvRunner.num_retries)
    if len(existing["items"]) == 0:
        col.save_new(name=toolname, owner_uuid=project_uuid, ensure_unique_name=True)

    # now construct the wrapper

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": "keep:%s/workflow.json#main" % pdh,
        "label": name
    }

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # This also excludes extension fields, which is fine because
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "id", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": i["id"]
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": i["id"]})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if main.get("hints"):
        wrapper["hints"] = main["hints"]

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))


def rel_ref(s, baseuri, urlexpander, merged_map, jobmapper):
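    """Rewrite a reference so it is relative to the referencing document.

    Keep references are returned unchanged, and references already
    resolved in merged_map or jobmapper are returned as their mapped
    targets; anything else becomes a path relative to the base file.
    """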
    if s.startswith("keep:"):
        return s

    uri = urlexpander(s, baseuri)

    if uri.startswith("keep:"):
        return uri

    fileuri = urllib.parse.urldefrag(baseuri)[0]

    for u in (baseuri, fileuri):
        if u in merged_map:
            replacements = merged_map[u].resolved
            if uri in replacements:
                return replacements[uri]

    if uri in jobmapper:
        return jobmapper.mapper(uri).target

    p1 = os.path.dirname(uri_file_path(fileuri))
    p2 = os.path.dirname(uri_file_path(uri))
    p3 = os.path.basename(uri_file_path(uri))

    r = os.path.relpath(p2, p1)
    if r == ".":
        r = ""

    return os.path.join(r, p3)

def is_basetype(tp):
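    """Return True if tp names a built-in CWL type (optionally as an array and/or optional type)."""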
    return _basetype_re.match(tp) is not None

def update_refs(api, d, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix):
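    """Recursively rewrite references in a parsed CWL document.

    Walks lists and mappings, rewriting "location", "run", "$import",
    "$include", "$schemas" and non-basetype type references with
    rel_ref, optionally replacing a leading prefix, and annotating
    DockerRequirement entries with the Keep PDH of the image collection.
    """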
    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            if prefix and isinstance(s, str):
                if s.startswith(prefix):
                    d[i] = replacePrefix+s[len(prefix):]
            else:
                update_refs(api, s, baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
    elif isinstance(d, MutableMapping):
        for field in ("id", "name"):
            if isinstance(d.get(field), str) and d[field].startswith("_:"):
                # Blank node reference that was added automatically; we can get rid of it.
                del d[field]

        if "id" in d:
            baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
        elif "name" in d and isinstance(d["name"], str):
            baseuri = urlexpander(d["name"], baseuri, scoped_id=True)

        if d.get("class") == "DockerRequirement":
            d["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, d, False,
                                                                                   runtimeContext)

        for field in d:
            if field in ("location", "run", "name") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map, jobmapper)
                continue

            if field in ("$include", "$import") and isinstance(d[field], str):
                d[field] = rel_ref(d[field], baseuri, urlexpander, {}, jobmapper)
                continue

            for t in ("type", "items"):
                if (field == t and
                    isinstance(d[t], str) and
                    not is_basetype(d[t])):
                    d[t] = rel_ref(d[t], baseuri, urlexpander, merged_map, jobmapper)
                    continue

            if field == "inputs" and isinstance(d["inputs"], MutableMapping):
                for inp in d["inputs"]:
                    if isinstance(d["inputs"][inp], str) and not is_basetype(d["inputs"][inp]):
                        d["inputs"][inp] = rel_ref(d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper)
                    if isinstance(d["inputs"][inp], MutableMapping):
                        update_refs(api, d["inputs"][inp], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)
                continue

            if field in ("requirements", "hints") and isinstance(d[field], MutableMapping):
                dr = d[field].get("DockerRequirement")
                if dr:
                    dr["http://arvados.org/cwl#dockerCollectionPDH"] = arv_docker_get_image(api, dr, False,
                                                                                            runtimeContext)

            if field == "$schemas":
                for n, s in enumerate(d["$schemas"]):
                    d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map, jobmapper)
                continue

            update_refs(api, d[field], baseuri, urlexpander, merged_map, jobmapper, runtimeContext, prefix, replacePrefix)


def fix_schemadef(req, baseuri, urlexpander, merged_map, jobmapper, pdh):
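    """Return a copy of a SchemaDefRequirement with its type references
    rewritten to point at the uploaded collection (keep:<pdh>/...), and
    record the renames in merged_map so other references stay consistent.
    """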
    req = copy.deepcopy(req)

    for f in req["types"]:
        r = f["name"]
        path, frag = urllib.parse.urldefrag(r)
        rel = rel_ref(r, baseuri, urlexpander, merged_map, jobmapper)
        merged_map.setdefault(path, FileUpdates({}, {}))
        rename = "keep:%s/%s" %(pdh, rel)
        for mm in merged_map:
            merged_map[mm].resolved[r] = rename
    return req


def drop_ids(d):
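    """Recursively remove "id" fields that still point at local "file:" URIs."""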
    if isinstance(d, MutableSequence):
        for i, s in enumerate(d):
            drop_ids(s)
    elif isinstance(d, MutableMapping):
        if "id" in d and d["id"].startswith("file:"):
            del d["id"]

        for field in d:
            drop_ids(d[field])


def upload_workflow(arvRunner, tool, job_order, project_uuid,
                        runtimeContext,
                        uuid=None,
                        submit_runner_ram=0, name=None, merged_map=None,
                        submit_runner_image=None,
                        git_info=None,
                        set_defaults=False,
                        jobmapper=None):
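    """Upload the workflow source files to a Keep collection and return a
    wrapper workflow document.

    The wrapper is a minimal Workflow whose inputs and outputs mirror the
    real workflow and whose single step runs the uploaded copy from Keep.
    """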

    firstfile = None
    workflow_files = set()
    import_files = set()
    include_files = set()

    # The document loader index will have entries for all the files
    # that were loaded in the process of parsing the entire workflow
    # (including subworkflows, tools, imports, etc).  We use this to
    # compose a list of the workflow file dependencies.
    for w in tool.doc_loader.idx:
        if w.startswith("file://"):
            workflow_files.add(urllib.parse.urldefrag(w)[0])
            if firstfile is None:
                firstfile = urllib.parse.urldefrag(w)[0]
        if w.startswith("import:file://"):
            import_files.add(urllib.parse.urldefrag(w[7:])[0])
        if w.startswith("include:file://"):
            include_files.add(urllib.parse.urldefrag(w[8:])[0])

    all_files = workflow_files | import_files | include_files

    # Find the longest common prefix among all the file names.  We'll
    # use this to recreate the directory structure in a keep
    # collection with correct relative references.
    prefix = common_prefix(firstfile, all_files) if firstfile else ""


    col = arvados.collection.Collection(api_client=arvRunner.api)

    # Now go through all the files and update references to other
    # files.  We previously scanned for file dependencies, which are
    # passed in as merged_map.
    #
    # note about merged_map: we upload dependencies of each process
    # object (CommandLineTool/Workflow) to a separate collection.
    # That way, when the user edits something, this limits collection
    # PDH changes to just that tool, and minimizes situations where
    # small changes break container reuse for the whole workflow.
    #
    for w in workflow_files | import_files:
        # 1. load the YAML file

        text = tool.doc_loader.fetch_text(w)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)

        yamlloader = schema_salad.utils.yaml_no_ts()
        result = yamlloader.load(textIO)

        # If the whole document is in "flow style" it is probably JSON
        # formatted.  We'll re-export it as JSON because the
        # ruamel.yaml round-trip mode is a lie and only preserves
        # "block style" formatting and not "flow style" formatting.
        export_as_json = result.fa.flow_style()

        # 2. find $import, $include, $schemas, run, location
        # 3. update field value
        update_refs(arvRunner.api, result, w, tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, "", "")

        # Write the updated file to the collection.
        with col.open(w[len(prefix):], "wt") as f:
            if export_as_json:
                json.dump(result, f, indent=4, separators=(',',': '))
            else:
                yamlloader.dump(result, stream=f)

        # Also store a verbatim copy of the original files
        with col.open(os.path.join("original", w[len(prefix):]), "wt") as f:
            f.write(text)


    # Upload files referenced by $include directives; these are used
    # unchanged and don't need to be updated.
    for w in include_files:
        with col.open(w[len(prefix):], "wb") as f1:
            with col.open(os.path.join("original", w[len(prefix):]), "wb") as f3:
                with open(uri_file_path(w), "rb") as f2:
                    dat = f2.read(65536)
                    while dat:
                        f1.write(dat)
                        f3.write(dat)
                        dat = f2.read(65536)

    # Now collect metadata: the collection name and git properties.

    toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
    if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
        toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))

    toolfile = tool.tool["id"][len(prefix):]

    properties = {
        "type": "workflow",
        "arv:workflowMain": toolfile,
    }

    if git_info:
        for g in git_info:
            p = g.split("#", 1)[1]
            properties["arv:"+p] = git_info[g]

    # Check if a collection with the same content already exists in the target project.  If so, just use that one.
    existing = arvRunner.api.collections().list(filters=[["portable_data_hash", "=", col.portable_data_hash()],
                                                         ["owner_uuid", "=", arvRunner.project_uuid]]).execute(num_retries=arvRunner.num_retries)

    if len(existing["items"]) == 0:
        toolname = toolname.replace("/", " ")
        col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
        logger.info("Workflow uploaded to %s", col.manifest_locator())
    else:
        logger.info("Workflow uploaded to %s", existing["items"][0]["uuid"])

    # Now that we've updated the workflow and saved it to a
    # collection, we're going to construct a minimal "wrapper"
    # workflow which consists only of input and output parameters
    # connected to a single step that runs the real workflow.

    runfile = "keep:%s/%s" % (col.portable_data_hash(), toolfile)

    step = {
        "id": "#main/" + toolname,
        "in": [],
        "out": [],
        "run": runfile,
        "label": name
    }

    main = tool.tool

    wf_runner_resources = None

    hints = main.get("hints", [])
    found = False
    for h in hints:
        if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
            wf_runner_resources = h
            found = True
            break
    if not found:
        wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
        hints.append(wf_runner_resources)

    if "acrContainerImage" not in wf_runner_resources:
        wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
                                                                      submit_runner_image or "arvados/jobs:"+__version__,
                                                                      runtimeContext)

    if submit_runner_ram:
        wf_runner_resources["ramMin"] = submit_runner_ram

    # Remove a few redundant fields from the "job order" (aka input
    # object or input parameters).  In the situation where we're
    # creating or updating a workflow record, any values in the job
    # order get copied over as default values for input parameters.
    adjustDirObjs(job_order, trim_listing)
    adjustFileObjs(job_order, trim_anonymous_location)
    adjustDirObjs(job_order, trim_anonymous_location)

    newinputs = []
    for i in main["inputs"]:
        inp = {}
        # Make sure to only copy known fields that are meaningful at
        # the workflow level. In practice this ensures that if we're
        # wrapping a CommandLineTool we don't grab inputBinding.
        # This also excludes extension fields, which is fine because
        # Arvados doesn't currently look for any extension fields on
        # input parameters.
        for f in ("type", "label", "secondaryFiles", "streamable",
                  "doc", "format", "loadContents",
                  "loadListing", "default"):
            if f in i:
                inp[f] = i[f]

        if set_defaults:
            sn = shortname(i["id"])
            if sn in job_order:
                inp["default"] = job_order[sn]

        inp["id"] = "#main/%s" % shortname(i["id"])
        newinputs.append(inp)

    wrapper = {
        "class": "Workflow",
        "id": "#main",
        "inputs": newinputs,
        "outputs": [],
        "steps": [step]
    }

    for i in main["inputs"]:
        step["in"].append({
            "id": "#main/step/%s" % shortname(i["id"]),
            "source": "#main/%s" % shortname(i["id"])
        })

    for i in main["outputs"]:
        step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
        wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
                                   "type": i["type"],
                                   "id": "#main/%s" % shortname(i["id"])})

    wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]

    if main.get("requirements"):
        wrapper["requirements"].extend(main["requirements"])
    if hints:
        wrapper["hints"] = hints

    # Schema definitions (these let you define things like record
    # types) require special handling.

    for i, r in enumerate(wrapper["requirements"]):
        if r["class"] == "SchemaDefRequirement":
            wrapper["requirements"][i] = fix_schemadef(r, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, col.portable_data_hash())

    update_refs(arvRunner.api, wrapper, main["id"], tool.doc_loader.expand_url, merged_map, jobmapper, runtimeContext, main["id"]+"#", "#main/")

    doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}

    if git_info:
        for g in git_info:
            doc[g] = git_info[g]

    # Remove any lingering file references.
    drop_ids(wrapper)

    return doc


def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
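    """Create or update an Arvados workflow record holding the wrapper document and return its UUID."""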

    wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))

    body = {
        "workflow": {
            "name": name,
            "description": tool.tool.get("doc", ""),
            "definition": wrappertext
        }}
    if project_uuid:
        body["workflow"]["owner_uuid"] = project_uuid

    if update_uuid:
        call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
    else:
        call = arvRunner.api.workflows().create(body=body)
    return call.execute(num_retries=arvRunner.num_retries)["uuid"]


def dedup_reqs(reqs):
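    """Deduplicate requirements by class (the last occurrence of each class wins),
    dropping Arvados extension requirements, and return them sorted by class name.
    """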
    dedup = {}
    for r in reversed(reqs):
        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
            dedup[r["class"]] = r
    return [dedup[r] for r in sorted(dedup.keys())]

def get_overall_res_req(res_reqs):
537     """Take the overall of a list of ResourceRequirement,
538     i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
539     and the sum of outdirMin, outdirMax."""

    all_res_req = {}
    exception_msgs = []
    for a in max_res_pars + sum_res_pars:
        all_res_req[a] = []
        for res_req in res_reqs:
            if a in res_req:
                if isinstance(res_req[a], int): # integer check
                    all_res_req[a].append(res_req[a])
                else:
                    msg = SourceLine(res_req, a).makeError(
                    "Non-top-level ResourceRequirement in single container cannot have expressions")
                    exception_msgs.append(msg)
    if exception_msgs:
        raise WorkflowException("\n".join(exception_msgs))
    else:
        overall_res_req = {}
        for a in all_res_req:
            if all_res_req[a]:
                if a in max_res_pars:
                    overall_res_req[a] = max(all_res_req[a])
                elif a in sum_res_pars:
                    overall_res_req[a] = sum(all_res_req[a])
        if overall_res_req:
            overall_res_req["class"] = "ResourceRequirement"
        return cmap(overall_res_req)

class ArvadosWorkflowStep(WorkflowStep):
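    """WorkflowStep wrapper that skips full step validation in fast-submit
    mode and sets the Arvados cluster target when the step runs."""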
    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 arvrunner,
                 *argc,
                 **argv
                ):  # type: (...) -> None

        if arvrunner.fast_submit:
            self.tool = toolpath_object
            self.tool["inputs"] = []
            self.tool["outputs"] = []
        else:
            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
            self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
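        """Run the step with the runtime context adjusted for the target cluster."""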
        runtimeContext = runtimeContext.copy()
        runtimeContext.toplevel = True  # Preserve behavior for #13365

        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
                               runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)


class ArvadosWorkflow(Workflow):
    """Wrap cwltool Workflow to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, loadingContext):
        self.arvrunner = arvrunner
        self.wf_pdh = None
        self.dynamic_resource_req = []
        self.static_resource_req = []
        self.wf_reffiles = []
        self.loadingContext = loadingContext.copy()

        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        tool_requirements = toolpath_object.get("requirements", [])
        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        tool_hints = toolpath_object.get("hints", [])

        workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
            self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")

        super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")

    def job(self, joborder, output_callback, runtimeContext):
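        """Dispatch the workflow.

        Without the RunInSingleContainer requirement this defers to
        cwltool's normal Workflow.job.  With it, the packed workflow is
        uploaded to Keep and wrapped in a generated CommandLineTool that
        runs the whole workflow with cwltool inside a single container.
        """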

        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext, self.metadata)
        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)

        req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
        if not req:
            return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)

        # RunInSingleContainer is true

        with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
            if "id" not in self.tool:
                raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))

        discover_secondary_files(self.arvrunner.fs_access, builder,
                                 self.tool["inputs"], joborder)

        normalizeFilesDirs(joborder)

        with Perf(metrics, "subworkflow upload_deps"):
            upload_dependencies(self.arvrunner,
                                os.path.basename(joborder.get("id", "#")),
                                self.doc_loader,
                                joborder,
                                joborder.get("id", "#"),
                                runtimeContext)

            if self.wf_pdh is None:
                packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)

                for p in packed["$graph"]:
                    if p["id"] == "#main":
                        p["requirements"] = dedup_reqs(self.requirements)
                        p["hints"] = dedup_reqs(self.hints)

                def visit(item):
                    if "requirements" in item:
                        item["requirements"] = [i for i in item["requirements"] if i["class"] != "DockerRequirement"]
                    for t in ("hints", "requirements"):
                        if t not in item:
                            continue
                        for req in item[t]:
                            if req["class"] == "ResourceRequirement":
                                dyn = False
                                for k in max_res_pars + sum_res_pars:
                                    if k in req:
                                        if isinstance(req[k], basestring):
                                            if item["id"] == "#main":
                                                # only the top-level requirements/hints may contain expressions
                                                self.dynamic_resource_req.append(req)
                                                dyn = True
                                                break
                                            else:
                                                with SourceLine(req, k, WorkflowException):
                                                    raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                if not dyn:
                                    self.static_resource_req.append(req)

                visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)

                if self.static_resource_req:
                    self.static_resource_req = [get_overall_res_req(self.static_resource_req)]

                upload_dependencies(self.arvrunner,
                                    runtimeContext.name,
                                    self.doc_loader,
                                    packed,
                                    self.tool["id"],
                                    runtimeContext)

                # Discover files/directories referenced by the
                # workflow (mainly "default" values)
                visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)


        if self.dynamic_resource_req:
            # Evaluate dynamic resource requirements using current builder
            rs = copy.copy(self.static_resource_req)
            for dyn_rs in self.dynamic_resource_req:
                eval_req = {"class": "ResourceRequirement"}
                for a in max_res_pars + sum_res_pars:
                    if a in dyn_rs:
                        eval_req[a] = builder.do_eval(dyn_rs[a])
                rs.append(eval_req)
            job_res_reqs = [get_overall_res_req(rs)]
        else:
            job_res_reqs = self.static_resource_req

        with Perf(metrics, "subworkflow adjust"):
            joborder_resolved = copy.deepcopy(joborder)
            joborder_keepmount = copy.deepcopy(joborder)

            reffiles = []
            visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)

            mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
                                   "/keep/%s",
                                   "/keep/%s/%s")

            # For the containers API, we need to make sure any extra
            # referenced files (i.e. referenced by the workflow but
            # not in the inputs) are included in the mounts.
            if self.wf_reffiles:
                runtimeContext = runtimeContext.copy()
                runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)

            def keepmount(obj):
                remove_redundant_fields(obj)
                with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if "location" not in obj:
                        raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
                with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
                    if obj["location"].startswith("keep:"):
                        obj["location"] = mapper.mapper(obj["location"]).target
                        if "listing" in obj:
                            del obj["listing"]
                    elif obj["location"].startswith("_:"):
                        del obj["location"]
                    else:
                        raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])

            visit_class(joborder_keepmount, ("File", "Directory"), keepmount)

            def resolved(obj):
                if obj["location"].startswith("keep:"):
                    obj["location"] = mapper.mapper(obj["location"]).resolved

            visit_class(joborder_resolved, ("File", "Directory"), resolved)

            if self.wf_pdh is None:
                adjustFileObjs(packed, keepmount)
                adjustDirObjs(packed, keepmount)
                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)

        self.loadingContext = self.loadingContext.copy()
        self.loadingContext.metadata = self.loadingContext.metadata.copy()
        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"

        if len(job_res_reqs) == 1:
            # RAM request needs to be at least 128 MiB or the workflow
            # runner itself won't run reliably.
            if job_res_reqs[0].get("ramMin", 1024) < 128:
                job_res_reqs[0]["ramMin"] = 128

        arguments = ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl", "cwl.input.yml"]
        if runtimeContext.debug:
            arguments.insert(0, '--debug')

        wf_runner = cmap({
            "class": "CommandLineTool",
            "baseCommand": "cwltool",
            "inputs": self.tool["inputs"],
            "outputs": self.tool["outputs"],
            "stdout": "cwl.output.json",
            "requirements": self.requirements+job_res_reqs+[
                {"class": "InlineJavascriptRequirement"},
                {
                "class": "InitialWorkDirRequirement",
                "listing": [{
                        "entryname": "workflow.cwl",
                        "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
                    }, {
                        "entryname": "cwl.input.yml",
                        "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                    }]
            }],
            "hints": self.hints,
            "arguments": arguments,
            "id": "#"
        })
        return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           *argc,
                           **argv
    ):
        # (...) -> WorkflowStep
        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)