[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import sys
7 import re
8 import urllib.parse
9 from functools import partial
10 import logging
11 import json
12 import copy
13 from collections import namedtuple
14 from io import StringIO
15 from typing import (
16     Any,
17     Callable,
18     Dict,
19     Iterable,
20     Iterator,
21     List,
22     Mapping,
23     MutableMapping,
24     Sequence,
25     MutableSequence,
26     Optional,
27     Set,
28     Sized,
29     Tuple,
30     Type,
31     Union,
32     cast,
33 )
34
35 import subprocess
36
37 from schema_salad.sourceline import SourceLine, cmap
38
39 from cwltool.command_line_tool import CommandLineTool
40 import cwltool.workflow
41 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
42                              shortname, Process, fill_in_defaults)
43 from cwltool.load_tool import fetch_document, jobloaderctx
44 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
45 from cwltool.builder import substitute
46 from cwltool.pack import pack
47 from cwltool.update import INTERNAL_VERSION
48 from cwltool.builder import Builder
49 import schema_salad.validate as validate
50 import schema_salad.ref_resolver
51
52 import arvados.collection
53 import arvados.util
54 from .util import collectionUUID
55 from ruamel.yaml import YAML
56 from ruamel.yaml.comments import CommentedMap, CommentedSeq
57
58 import arvados_cwl.arvdocker
59 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern, MapperEnt
60 from ._version import __version__
61 from . import done
62 from .context import ArvRuntimeContext
63 from .perf import Perf
64
65 basestring = (bytes, str)
66 logger = logging.getLogger('arvados.cwl-runner')
67 metrics = logging.getLogger('arvados.cwl-runner.metrics')
68
69 def trim_anonymous_location(obj):
70     """Remove 'location' field from File and Directory literals.
71
72     To make internal handling easier, literals are assigned a random id for
73     'location'.  However, when writing the record back out, this can break
74     reproducibility.  Since it is valid for literals not to have a 'location'
75     field, remove it.
76
77     """
78
79     if obj.get("location", "").startswith("_:"):
80         del obj["location"]
81
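# Illustrative sketch (comment only, not executed; values are placeholders):
#
#   literal = {"class": "File", "location": "_:0c72...", "basename": "a.txt"}
#   trim_anonymous_location(literal)   # 'location' removed
#
#   real = {"class": "File", "location": "keep:<pdh>/a.txt"}
#   trim_anonymous_location(real)      # unchanged, location is not anonymous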
82
83 def remove_redundant_fields(obj):
84     for field in ("path", "nameext", "nameroot", "dirname"):
85         if field in obj:
86             del obj[field]
87
88
89 def find_defaults(d, op):
90     if isinstance(d, list):
91         for i in d:
92             find_defaults(i, op)
93     elif isinstance(d, dict):
94         if "default" in d:
95             op(d)
96         else:
97             for i in d.values():
98                 find_defaults(i, op)
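
# Illustrative sketch (comment only, not executed): find_defaults calls `op` on
# every mapping that carries a "default" key and does not descend into it further.
#
#   doc = {"inputs": [{"id": "threshold", "type": "float", "default": 0.5}]}
#   find_defaults(doc, lambda d: print(d["id"]))   # prints "threshold"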
99
100 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
101     return Builder(
102                  job=joborder,
103                  files=[],               # type: List[Dict[Text, Text]]
104                  bindings=[],            # type: List[Dict[Text, Any]]
105                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
106                  names=None,               # type: Names
107                  requirements=requirements,        # type: List[Dict[Text, Any]]
108                  hints=hints,               # type: List[Dict[Text, Any]]
109                  resources={},           # type: Dict[str, int]
110                  mutation_manager=None,    # type: Optional[MutationManager]
111                  formatgraph=None,         # type: Optional[Graph]
112                  make_fs_access=None,      # type: Type[StdFsAccess]
113                  fs_access=None,           # type: StdFsAccess
114                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
115                  timeout=runtimeContext.eval_timeout,             # type: float
116                  debug=runtimeContext.debug,               # type: bool
117                  js_console=runtimeContext.js_console,          # type: bool
118                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
119                  loadListing="",         # type: Text
120                  outdir="",              # type: Text
121                  tmpdir="",              # type: Text
122                  stagedir="",            # type: Text
123                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
124                  container_engine="docker"
125                 )
126
127 def search_schemadef(name, reqs):
128     for r in reqs:
129         if r["class"] == "SchemaDefRequirement":
130             for sd in r["types"]:
131                 if sd["name"] == name:
132                     return sd
133     return None
134
135 primitive_types_set = frozenset(("null", "boolean", "int", "long",
136                                  "float", "double", "string", "record",
137                                  "array", "enum"))
138
139 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
140     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
141         # union type, collect all possible secondaryFiles
142         for i in inputschema:
143             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
144         return
145
146     if inputschema == "File":
147         inputschema = {"type": "File"}
148
149     if isinstance(inputschema, basestring):
150         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
151         if sd:
152             inputschema = sd
153         else:
154             return
155
156     if "secondaryFiles" in inputschema:
157         # Set secondaryFiles; it may be inherited by compound types.
158         secondaryspec = inputschema["secondaryFiles"]
159
160     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
161         not isinstance(inputschema["type"], basestring)):
162         # compound type (union, array, record)
163         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
164
165     elif (inputschema["type"] == "record" and
166           isinstance(primary, Mapping)):
167         #
168         # record type, find secondary files associated with fields.
169         #
170         for f in inputschema["fields"]:
171             p = primary.get(shortname(f["name"]))
172             if p:
173                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
174
175     elif (inputschema["type"] == "array" and
176           isinstance(primary, Sequence)):
177         #
178         # array type, find secondary files of elements
179         #
180         for p in primary:
181             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
182
183     elif (inputschema["type"] == "File" and
184           isinstance(primary, Mapping) and
185           primary.get("class") == "File"):
186
187         if "secondaryFiles" in primary or not secondaryspec:
188             # Nothing to do.
189             return
190
191         #
192         # Found a file, check for secondaryFiles
193         #
194         specs = []
195         primary["secondaryFiles"] = secondaryspec
196         for i, sf in enumerate(aslist(secondaryspec)):
197             if builder.cwlVersion == "v1.0":
198                 pattern = sf
199             else:
200                 pattern = sf["pattern"]
201             if pattern is None:
202                 continue
203             if isinstance(pattern, list):
204                 specs.extend(pattern)
205             elif isinstance(pattern, dict):
206                 specs.append(pattern)
207             elif isinstance(pattern, str):
208                 if builder.cwlVersion == "v1.0":
209                     specs.append({"pattern": pattern, "required": True})
210                 else:
211                     specs.append({"pattern": pattern, "required": sf.get("required")})
212             else:
213                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
214                     "Expression must return list, object, string or null")
215
216         found = []
217         for i, sf in enumerate(specs):
218             if isinstance(sf, dict):
219                 if sf.get("class") == "File":
220                     pattern = None
221                     if sf.get("location") is None:
222                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
223                             "File object is missing 'location': %s" % sf)
224                     sfpath = sf["location"]
225                     required = True
226                 else:
227                     pattern = sf["pattern"]
228                     required = sf.get("required")
229             elif isinstance(sf, str):
230                 pattern = sf
231                 required = True
232             else:
233                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234                     "Expression must return list, object, string or null")
235
236             if pattern is not None:
237                 if "${" in pattern or "$(" in pattern:
238                     sfname = builder.do_eval(pattern, context=primary)
239                 else:
240                     sfname = substitute(primary["basename"], pattern)
241
242                 if sfname is None:
243                     continue
244
245                 if isinstance(sfname, str):
246                     p_location = primary["location"]
247                     if "/" in p_location:
248                         sfpath = (
249                             p_location[0 : p_location.rindex("/") + 1]
250                             + sfname
251                         )
252
253             required = builder.do_eval(required, context=primary)
254
255             if isinstance(sfname, list) or isinstance(sfname, dict):
256                 each = aslist(sfname)
257                 for e in each:
258                     if required and not fsaccess.exists(e.get("location")):
259                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
260                             "Required secondary file '%s' does not exist" % e.get("location"))
261                 found.extend(each)
262
263             if isinstance(sfname, str):
264                 if fsaccess.exists(sfpath):
265                     if pattern is not None:
266                         found.append({"location": sfpath, "class": "File"})
267                     else:
268                         found.append(sf)
269                 elif required:
270                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
271                         "Required secondary file '%s' does not exist" % sfpath)
272
273         primary["secondaryFiles"] = cmap(found)
274         if discovered is not None:
275             discovered[primary["location"]] = primary["secondaryFiles"]
276     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
277         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
278
279 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
280     for inputschema in inputs:
281         primary = job_order.get(shortname(inputschema["id"]))
282         if isinstance(primary, (Mapping, Sequence)):
283             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
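
# Illustrative sketch (comment only, not executed; locations are placeholders,
# and it assumes fsaccess reports that the sibling ".bai" file exists):
#
#   inputs = [{"id": "#main/bam", "type": "File",
#              "secondaryFiles": [{"pattern": ".bai", "required": True}]}]
#   job_order = {"bam": {"class": "File", "location": "keep:<pdh>/a.bam",
#                        "basename": "a.bam"}}
#   discovered = {}
#   discover_secondary_files(fsaccess, builder, inputs, job_order, discovered)
#   # job_order["bam"]["secondaryFiles"] -> [{"location": "keep:<pdh>/a.bam.bai", "class": "File"}]
#   # discovered["keep:<pdh>/a.bam"] -> the same list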
284
285 def upload_dependencies(arvrunner, name, document_loader,
286                         workflowobj, uri, runtimeContext,
287                         include_primary=True, discovered_secondaryfiles=None,
288                         cache=None):
289     """Upload the dependencies of the workflowobj document to Keep.
290
291     Returns a pathmapper object mapping local paths to keep references.  Also
292     does an in-place update of references in "workflowobj".
293
294     Use scandeps to find $schemas, File and Directory
295     fields that represent external references.
296
297     If workflowobj has an "id" field, this will reload the document to ensure
298     it is scanning the raw document prior to preprocessing.
299     """
300
301     scanobj = workflowobj
302     metadata = scanobj
303
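    # Shape of the scandeps results this code expects (values are placeholders):
    # a list of File/Directory objects referenced by the document, e.g.
    # [{"class": "File", "location": "file:///home/user/inputs/a.txt"}].
    # The second pass collects only "$schemas" references, which are treated as
    # optional dependencies.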
304     with Perf(metrics, "scandeps"):
305         sc_result = scandeps(uri, scanobj,
306                              set(),
307                              set(("location",)),
308                              None, urljoin=document_loader.fetcher.urljoin,
309                              nestdirs=False)
310         optional_deps = scandeps(uri, scanobj,
311                              set(),
312                              set(("$schemas",)),
313                              None, urljoin=document_loader.fetcher.urljoin,
314                              nestdirs=False)
315
316     if sc_result is None:
317         sc_result = []
318
319     if optional_deps is None:
320         optional_deps = []
321
322     if optional_deps:
323         sc_result.extend(optional_deps)
324
325     sc = []
326     uuids = {}
327
328     def collect_uuids(obj):
329         loc = obj.get("location", "")
330         sp = loc.split(":")
331         if sp[0] == "keep":
332             # Collect collection uuids that need to be resolved to
333             # portable data hashes
334             gp = collection_uuid_pattern.match(loc)
335             if gp:
336                 uuids[gp.groups()[0]] = obj
337             if collectionUUID in obj:
338                 uuids[obj[collectionUUID]] = obj
339
340     def collect_uploads(obj):
341         loc = obj.get("location", "")
342         sp = loc.split(":")
343         if len(sp) < 1:
344             return
345         if sp[0] in ("file", "http", "https"):
346             # Record local files that need to be uploaded;
347             # don't include file literals, keep references, etc.
348             sc.append(obj)
349         collect_uuids(obj)
350
351     with Perf(metrics, "collect uuids"):
352         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
353
354     with Perf(metrics, "collect uploads"):
355         visit_class(sc_result, ("File", "Directory"), collect_uploads)
356
357     # Resolve any collection uuids we found to portable data hashes
358     # and assign them to uuid_map
359     uuid_map = {}
360     fetch_uuids = list(uuids.keys())
361     with Perf(metrics, "fetch_uuids"):
362         while fetch_uuids:
363             # For a large number of fetch_uuids, the API server may limit
364             # the response size, so keep fetching until the API server has nothing
365             # more to give us.
366             lookups = arvrunner.api.collections().list(
367                 filters=[["uuid", "in", fetch_uuids]],
368                 count="none",
369                 select=["uuid", "portable_data_hash"]).execute(
370                     num_retries=arvrunner.num_retries)
371
372             if not lookups["items"]:
373                 break
374
375             for l in lookups["items"]:
376                 uuid_map[l["uuid"]] = l["portable_data_hash"]
377
378             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
379
380     normalizeFilesDirs(sc)
381
382     if "id" in workflowobj:
383         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
384         if include_primary:
385             # make sure it's included
386             sc.append({"class": "File", "location": defrg})
387         else:
388             # make sure it's excluded
389             sc = [d for d in sc if d.get("location") != defrg]
390
391     def visit_default(obj):
392         def defaults_are_optional(f):
393             if "location" not in f and "path" in f:
394                 f["location"] = f["path"]
395                 del f["path"]
396             normalizeFilesDirs(f)
397             optional_deps.append(f)
398         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
399
400     find_defaults(workflowobj, visit_default)
401
402     discovered = {}
403     def discover_default_secondary_files(obj):
404         builder_job_order = {}
405         for t in obj["inputs"]:
406             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
407         # Need to create a builder object to evaluate expressions.
408         builder = make_builder(builder_job_order,
409                                obj.get("hints", []),
410                                obj.get("requirements", []),
411                                ArvRuntimeContext(),
412                                metadata)
413         discover_secondary_files(arvrunner.fs_access,
414                                  builder,
415                                  obj["inputs"],
416                                  builder_job_order,
417                                  discovered)
418
419     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
420     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
421
422     for d in list(discovered):
423         # Only interested in discovered secondaryFiles which are local
424         # files that need to be uploaded.
425         if d.startswith("file:"):
426             sc.extend(discovered[d])
427         else:
428             del discovered[d]
429
430     with Perf(metrics, "mapper"):
431         mapper = ArvPathMapper(arvrunner, sc, "",
432                                "keep:%s",
433                                "keep:%s/%s",
434                                name=name,
435                                single_collection=True,
436                                optional_deps=optional_deps)
437
438     for k, v in uuid_map.items():
439         mapper._pathmap["keep:"+k] = MapperEnt(v, "", "", False)
440
441     keeprefs = set()
442     def addkeepref(k):
443         if k.startswith("keep:"):
444             keeprefs.add(collection_pdh_pattern.match(k).group(1))
445
446
447     def collectloc(p):
448         loc = p.get("location")
449         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
450             addkeepref(p["location"])
451             return
452
453         if not loc:
454             return
455
456         if collectionUUID in p:
457             uuid = p[collectionUUID]
458             if uuid not in uuid_map:
459                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
460                     "Collection uuid %s not found" % uuid)
461             gp = collection_pdh_pattern.match(loc)
462             if gp and uuid_map[uuid] != gp.groups()[0]:
463                 # This file entry has both collectionUUID and a PDH
464                 # location. If the PDH doesn't match the one returned by
465                 # the API server, raise an error.
466                 raise SourceLine(p, "location", validate.ValidationException).makeError(
467                     "Expected collection uuid %s to be %s but API server reported %s" % (
468                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
469
470         gp = collection_uuid_pattern.match(loc)
471         if not gp:
472             # Not a uuid pattern (must be a pdh pattern)
473             addkeepref(p["location"])
474             return
475
476         uuid = gp.groups()[0]
477         if uuid not in uuid_map:
478             raise SourceLine(p, "location", validate.ValidationException).makeError(
479                 "Collection uuid %s not found" % uuid)
480
481     with Perf(metrics, "collectloc"):
482         visit_class(workflowobj, ("File", "Directory"), collectloc)
483         visit_class(discovered, ("File", "Directory"), collectloc)
484
485     if discovered_secondaryfiles is not None:
486         for d in discovered:
487             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
488
489     if runtimeContext.copy_deps:
490         # Find referenced collections and copy them into the
491         # destination project, for easy sharing.
492         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
493                                      filters=[["portable_data_hash", "in", list(keeprefs)],
494                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
495                                      select=["uuid", "portable_data_hash", "created_at"]))
496
497         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
498         for kr in keeprefs:
499             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
500                                                   order="created_at desc",
501                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
502                                                    limit=1).execute()
503             if len(col["items"]) == 0:
504                 logger.warning("Cannot find collection with portable data hash %s", kr)
505                 continue
506             col = col["items"][0]
507             col["name"] = arvados.util.trim_name(col["name"])
508             try:
509                 arvrunner.api.collections().create(body={"collection": {
510                     "owner_uuid": runtimeContext.project_uuid,
511                     "name": col["name"],
512                     "description": col["description"],
513                     "properties": col["properties"],
514                     "portable_data_hash": col["portable_data_hash"],
515                     "manifest_text": col["manifest_text"],
516                     "storage_classes_desired": col["storage_classes_desired"],
517                     "trash_at": col["trash_at"]
518                 }}, ensure_unique_name=True).execute()
519             except Exception as e:
520                 logger.warning("Unable to copy collection to destination: %s", e)
521
522     if "$schemas" in workflowobj:
523         sch = CommentedSeq()
524         for s in workflowobj["$schemas"]:
525             if s in mapper:
526                 sch.append(mapper.mapper(s).resolved)
527         workflowobj["$schemas"] = sch
528
529     return mapper
530
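# Illustrative sketch (comment only, not executed; values are placeholders):
# callers look up uploaded files through the returned pathmapper, e.g.
#
#   mapper = upload_dependencies(arvrunner, "tool dependencies", document_loader,
#                                tool, tool["id"], runtimeContext)
#   mapper.mapper("file:///home/user/inputs/a.txt").resolved
#   # -> "keep:<pdh>/a.txt"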
531
532 def upload_docker(arvrunner, tool, runtimeContext):
533     """Uploads Docker images used in CommandLineTool objects."""
534
535     if isinstance(tool, CommandLineTool):
536         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
537         if docker_req:
538             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
539                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
540                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
541
542             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
543         else:
544             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
545                                                        True, runtimeContext)
546     elif isinstance(tool, cwltool.workflow.Workflow):
547         for s in tool.steps:
548             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
549
550
551 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
552     """Create a packed workflow.
553
554     A "packed" workflow is one where all the components have been combined into a single document."""
555
556     rewrites = {}
557     packed = pack(arvrunner.loadingContext, tool.tool["id"],
558                   rewrite_out=rewrites,
559                   loader=tool.doc_loader)
560
561     rewrite_to_orig = {v: k for k,v in rewrites.items()}
562
563     def visit(v, cur_id):
564         if isinstance(v, dict):
565             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
566                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
567                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or update to cwlVersion: v1.1")
568                 if "id" in v:
569                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
570             if "path" in v and "location" not in v:
571                 v["location"] = v["path"]
572                 del v["path"]
573             if "location" in v and cur_id in merged_map:
574                 if v["location"] in merged_map[cur_id].resolved:
575                     v["location"] = merged_map[cur_id].resolved[v["location"]]
576                 if v["location"] in merged_map[cur_id].secondaryFiles:
577                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
578             if v.get("class") == "DockerRequirement":
579                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
580                                                                                                              runtimeContext)
581             for l in v:
582                 visit(v[l], cur_id)
583         if isinstance(v, list):
584             for l in v:
585                 visit(l, cur_id)
586     visit(packed, None)
587
588     if git_info:
589         for g in git_info:
590             packed[g] = git_info[g]
591
592     return packed
593
594
595 def tag_git_version(packed, tool):
596     if tool.tool["id"].startswith("file://"):
597         path = os.path.dirname(tool.tool["id"][7:])
598         try:
599             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
600         except (OSError, subprocess.CalledProcessError):
601             pass
602         else:
603             packed["http://schema.org/version"] = githash
604
605 def setloc(mapper, p):
606     loc = p.get("location")
607     if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
608         p["location"] = mapper.mapper(p["location"]).resolved
609         return
610
611     if not loc:
612         return
613
614     if collectionUUID in p:
615         uuid = p[collectionUUID]
616         keepuuid = "keep:"+uuid
617         if keepuuid not in mapper:
618             raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
619                 "Collection uuid %s not found" % uuid)
620         gp = collection_pdh_pattern.match(loc)
621         if gp and mapper.mapper(keepuuid).resolved != gp.groups()[0]:
622             # This file entry has both collectionUUID and a PDH
623             # location. If the PDH doesn't match the one returned by
624             # the API server, raise an error.
625             raise SourceLine(p, "location", validate.ValidationException).makeError(
626                 "Expected collection uuid %s to be %s but API server reported %s" % (
627                     uuid, gp.groups()[0], mapper.mapper(keepuuid).resolved))
628
629     gp = collection_uuid_pattern.match(loc)
630     if not gp:
631         # Not a uuid pattern (must be a pdh pattern)
632         return
633
634     uuid = gp.groups()[0]
635     keepuuid = "keep:"+uuid
636     if keepuuid not in mapper:
637         raise SourceLine(p, "location", validate.ValidationException).makeError(
638             "Collection uuid %s not found" % uuid)
639     p["location"] = "keep:%s%s" % (mapper.mapper(keepuuid).resolved, gp.groups()[1] if gp.groups()[1] else "")
640     p[collectionUUID] = uuid
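
# Illustrative sketch (comment only, not executed; identifiers are placeholders,
# and it assumes the mapper already has an entry for the collection uuid):
#
#   p = {"class": "File", "location": "keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/a.txt"}
#   setloc(mapper, p)
#   # p["location"] -> "keep:<pdh>/a.txt"
#   # p[collectionUUID] -> "zzzzz-4zz18-zzzzzzzzzzzzzzz"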
641
642 def update_from_mapper(workflowobj, mapper):
643     with Perf(metrics, "setloc"):
644         visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
645
646 def apply_merged_map(merged_map, workflowobj):
647     def visit(v, cur_id):
648         if isinstance(v, dict):
649             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
650                 if "id" in v:
651                     cur_id = v["id"]
652             if "path" in v and "location" not in v:
653                 v["location"] = v["path"]
654                 del v["path"]
655             if "location" in v and cur_id in merged_map:
656                 if v["location"] in merged_map[cur_id].resolved:
657                     v["location"] = merged_map[cur_id].resolved[v["location"]]
658                 if v["location"] in merged_map[cur_id].secondaryFiles:
659                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
660             #if v.get("class") == "DockerRequirement":
661             #    v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
662             #                                                                                                 runtimeContext)
663             for l in v:
664                 visit(v[l], cur_id)
665         if isinstance(v, list):
666             for l in v:
667                 visit(l, cur_id)
668     visit(workflowobj, None)
669
670 def update_from_merged_map(tool, merged_map):
671     tool.visit(partial(apply_merged_map, merged_map))
672
673 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
674     """Upload local files referenced in the input object and return the updated
675     input object with 'location' pointing to the proper Keep references.
676     """
677
678     # Make a copy of the job order and set defaults.
679     builder_job_order = copy.copy(job_order)
680
681     # fill_in_defaults throws an error if there are any
682     # missing required parameters; we don't want it to do that,
683     # so make them all optional.
684     inputs_copy = copy.deepcopy(tool.tool["inputs"])
685     for i in inputs_copy:
686         if "null" not in i["type"]:
687             i["type"] = ["null"] + aslist(i["type"])
688
689     fill_in_defaults(inputs_copy,
690                      builder_job_order,
691                      arvrunner.fs_access)
692     # Need to create a builder object to evaluate expressions.
693     builder = make_builder(builder_job_order,
694                            tool.hints,
695                            tool.requirements,
696                            ArvRuntimeContext(),
697                            tool.metadata)
698     # Now update job_order with secondaryFiles
699     discover_secondary_files(arvrunner.fs_access,
700                              builder,
701                              tool.tool["inputs"],
702                              job_order)
703
704     _jobloaderctx = jobloaderctx.copy()
705     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
706
707     jobmapper = upload_dependencies(arvrunner,
708                                     name,
709                                     jobloader,
710                                     job_order,
711                                     job_order.get("id", "#"),
712                                     runtimeContext)
713
714     if "id" in job_order:
715         del job_order["id"]
716
717     # Need to filter this out; it gets added by cwltool when providing
718     # parameters on the command line.
719     if "job_order" in job_order:
720         del job_order["job_order"]
721
722     update_from_mapper(job_order, jobmapper)
723
724     return job_order, jobmapper
725
726 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
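
# Illustrative sketch (values are placeholders): upload_workflow_deps below
# builds a merged_map of {tool id: FileUpdates}, where `resolved` maps original
# file locations to keep references and `secondaryFiles` maps resolved primary
# locations to their discovered secondary files, e.g.
#
#   merged_map["file:///home/user/wf.cwl"] = FileUpdates(
#       resolved={"file:///home/user/data/a.txt": "keep:<pdh>/a.txt"},
#       secondaryFiles={})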
727
728 def upload_workflow_deps(arvrunner, tool, runtimeContext):
729     # Ensure that Docker images needed by this workflow are available
730
731     with Perf(metrics, "upload_docker"):
732         upload_docker(arvrunner, tool, runtimeContext)
733
734     document_loader = tool.doc_loader
735
736     merged_map = {}
737     tool_dep_cache = {}
738
739     todo = []
740
741     # Standard traversal is top down, but we want to go bottom up, so use
742     # the visitor to accumulate a list of nodes to visit, then
743     # visit them in reverse order.
744     def upload_tool_deps(deptool):
745         if "id" in deptool:
746             todo.append(deptool)
747
748     tool.visit(upload_tool_deps)
749
750     for deptool in reversed(todo):
751         discovered_secondaryfiles = {}
752         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
753             pm = upload_dependencies(arvrunner,
754                                      "%s dependencies" % (shortname(deptool["id"])),
755                                      document_loader,
756                                      deptool,
757                                      deptool["id"],
758                                      runtimeContext,
759                                      include_primary=False,
760                                      discovered_secondaryfiles=discovered_secondaryfiles,
761                                      cache=tool_dep_cache)
762
763         document_loader.idx[deptool["id"]] = deptool
764         toolmap = {}
765         for k,v in pm.items():
766             toolmap[k] = v.resolved
767
768         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
769
770     return merged_map
771
772 def arvados_jobs_image(arvrunner, img, runtimeContext):
773     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
774
775     try:
776         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
777                                                           True, runtimeContext)
778     except Exception as e:
779         raise Exception("Docker image %s is not available\n%s" % (img, e))
780
781
782 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
783     collection = arvados.collection.Collection(api_client=arvrunner.api,
784                                                keep_client=arvrunner.keep_client,
785                                                num_retries=arvrunner.num_retries)
786     with collection.open("workflow.cwl", "w") as f:
787         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
788
789     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
790                ["name", "like", name+"%"]]
791     if runtimeContext.project_uuid:
792         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
793     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
794
795     if exists["items"]:
796         logger.info("Using collection %s", exists["items"][0]["uuid"])
797     else:
798         collection.save_new(name=name,
799                             owner_uuid=runtimeContext.project_uuid,
800                             ensure_unique_name=True,
801                             num_retries=arvrunner.num_retries)
802         logger.info("Uploaded to %s", collection.manifest_locator())
803
804     return collection.portable_data_hash()
805
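# Illustrative sketch (comment only, not executed; values are placeholders):
#
#   pdh = upload_workflow_collection(arvrunner, "my-workflow", packed, runtimeContext)
#   # pdh identifies a collection containing "workflow.cwl"; an existing
#   # collection with the same content and name prefix is reused if found.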
806
807 class Runner(Process):
808     """Base class for runner processes, which submit an instance of
809     arvados-cwl-runner and wait for the final result."""
810
811     def __init__(self, runner,
812                  tool, loadingContext, enable_reuse,
813                  output_name, output_tags, submit_runner_ram=0,
814                  name=None, on_error=None, submit_runner_image=None,
815                  intermediate_output_ttl=0, merged_map=None,
816                  priority=None, secret_store=None,
817                  collection_cache_size=256,
818                  collection_cache_is_default=True,
819                  git_info=None,
820                  reuse_runner=False):
821
822         self.loadingContext = loadingContext.copy()
823
824         super(Runner, self).__init__(tool.tool, loadingContext)
825
826         self.arvrunner = runner
827         self.embedded_tool = tool
828         self.job_order = None
829         self.running = False
830         if enable_reuse:
831             # If reuse is permitted by command line arguments but
832             # disabled by the workflow itself, disable it.
833             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
834             if reuse_req:
835                 enable_reuse = reuse_req["enableReuse"]
836             reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
837             if reuse_req:
838                 enable_reuse = reuse_req["enableReuse"]
839         self.enable_reuse = enable_reuse
840         self.uuid = None
841         self.final_output = None
842         self.output_name = output_name
843         self.output_tags = output_tags
844         self.name = name
845         self.on_error = on_error
846         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
847         self.intermediate_output_ttl = intermediate_output_ttl
848         self.priority = priority
849         self.secret_store = secret_store
850         self.enable_dev = self.loadingContext.enable_dev
851         self.git_info = git_info
852         self.fast_parser = self.loadingContext.fast_parser
853         self.reuse_runner = reuse_runner
854
855         self.submit_runner_cores = 1
856         self.submit_runner_ram = 1024  # default 1 GiB
857         self.collection_cache_size = collection_cache_size
858
859         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
860         if runner_resource_req:
861             if runner_resource_req.get("coresMin"):
862                 self.submit_runner_cores = runner_resource_req["coresMin"]
863             if runner_resource_req.get("ramMin"):
864                 self.submit_runner_ram = runner_resource_req["ramMin"]
865             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
866                 self.collection_cache_size = runner_resource_req["keep_cache"]
867
868         if submit_runner_ram:
869             # Command line / initializer overrides default and/or spec from workflow
870             self.submit_runner_ram = submit_runner_ram
871
872         if self.submit_runner_ram <= 0:
873             raise Exception("Value of submit-runner-ram must be greater than zero")
874
875         if self.submit_runner_cores <= 0:
876             raise Exception("Value of submit-runner-cores must be greater than zero")
877
878         self.merged_map = merged_map or {}
879
880     def job(self,
881             job_order,         # type: Mapping[Text, Text]
882             output_callbacks,  # type: Callable[[Any, Any], Any]
883             runtimeContext     # type: RuntimeContext
884            ):  # type: (...) -> Generator[Any, None, None]
885         self.job_order = job_order
886         self._init_job(job_order, runtimeContext)
887         yield self
888
889     def update_pipeline_component(self, record):
890         pass
891
892     def done(self, record):
893         """Base method for handling a completed runner."""
894
895         try:
896             if record["state"] == "Complete":
897                 if record.get("exit_code") is not None:
898                     if record["exit_code"] == 33:
899                         processStatus = "UnsupportedRequirement"
900                     elif record["exit_code"] == 0:
901                         processStatus = "success"
902                     else:
903                         processStatus = "permanentFail"
904                 else:
905                     processStatus = "success"
906             else:
907                 processStatus = "permanentFail"
908
909             outputs = {}
910
911             if processStatus == "permanentFail":
912                 logc = arvados.collection.CollectionReader(record["log"],
913                                                            api_client=self.arvrunner.api,
914                                                            keep_client=self.arvrunner.keep_client,
915                                                            num_retries=self.arvrunner.num_retries)
916                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
917                              include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
918
919             self.final_output = record["output"]
920             outc = arvados.collection.CollectionReader(self.final_output,
921                                                        api_client=self.arvrunner.api,
922                                                        keep_client=self.arvrunner.keep_client,
923                                                        num_retries=self.arvrunner.num_retries)
924             if "cwl.output.json" in outc:
925                 with outc.open("cwl.output.json", "rb") as f:
926                     if f.size() > 0:
927                         outputs = json.loads(str(f.read(), 'utf-8'))
928             def keepify(fileobj):
929                 path = fileobj["location"]
930                 if not path.startswith("keep:"):
931                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
932             adjustFileObjs(outputs, keepify)
933             adjustDirObjs(outputs, keepify)
934         except Exception:
935             logger.exception("[%s] While getting final output object", self.name)
936             self.arvrunner.output_callback({}, "permanentFail")
937         else:
938             self.arvrunner.output_callback(outputs, processStatus)
939
940
941 def print_keep_deps_visitor(api, runtimeContext, references, doc_loader, tool):
942     def collect_locators(obj):
943         loc = obj.get("location", "")
944
945         g = arvados.util.keepuri_pattern.match(loc)
946         if g:
947             references.add(g[1])
948
949         if obj.get("class") == "http://arvados.org/cwl#WorkflowRunnerResources" and "acrContainerImage" in obj:
950             references.add(obj["acrContainerImage"])
951
952         if obj.get("class") == "DockerRequirement":
953             references.add(arvados_cwl.arvdocker.arv_docker_get_image(api, obj, False, runtimeContext))
954
955     sc_result = scandeps(tool["id"], tool,
956                          set(),
957                          set(("location", "id")),
958                          None, urljoin=doc_loader.fetcher.urljoin,
959                          nestdirs=False)
960
961     visit_class(sc_result, ("File", "Directory"), collect_locators)
962     visit_class(tool, ("DockerRequirement", "http://arvados.org/cwl#WorkflowRunnerResources"), collect_locators)
963
964
965 def print_keep_deps(arvRunner, runtimeContext, merged_map, tool):
966     references = set()
967
968     tool.visit(partial(print_keep_deps_visitor, arvRunner.api, runtimeContext, references, tool.doc_loader))
969
970     for mm in merged_map:
971         for k, v in merged_map[mm].resolved.items():
972             g = arvados.util.keepuri_pattern.match(v)
973             if g:
974                 references.add(g[1])
975
976     json.dump(sorted(references), arvRunner.stdout)
977     print(file=arvRunner.stdout)
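
# Illustrative sketch (hashes are placeholders): print_keep_deps writes a sorted
# JSON array of the Keep references the workflow depends on, followed by a
# newline, e.g.
#
#   ["0123456789abcdef0123456789abcdef+59", "fedcba9876543210fedcba9876543210+118"]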