19678: Fix for parameters called 'name'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
73 from ._version import __version__
74 from . import done
75 from . context import ArvRuntimeContext
76 from .perf import Perf
77
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
81 def trim_anonymous_location(obj):
82     """Remove 'location' field from File and Directory literals.
83
84     To make internal handling easier, literals are assigned a random id for
85     'location'.  However, when writing the record back out, this can break
86     reproducibility.  Since it is valid for literals not have a 'location'
87     field, remove it.
88
89     """
90
91     if obj.get("location", "").startswith("_:"):
92         del obj["location"]
93
94
95 def remove_redundant_fields(obj):
96     for field in ("path", "nameext", "nameroot", "dirname"):
97         if field in obj:
98             del obj[field]
99
100
101 def find_defaults(d, op):
102     if isinstance(d, list):
103         for i in d:
104             find_defaults(i, op)
105     elif isinstance(d, dict):
106         if "default" in d:
107             op(d)
108         else:
109             for i in viewvalues(d):
110                 find_defaults(i, op)
111
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
113     return Builder(
114                  job=joborder,
115                  files=[],               # type: List[Dict[Text, Text]]
116                  bindings=[],            # type: List[Dict[Text, Any]]
117                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
118                  names=None,               # type: Names
119                  requirements=requirements,        # type: List[Dict[Text, Any]]
120                  hints=hints,               # type: List[Dict[Text, Any]]
121                  resources={},           # type: Dict[str, int]
122                  mutation_manager=None,    # type: Optional[MutationManager]
123                  formatgraph=None,         # type: Optional[Graph]
124                  make_fs_access=None,      # type: Type[StdFsAccess]
125                  fs_access=None,           # type: StdFsAccess
126                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127                  timeout=runtimeContext.eval_timeout,             # type: float
128                  debug=runtimeContext.debug,               # type: bool
129                  js_console=runtimeContext.js_console,          # type: bool
130                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
131                  loadListing="",         # type: Text
132                  outdir="",              # type: Text
133                  tmpdir="",              # type: Text
134                  stagedir="",            # type: Text
135                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136                  container_engine="docker"
137                 )
138
139 def search_schemadef(name, reqs):
140     for r in reqs:
141         if r["class"] == "SchemaDefRequirement":
142             for sd in r["types"]:
143                 if sd["name"] == name:
144                     return sd
145     return None
146
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148                                  "float", "double", "string", "record",
149                                  "array", "enum"))
150
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
152     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153         # union type, collect all possible secondaryFiles
154         for i in inputschema:
155             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
156         return
157
158     if inputschema == "File":
159         inputschema = {"type": "File"}
160
161     if isinstance(inputschema, basestring):
162         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
163         if sd:
164             inputschema = sd
165         else:
166             return
167
168     if "secondaryFiles" in inputschema:
169         # set secondaryFiles, may be inherited by compound types.
170         secondaryspec = inputschema["secondaryFiles"]
171
172     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173         not isinstance(inputschema["type"], basestring)):
174         # compound type (union, array, record)
175         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176
177     elif (inputschema["type"] == "record" and
178           isinstance(primary, Mapping)):
179         #
180         # record type, find secondary files associated with fields.
181         #
182         for f in inputschema["fields"]:
183             p = primary.get(shortname(f["name"]))
184             if p:
185                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186
187     elif (inputschema["type"] == "array" and
188           isinstance(primary, Sequence)):
189         #
190         # array type, find secondary files of elements
191         #
192         for p in primary:
193             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194
195     elif (inputschema["type"] == "File" and
196           isinstance(primary, Mapping) and
197           primary.get("class") == "File"):
198
199         if "secondaryFiles" in primary or not secondaryspec:
200             # Nothing to do.
201             return
202
203         #
204         # Found a file, check for secondaryFiles
205         #
206         specs = []
207         primary["secondaryFiles"] = secondaryspec
208         for i, sf in enumerate(aslist(secondaryspec)):
209             if builder.cwlVersion == "v1.0":
210                 pattern = sf
211             else:
212                 pattern = sf["pattern"]
213             if pattern is None:
214                 continue
215             if isinstance(pattern, list):
216                 specs.extend(pattern)
217             elif isinstance(pattern, dict):
218                 specs.append(pattern)
219             elif isinstance(pattern, str):
220                 if builder.cwlVersion == "v1.0":
221                     specs.append({"pattern": pattern, "required": True})
222                 else:
223                     specs.append({"pattern": pattern, "required": sf.get("required")})
224             else:
225                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226                     "Expression must return list, object, string or null")
227
228         found = []
229         for i, sf in enumerate(specs):
230             if isinstance(sf, dict):
231                 if sf.get("class") == "File":
232                     pattern = None
233                     if sf.get("location") is None:
234                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                             "File object is missing 'location': %s" % sf)
236                     sfpath = sf["location"]
237                     required = True
238                 else:
239                     pattern = sf["pattern"]
240                     required = sf.get("required")
241             elif isinstance(sf, str):
242                 pattern = sf
243                 required = True
244             else:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Expression must return list, object, string or null")
247
248             if pattern is not None:
249                 if "${" in pattern or "$(" in pattern:
250                     sfname = builder.do_eval(pattern, context=primary)
251                 else:
252                     sfname = substitute(primary["basename"], pattern)
253
254                 if sfname is None:
255                     continue
256
257                 if isinstance(sfname, str):
258                     p_location = primary["location"]
259                     if "/" in p_location:
260                         sfpath = (
261                             p_location[0 : p_location.rindex("/") + 1]
262                             + sfname
263                         )
264
265             required = builder.do_eval(required, context=primary)
266
267             if isinstance(sfname, list) or isinstance(sfname, dict):
268                 each = aslist(sfname)
269                 for e in each:
270                     if required and not fsaccess.exists(e.get("location")):
271                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                             "Required secondary file '%s' does not exist" % e.get("location"))
273                 found.extend(each)
274
275             if isinstance(sfname, str):
276                 if fsaccess.exists(sfpath):
277                     if pattern is not None:
278                         found.append({"location": sfpath, "class": "File"})
279                     else:
280                         found.append(sf)
281                 elif required:
282                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283                         "Required secondary file '%s' does not exist" % sfpath)
284
285         primary["secondaryFiles"] = cmap(found)
286         if discovered is not None:
287             discovered[primary["location"]] = primary["secondaryFiles"]
288     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
292     for inputschema in inputs:
293         primary = job_order.get(shortname(inputschema["id"]))
294         if isinstance(primary, (Mapping, Sequence)):
295             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
296
297 def upload_dependencies(arvrunner, name, document_loader,
298                         workflowobj, uri, loadref_run, runtimeContext,
299                         include_primary=True, discovered_secondaryfiles=None,
300                         cache=None):
301     """Upload the dependencies of the workflowobj document to Keep.
302
303     Returns a pathmapper object mapping local paths to keep references.  Also
304     does an in-place update of references in "workflowobj".
305
306     Use scandeps to find $import, $include, $schemas, run, File and Directory
307     fields that represent external references.
308
309     If workflowobj has an "id" field, this will reload the document to ensure
310     it is scanning the raw document prior to preprocessing.
311     """
312
313     loaded = set()
314     def loadref(b, u):
315         joined = document_loader.fetcher.urljoin(b, u)
316         defrg, _ = urllib.parse.urldefrag(joined)
317         if defrg not in loaded:
318             loaded.add(defrg)
319             if cache is not None and defrg in cache:
320                 return cache[defrg]
321             # Use fetch_text to get raw file (before preprocessing).
322             text = document_loader.fetch_text(defrg)
323             if isinstance(text, bytes):
324                 textIO = StringIO(text.decode('utf-8'))
325             else:
326                 textIO = StringIO(text)
327             yamlloader = YAML(typ='safe', pure=True)
328             result = yamlloader.load(textIO)
329             if cache is not None:
330                 cache[defrg] = result
331             return result
332         else:
333             return {}
334
335     if loadref_run:
336         loadref_fields = set(("$import", "run"))
337     else:
338         loadref_fields = set(("$import",))
339
340     scanobj = workflowobj
341     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
342         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
343         if cache is not None and defrg not in cache:
344             # if we haven't seen this file before, want raw file
345             # content (before preprocessing) to ensure that external
346             # references like $include haven't already been inlined.
347             scanobj = loadref("", workflowobj["id"])
348
349     metadata = scanobj
350
351     with Perf(metrics, "scandeps include, location"):
352         sc_result = scandeps(uri, scanobj,
353                              loadref_fields,
354                              set(("$include", "location")),
355                              loadref, urljoin=document_loader.fetcher.urljoin,
356                              nestdirs=False)
357
358     with Perf(metrics, "scandeps $schemas"):
359         optional_deps = scandeps(uri, scanobj,
360                                       loadref_fields,
361                                       set(("$schemas",)),
362                                       loadref, urljoin=document_loader.fetcher.urljoin,
363                                       nestdirs=False)
364
365     if sc_result is None:
366         sc_result = []
367
368     if optional_deps is None:
369         optional_deps = []
370
371     if optional_deps:
372         sc_result.extend(optional_deps)
373
374     sc = []
375     uuids = {}
376
377     def collect_uuids(obj):
378         loc = obj.get("location", "")
379         sp = loc.split(":")
380         if sp[0] == "keep":
381             # Collect collection uuids that need to be resolved to
382             # portable data hashes
383             gp = collection_uuid_pattern.match(loc)
384             if gp:
385                 uuids[gp.groups()[0]] = obj
386             if collectionUUID in obj:
387                 uuids[obj[collectionUUID]] = obj
388
389     def collect_uploads(obj):
390         loc = obj.get("location", "")
391         sp = loc.split(":")
392         if len(sp) < 1:
393             return
394         if sp[0] in ("file", "http", "https"):
395             # Record local files than need to be uploaded,
396             # don't include file literals, keep references, etc.
397             sc.append(obj)
398         collect_uuids(obj)
399
400     with Perf(metrics, "collect uuids"):
401         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
402
403     with Perf(metrics, "collect uploads"):
404         visit_class(sc_result, ("File", "Directory"), collect_uploads)
405
406     # Resolve any collection uuids we found to portable data hashes
407     # and assign them to uuid_map
408     uuid_map = {}
409     fetch_uuids = list(uuids.keys())
410     with Perf(metrics, "fetch_uuids"):
411         while fetch_uuids:
412             # For a large number of fetch_uuids, API server may limit
413             # response size, so keep fetching from API server has nothing
414             # more to give us.
415             lookups = arvrunner.api.collections().list(
416                 filters=[["uuid", "in", fetch_uuids]],
417                 count="none",
418                 select=["uuid", "portable_data_hash"]).execute(
419                     num_retries=arvrunner.num_retries)
420
421             if not lookups["items"]:
422                 break
423
424             for l in lookups["items"]:
425                 uuid_map[l["uuid"]] = l["portable_data_hash"]
426
427             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
428
429     normalizeFilesDirs(sc)
430
431     if "id" in workflowobj:
432         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
433         if include_primary:
434             # make sure it's included
435             sc.append({"class": "File", "location": defrg})
436         else:
437             # make sure it's excluded
438             sc = [d for d in sc if d.get("location") != defrg]
439
440     def visit_default(obj):
441         def defaults_are_optional(f):
442             if "location" not in f and "path" in f:
443                 f["location"] = f["path"]
444                 del f["path"]
445             normalizeFilesDirs(f)
446             optional_deps.append(f)
447         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
448
449     find_defaults(workflowobj, visit_default)
450
451     discovered = {}
452     def discover_default_secondary_files(obj):
453         builder_job_order = {}
454         for t in obj["inputs"]:
455             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
456         # Need to create a builder object to evaluate expressions.
457         builder = make_builder(builder_job_order,
458                                obj.get("hints", []),
459                                obj.get("requirements", []),
460                                ArvRuntimeContext(),
461                                metadata)
462         discover_secondary_files(arvrunner.fs_access,
463                                  builder,
464                                  obj["inputs"],
465                                  builder_job_order,
466                                  discovered)
467
468     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
469     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
470
471     for d in list(discovered):
472         # Only interested in discovered secondaryFiles which are local
473         # files that need to be uploaded.
474         if d.startswith("file:"):
475             sc.extend(discovered[d])
476         else:
477             del discovered[d]
478
479     with Perf(metrics, "mapper"):
480         mapper = ArvPathMapper(arvrunner, sc, "",
481                                "keep:%s",
482                                "keep:%s/%s",
483                                name=name,
484                                single_collection=True,
485                                optional_deps=optional_deps)
486
487     keeprefs = set()
488     def addkeepref(k):
489         if k.startswith("keep:"):
490             keeprefs.add(collection_pdh_pattern.match(k).group(1))
491
492     def setloc(p):
493         loc = p.get("location")
494         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
495             p["location"] = mapper.mapper(p["location"]).resolved
496             addkeepref(p["location"])
497             return
498
499         if not loc:
500             return
501
502         if collectionUUID in p:
503             uuid = p[collectionUUID]
504             if uuid not in uuid_map:
505                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
506                     "Collection uuid %s not found" % uuid)
507             gp = collection_pdh_pattern.match(loc)
508             if gp and uuid_map[uuid] != gp.groups()[0]:
509                 # This file entry has both collectionUUID and a PDH
510                 # location. If the PDH doesn't match the one returned
511                 # the API server, raise an error.
512                 raise SourceLine(p, "location", validate.ValidationException).makeError(
513                     "Expected collection uuid %s to be %s but API server reported %s" % (
514                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
515
516         gp = collection_uuid_pattern.match(loc)
517         if not gp:
518             # Not a uuid pattern (must be a pdh pattern)
519             addkeepref(p["location"])
520             return
521
522         uuid = gp.groups()[0]
523         if uuid not in uuid_map:
524             raise SourceLine(p, "location", validate.ValidationException).makeError(
525                 "Collection uuid %s not found" % uuid)
526         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
527         p[collectionUUID] = uuid
528
529     with Perf(metrics, "setloc"):
530         visit_class(workflowobj, ("File", "Directory"), setloc)
531         visit_class(discovered, ("File", "Directory"), setloc)
532
533     if discovered_secondaryfiles is not None:
534         for d in discovered:
535             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
536
537     if runtimeContext.copy_deps:
538         # Find referenced collections and copy them into the
539         # destination project, for easy sharing.
540         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
541                                      filters=[["portable_data_hash", "in", list(keeprefs)],
542                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
543                                      select=["uuid", "portable_data_hash", "created_at"]))
544
545         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
546         for kr in keeprefs:
547             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
548                                                   order="created_at desc",
549                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
550                                                    limit=1).execute()
551             if len(col["items"]) == 0:
552                 logger.warning("Cannot find collection with portable data hash %s", kr)
553                 continue
554             col = col["items"][0]
555             try:
556                 arvrunner.api.collections().create(body={"collection": {
557                     "owner_uuid": runtimeContext.project_uuid,
558                     "name": col["name"],
559                     "description": col["description"],
560                     "properties": col["properties"],
561                     "portable_data_hash": col["portable_data_hash"],
562                     "manifest_text": col["manifest_text"],
563                     "storage_classes_desired": col["storage_classes_desired"],
564                     "trash_at": col["trash_at"]
565                 }}, ensure_unique_name=True).execute()
566             except Exception as e:
567                 logger.warning("Unable copy collection to destination: %s", e)
568
569     if "$schemas" in workflowobj:
570         sch = CommentedSeq()
571         for s in workflowobj["$schemas"]:
572             if s in mapper:
573                 sch.append(mapper.mapper(s).resolved)
574         workflowobj["$schemas"] = sch
575
576     return mapper
577
578
579 def upload_docker(arvrunner, tool, runtimeContext):
580     """Uploads Docker images used in CommandLineTool objects."""
581
582     if isinstance(tool, CommandLineTool):
583         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
584         if docker_req:
585             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
586                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
587                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
588
589             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
590                                                        runtimeContext.project_uuid,
591                                                        runtimeContext.force_docker_pull,
592                                                        runtimeContext.tmp_outdir_prefix,
593                                                        runtimeContext.match_local_docker,
594                                                        runtimeContext.copy_deps)
595         else:
596             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
597                                                        True,
598                                                        runtimeContext.project_uuid,
599                                                        runtimeContext.force_docker_pull,
600                                                        runtimeContext.tmp_outdir_prefix,
601                                                        runtimeContext.match_local_docker,
602                                                        runtimeContext.copy_deps)
603     elif isinstance(tool, cwltool.workflow.Workflow):
604         for s in tool.steps:
605             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
606
607
608 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
609     """Create a packed workflow.
610
611     A "packed" workflow is one where all the components have been combined into a single document."""
612
613     rewrites = {}
614     packed = pack(arvrunner.loadingContext, tool.tool["id"],
615                   rewrite_out=rewrites,
616                   loader=tool.doc_loader)
617
618     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
619
620     def visit(v, cur_id):
621         if isinstance(v, dict):
622             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
623                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
624                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
625                 if "id" in v:
626                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
627             if "path" in v and "location" not in v:
628                 v["location"] = v["path"]
629                 del v["path"]
630             if "location" in v and cur_id in merged_map:
631                 if v["location"] in merged_map[cur_id].resolved:
632                     v["location"] = merged_map[cur_id].resolved[v["location"]]
633                 if v["location"] in merged_map[cur_id].secondaryFiles:
634                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
635             if v.get("class") == "DockerRequirement":
636                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
637                                                                                                              runtimeContext.project_uuid,
638                                                                                                              runtimeContext.force_docker_pull,
639                                                                                                              runtimeContext.tmp_outdir_prefix,
640                                                                                                              runtimeContext.match_local_docker,
641                                                                                                              runtimeContext.copy_deps)
642             for l in v:
643                 visit(v[l], cur_id)
644         if isinstance(v, list):
645             for l in v:
646                 visit(l, cur_id)
647     visit(packed, None)
648
649     if git_info:
650         for g in git_info:
651             packed[g] = git_info[g]
652
653     return packed
654
655
656 def tag_git_version(packed):
657     if tool.tool["id"].startswith("file://"):
658         path = os.path.dirname(tool.tool["id"][7:])
659         try:
660             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
661         except (OSError, subprocess.CalledProcessError):
662             pass
663         else:
664             packed["http://schema.org/version"] = githash
665
666
667 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
668     """Upload local files referenced in the input object and return updated input
669     object with 'location' updated to the proper keep references.
670     """
671
672     # Make a copy of the job order and set defaults.
673     builder_job_order = copy.copy(job_order)
674
675     # fill_in_defaults throws an error if there are any
676     # missing required parameters, we don't want it to do that
677     # so make them all optional.
678     inputs_copy = copy.deepcopy(tool.tool["inputs"])
679     for i in inputs_copy:
680         if "null" not in i["type"]:
681             i["type"] = ["null"] + aslist(i["type"])
682
683     fill_in_defaults(inputs_copy,
684                      builder_job_order,
685                      arvrunner.fs_access)
686     # Need to create a builder object to evaluate expressions.
687     builder = make_builder(builder_job_order,
688                            tool.hints,
689                            tool.requirements,
690                            ArvRuntimeContext(),
691                            tool.metadata)
692     # Now update job_order with secondaryFiles
693     discover_secondary_files(arvrunner.fs_access,
694                              builder,
695                              tool.tool["inputs"],
696                              job_order)
697
698     _jobloaderctx = jobloaderctx.copy()
699     jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
700
701     jobmapper = upload_dependencies(arvrunner,
702                                     name,
703                                     jobloader,
704                                     job_order,
705                                     job_order.get("id", "#"),
706                                     False,
707                                     runtimeContext)
708
709     if "id" in job_order:
710         del job_order["id"]
711
712     # Need to filter this out, gets added by cwltool when providing
713     # parameters on the command line.
714     if "job_order" in job_order:
715         del job_order["job_order"]
716
717     return job_order
718
719 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
720
721 def upload_workflow_deps(arvrunner, tool, runtimeContext):
722     # Ensure that Docker images needed by this workflow are available
723
724     with Perf(metrics, "upload_docker"):
725         upload_docker(arvrunner, tool, runtimeContext)
726
727     document_loader = tool.doc_loader
728
729     merged_map = {}
730     tool_dep_cache = {}
731
732     todo = []
733
734     # Standard traversal is top down, we want to go bottom up, so use
735     # the visitor to accumalate a list of nodes to visit, then
736     # visit them in reverse order.
737     def upload_tool_deps(deptool):
738         if "id" in deptool:
739             todo.append(deptool)
740
741     tool.visit(upload_tool_deps)
742
743     for deptool in reversed(todo):
744         discovered_secondaryfiles = {}
745         with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
746             pm = upload_dependencies(arvrunner,
747                                      "%s dependencies" % (shortname(deptool["id"])),
748                                      document_loader,
749                                      deptool,
750                                      deptool["id"],
751                                      False,
752                                      runtimeContext,
753                                      include_primary=False,
754                                      discovered_secondaryfiles=discovered_secondaryfiles,
755                                      cache=tool_dep_cache)
756         document_loader.idx[deptool["id"]] = deptool
757         toolmap = {}
758         for k,v in pm.items():
759             toolmap[k] = v.resolved
760         merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
761
762     return merged_map
763
764 def arvados_jobs_image(arvrunner, img, runtimeContext):
765     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
766
767     try:
768         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
769                                                           True,
770                                                           runtimeContext.project_uuid,
771                                                           runtimeContext.force_docker_pull,
772                                                           runtimeContext.tmp_outdir_prefix,
773                                                           runtimeContext.match_local_docker,
774                                                           runtimeContext.copy_deps)
775     except Exception as e:
776         raise Exception("Docker image %s is not available\n%s" % (img, e) )
777
778
779 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
780     collection = arvados.collection.Collection(api_client=arvrunner.api,
781                                                keep_client=arvrunner.keep_client,
782                                                num_retries=arvrunner.num_retries)
783     with collection.open("workflow.cwl", "w") as f:
784         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
785
786     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
787                ["name", "like", name+"%"]]
788     if runtimeContext.project_uuid:
789         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
790     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
791
792     if exists["items"]:
793         logger.info("Using collection %s", exists["items"][0]["uuid"])
794     else:
795         collection.save_new(name=name,
796                             owner_uuid=runtimeContext.project_uuid,
797                             ensure_unique_name=True,
798                             num_retries=arvrunner.num_retries)
799         logger.info("Uploaded to %s", collection.manifest_locator())
800
801     return collection.portable_data_hash()
802
803
804 class Runner(Process):
805     """Base class for runner processes, which submit an instance of
806     arvados-cwl-runner and wait for the final result."""
807
808     def __init__(self, runner, updated_tool,
809                  tool, loadingContext, enable_reuse,
810                  output_name, output_tags, submit_runner_ram=0,
811                  name=None, on_error=None, submit_runner_image=None,
812                  intermediate_output_ttl=0, merged_map=None,
813                  priority=None, secret_store=None,
814                  collection_cache_size=256,
815                  collection_cache_is_default=True,
816                  git_info=None):
817
818         loadingContext = loadingContext.copy()
819         loadingContext.metadata = updated_tool.metadata.copy()
820
821         super(Runner, self).__init__(updated_tool.tool, loadingContext)
822
823         self.arvrunner = runner
824         self.embedded_tool = tool
825         self.job_order = None
826         self.running = False
827         if enable_reuse:
828             # If reuse is permitted by command line arguments but
829             # disabled by the workflow itself, disable it.
830             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
831             if reuse_req:
832                 enable_reuse = reuse_req["enableReuse"]
833         self.enable_reuse = enable_reuse
834         self.uuid = None
835         self.final_output = None
836         self.output_name = output_name
837         self.output_tags = output_tags
838         self.name = name
839         self.on_error = on_error
840         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
841         self.intermediate_output_ttl = intermediate_output_ttl
842         self.priority = priority
843         self.secret_store = secret_store
844         self.enable_dev = loadingContext.enable_dev
845         self.git_info = git_info
846
847         self.submit_runner_cores = 1
848         self.submit_runner_ram = 1024  # defaut 1 GiB
849         self.collection_cache_size = collection_cache_size
850
851         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
852         if runner_resource_req:
853             if runner_resource_req.get("coresMin"):
854                 self.submit_runner_cores = runner_resource_req["coresMin"]
855             if runner_resource_req.get("ramMin"):
856                 self.submit_runner_ram = runner_resource_req["ramMin"]
857             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
858                 self.collection_cache_size = runner_resource_req["keep_cache"]
859
860         if submit_runner_ram:
861             # Command line / initializer overrides default and/or spec from workflow
862             self.submit_runner_ram = submit_runner_ram
863
864         if self.submit_runner_ram <= 0:
865             raise Exception("Value of submit-runner-ram must be greater than zero")
866
867         if self.submit_runner_cores <= 0:
868             raise Exception("Value of submit-runner-cores must be greater than zero")
869
870         self.merged_map = merged_map or {}
871
872     def job(self,
873             job_order,         # type: Mapping[Text, Text]
874             output_callbacks,  # type: Callable[[Any, Any], Any]
875             runtimeContext     # type: RuntimeContext
876            ):  # type: (...) -> Generator[Any, None, None]
877         self.job_order = job_order
878         self._init_job(job_order, runtimeContext)
879         yield self
880
881     def update_pipeline_component(self, record):
882         pass
883
884     def done(self, record):
885         """Base method for handling a completed runner."""
886
887         try:
888             if record["state"] == "Complete":
889                 if record.get("exit_code") is not None:
890                     if record["exit_code"] == 33:
891                         processStatus = "UnsupportedRequirement"
892                     elif record["exit_code"] == 0:
893                         processStatus = "success"
894                     else:
895                         processStatus = "permanentFail"
896                 else:
897                     processStatus = "success"
898             else:
899                 processStatus = "permanentFail"
900
901             outputs = {}
902
903             if processStatus == "permanentFail":
904                 logc = arvados.collection.CollectionReader(record["log"],
905                                                            api_client=self.arvrunner.api,
906                                                            keep_client=self.arvrunner.keep_client,
907                                                            num_retries=self.arvrunner.num_retries)
908                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
909
910             self.final_output = record["output"]
911             outc = arvados.collection.CollectionReader(self.final_output,
912                                                        api_client=self.arvrunner.api,
913                                                        keep_client=self.arvrunner.keep_client,
914                                                        num_retries=self.arvrunner.num_retries)
915             if "cwl.output.json" in outc:
916                 with outc.open("cwl.output.json", "rb") as f:
917                     if f.size() > 0:
918                         outputs = json.loads(f.read().decode())
919             def keepify(fileobj):
920                 path = fileobj["location"]
921                 if not path.startswith("keep:"):
922                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
923             adjustFileObjs(outputs, keepify)
924             adjustDirObjs(outputs, keepify)
925         except Exception:
926             logger.exception("[%s] While getting final output object", self.name)
927             self.arvrunner.output_callback({}, "permanentFail")
928         else:
929             self.arvrunner.output_callback(outputs, processStatus)