1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63
64 import arvados.collection
65 import arvados.util
66 from .util import collectionUUID
67 from ruamel.yaml import YAML
68 from ruamel.yaml.comments import CommentedMap, CommentedSeq
69
70 import arvados_cwl.arvdocker
71 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
72 from ._version import __version__
73 from . import done
74 from .context import ArvRuntimeContext
75 from .perf import Perf
76
77 logger = logging.getLogger('arvados.cwl-runner')
78 metrics = logging.getLogger('arvados.cwl-runner.metrics')
79
80 def trim_anonymous_location(obj):
81     """Remove 'location' field from File and Directory literals.
82
83     To make internal handling easier, literals are assigned a random id for
84     'location'.  However, when writing the record back out, this can break
85     reproducibility.  Since it is valid for literals not to have a 'location'
86     field, remove it.
87
88     """
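    # For example, a hypothetical literal {"class": "File", "location": "_:d8f0...",
    # "basename": "x.txt", "contents": "hi"} simply has its "location" removed here.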
89
90     if obj.get("location", "").startswith("_:"):
91         del obj["location"]
92
93
94 def remove_redundant_fields(obj):
95     for field in ("path", "nameext", "nameroot", "dirname"):
96         if field in obj:
97             del obj[field]
98
99
100 def find_defaults(d, op):
101     if isinstance(d, list):
102         for i in d:
103             find_defaults(i, op)
104     elif isinstance(d, dict):
105         if "default" in d:
106             op(d)
107         else:
108             for i in viewvalues(d):
109                 find_defaults(i, op)
110
111 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
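    # Construct a cwltool Builder with just enough state to evaluate parameter
    # references and expressions (for example, secondaryFiles patterns); most
    # fields are left empty because no command line is actually being built here.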
112     return Builder(
113                  job=joborder,
114                  files=[],               # type: List[Dict[Text, Text]]
115                  bindings=[],            # type: List[Dict[Text, Any]]
116                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
117                  names=None,               # type: Names
118                  requirements=requirements,        # type: List[Dict[Text, Any]]
119                  hints=hints,               # type: List[Dict[Text, Any]]
120                  resources={},           # type: Dict[str, int]
121                  mutation_manager=None,    # type: Optional[MutationManager]
122                  formatgraph=None,         # type: Optional[Graph]
123                  make_fs_access=None,      # type: Type[StdFsAccess]
124                  fs_access=None,           # type: StdFsAccess
125                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
126                  timeout=runtimeContext.eval_timeout,             # type: float
127                  debug=runtimeContext.debug,               # type: bool
128                  js_console=runtimeContext.js_console,          # type: bool
129                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
130                  loadListing="",         # type: Text
131                  outdir="",              # type: Text
132                  tmpdir="",              # type: Text
133                  stagedir="",            # type: Text
134                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
135                  container_engine="docker"
136                 )
137
138 def search_schemadef(name, reqs):
139     for r in reqs:
140         if r["class"] == "SchemaDefRequirement":
141             for sd in r["types"]:
142                 if sd["name"] == name:
143                     return sd
144     return None
145
146 primitive_types_set = frozenset(("null", "boolean", "int", "long",
147                                  "float", "double", "string", "record",
148                                  "array", "enum"))
149
150 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
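    # Walk the input schema and the corresponding input value ("primary") in
    # parallel, recursing through unions, records and arrays, and expand any
    # secondaryFiles patterns into concrete File entries, recording newly
    # discovered files in "discovered".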
151     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
152         # union type, collect all possible secondaryFiles
153         for i in inputschema:
154             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
155         return
156
157     if inputschema == "File":
158         inputschema = {"type": "File"}
159
160     if isinstance(inputschema, basestring):
161         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
162         if sd:
163             inputschema = sd
164         else:
165             return
166
167     if "secondaryFiles" in inputschema:
168         # set secondaryFiles, may be inherited by compound types.
169         secondaryspec = inputschema["secondaryFiles"]
170
171     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
172         not isinstance(inputschema["type"], basestring)):
173         # compound type (union, array, record)
174         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
175
176     elif (inputschema["type"] == "record" and
177           isinstance(primary, Mapping)):
178         #
179         # record type, find secondary files associated with fields.
180         #
181         for f in inputschema["fields"]:
182             p = primary.get(shortname(f["name"]))
183             if p:
184                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
185
186     elif (inputschema["type"] == "array" and
187           isinstance(primary, Sequence)):
188         #
189         # array type, find secondary files of elements
190         #
191         for p in primary:
192             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
193
194     elif (inputschema["type"] == "File" and
195           isinstance(primary, Mapping) and
196           primary.get("class") == "File"):
197
198         if "secondaryFiles" in primary or not secondaryspec:
199             # Nothing to do.
200             return
201
202         #
203         # Found a file, check for secondaryFiles
204         #
205         specs = []
206         primary["secondaryFiles"] = secondaryspec
207         for i, sf in enumerate(aslist(secondaryspec)):
208             if builder.cwlVersion == "v1.0":
209                 pattern = sf
210             else:
211                 pattern = sf["pattern"]
212             if pattern is None:
213                 continue
214             if isinstance(pattern, list):
215                 specs.extend(pattern)
216             elif isinstance(pattern, dict):
217                 specs.append(pattern)
218             elif isinstance(pattern, str):
219                 if builder.cwlVersion == "v1.0":
220                     specs.append({"pattern": pattern, "required": True})
221                 else:
222                     specs.append({"pattern": pattern, "required": sf.get("required")})
223             else:
224                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
225                     "Expression must return list, object, string or null")
226
227         found = []
228         for i, sf in enumerate(specs):
229             if isinstance(sf, dict):
230                 if sf.get("class") == "File":
231                     pattern = None
232                     if sf.get("location") is None:
233                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
234                             "File object is missing 'location': %s" % sf)
235                     sfpath = sf["location"]
236                     required = True
237                 else:
238                     pattern = sf["pattern"]
239                     required = sf.get("required")
240             elif isinstance(sf, str):
241                 pattern = sf
242                 required = True
243             else:
244                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
245                     "Expression must return list, object, string or null")
246
247             if pattern is not None:
248                 if "${" in pattern or "$(" in pattern:
249                     sfname = builder.do_eval(pattern, context=primary)
250                 else:
251                     sfname = substitute(primary["basename"], pattern)
252
253                 if sfname is None:
254                     continue
255
256                 if isinstance(sfname, str):
257                     p_location = primary["location"]
258                     if "/" in p_location:
259                         sfpath = (
260                             p_location[0 : p_location.rindex("/") + 1]
261                             + sfname
262                         )
263
264             required = builder.do_eval(required, context=primary)
265
266             if isinstance(sfname, list) or isinstance(sfname, dict):
267                 each = aslist(sfname)
268                 for e in each:
269                     if required and not fsaccess.exists(e.get("location")):
270                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
271                             "Required secondary file '%s' does not exist" % e.get("location"))
272                 found.extend(each)
273
274             if isinstance(sfname, str):
275                 if fsaccess.exists(sfpath):
276                     if pattern is not None:
277                         found.append({"location": sfpath, "class": "File"})
278                     else:
279                         found.append(sf)
280                 elif required:
281                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
282                         "Required secondary file '%s' does not exist" % sfpath)
283
284         primary["secondaryFiles"] = cmap(found)
285         if discovered is not None:
286             discovered[primary["location"]] = primary["secondaryFiles"]
287     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
288         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
289
290 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
291     for inputschema in inputs:
292         primary = job_order.get(shortname(inputschema["id"]))
293         if isinstance(primary, (Mapping, Sequence)):
294             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
295
296 def upload_dependencies(arvrunner, name, document_loader,
297                         workflowobj, uri, loadref_run, runtimeContext,
298                         include_primary=True, discovered_secondaryfiles=None,
299                         cache=None):
300     """Upload the dependencies of the workflowobj document to Keep.
301
302     Returns a pathmapper object mapping local paths to keep references.  Also
303     does an in-place update of references in "workflowobj".
304
305     Use scandeps to find $import, $include, $schemas, run, File and Directory
306     fields that represent external references.
307
308     If workflowobj has an "id" field, this will reload the document to ensure
309     it is scanning the raw document prior to preprocessing.
310     """
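    # Hypothetical usage sketch (argument values are illustrative only):
    #
    #   mapper = upload_dependencies(arvrunner, "workflow dependencies",
    #                                tool.doc_loader, tool.tool, tool.tool["id"],
    #                                False, runtimeContext)
    #   mapper.mapper("file:///home/me/input.txt").resolved  # -> "keep:<pdh>/input.txt"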
311
312     loaded = set()
313     def loadref(b, u):
314         joined = document_loader.fetcher.urljoin(b, u)
315         defrg, _ = urllib.parse.urldefrag(joined)
316         if defrg not in loaded:
317             loaded.add(defrg)
318             if cache is not None and defrg in cache:
319                 return cache[defrg]
320             # Use fetch_text to get raw file (before preprocessing).
321             text = document_loader.fetch_text(defrg)
322             if isinstance(text, bytes):
323                 textIO = StringIO(text.decode('utf-8'))
324             else:
325                 textIO = StringIO(text)
326             yamlloader = YAML(typ='safe', pure=True)
327             result = yamlloader.load(textIO)
328             if cache is not None:
329                 cache[defrg] = result
330             return result
331         else:
332             return {}
333
334     if loadref_run:
335         loadref_fields = set(("$import", "run"))
336     else:
337         loadref_fields = set(("$import",))
338
339     scanobj = workflowobj
340     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
341         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
342         if cache is not None and defrg not in cache:
343             # If we haven't seen this file before, we want the raw file
344             # content (before preprocessing) to ensure that external
345             # references like $include haven't already been inlined.
346             scanobj = loadref("", workflowobj["id"])
347
348     metadata = scanobj
349
350     with Perf(metrics, "scandeps include, location"):
351         sc_result = scandeps(uri, scanobj,
352                              loadref_fields,
353                              set(("$include", "location")),
354                              loadref, urljoin=document_loader.fetcher.urljoin,
355                              nestdirs=False)
356
357     with Perf(metrics, "scandeps $schemas"):
358         optional_deps = scandeps(uri, scanobj,
359                                       loadref_fields,
360                                       set(("$schemas",)),
361                                       loadref, urljoin=document_loader.fetcher.urljoin,
362                                       nestdirs=False)
363
364     if sc_result is None:
365         sc_result = []
366
367     if optional_deps is None:
368         optional_deps = []
369
370     if optional_deps:
371         sc_result.extend(optional_deps)
372
373     sc = []
374     uuids = {}
375
376     def collect_uuids(obj):
377         loc = obj.get("location", "")
378         sp = loc.split(":")
379         if sp[0] == "keep":
380             # Collect collection uuids that need to be resolved to
381             # portable data hashes
382             gp = collection_uuid_pattern.match(loc)
383             if gp:
384                 uuids[gp.groups()[0]] = obj
385             if collectionUUID in obj:
386                 uuids[obj[collectionUUID]] = obj
387
388     def collect_uploads(obj):
389         loc = obj.get("location", "")
390         sp = loc.split(":")
391         if len(sp) < 1:
392             return
393         if sp[0] in ("file", "http", "https"):
394             # Record local files that need to be uploaded,
395             # don't include file literals, keep references, etc.
396             sc.append(obj)
397         collect_uuids(obj)
398
399     with Perf(metrics, "collect uuids"):
400         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
401
402     with Perf(metrics, "collect uploads"):
403         visit_class(sc_result, ("File", "Directory"), collect_uploads)
404
405     # Resolve any collection uuids we found to portable data hashes
406     # and assign them to uuid_map
407     uuid_map = {}
408     fetch_uuids = list(uuids.keys())
409     with Perf(metrics, "fetch_uuids"):
410         while fetch_uuids:
411             # For a large number of fetch_uuids, the API server may limit
412             # the response size, so keep fetching until the API server has
413             # nothing more to give us.
414             lookups = arvrunner.api.collections().list(
415                 filters=[["uuid", "in", fetch_uuids]],
416                 count="none",
417                 select=["uuid", "portable_data_hash"]).execute(
418                     num_retries=arvrunner.num_retries)
419
420             if not lookups["items"]:
421                 break
422
423             for l in lookups["items"]:
424                 uuid_map[l["uuid"]] = l["portable_data_hash"]
425
426             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
427
428     normalizeFilesDirs(sc)
429
430     if "id" in workflowobj:
431         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
432         if include_primary:
433             # make sure it's included
434             sc.append({"class": "File", "location": defrg})
435         else:
436             # make sure it's excluded
437             sc = [d for d in sc if d.get("location") != defrg]
438
439     def visit_default(obj):
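        # Default File/Directory values are normalized and collected into
        # optional_deps rather than the required upload set, since a default may
        # legitimately be absent locally or overridden by the caller.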
440         def defaults_are_optional(f):
441             if "location" not in f and "path" in f:
442                 f["location"] = f["path"]
443                 del f["path"]
444             normalizeFilesDirs(f)
445             optional_deps.append(f)
446         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
447
448     find_defaults(workflowobj, visit_default)
449
450     discovered = {}
451     def discover_default_secondary_files(obj):
452         builder_job_order = {}
453         for t in obj["inputs"]:
454             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
455         # Need to create a builder object to evaluate expressions.
456         builder = make_builder(builder_job_order,
457                                obj.get("hints", []),
458                                obj.get("requirements", []),
459                                ArvRuntimeContext(),
460                                metadata)
461         discover_secondary_files(arvrunner.fs_access,
462                                  builder,
463                                  obj["inputs"],
464                                  builder_job_order,
465                                  discovered)
466
467     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
468     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
469
470     for d in list(discovered):
471         # Only interested in discovered secondaryFiles which are local
472         # files that need to be uploaded.
473         if d.startswith("file:"):
474             sc.extend(discovered[d])
475         else:
476             del discovered[d]
477
478     with Perf(metrics, "mapper"):
479         mapper = ArvPathMapper(arvrunner, sc, "",
480                                "keep:%s",
481                                "keep:%s/%s",
482                                name=name,
483                                single_collection=True,
484                                optional_deps=optional_deps)
485
486     keeprefs = set()
487     def addkeepref(k):
488         if k.startswith("keep:"):
489             keeprefs.add(collection_pdh_pattern.match(k).group(1))
490
491     def setloc(p):
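        # Rewrite a File/Directory "location" in place: locally mapped paths are
        # replaced with their keep: references, and locations given by collection
        # UUID are rewritten to the equivalent portable data hash.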
492         loc = p.get("location")
493         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
494             p["location"] = mapper.mapper(p["location"]).resolved
495             addkeepref(p["location"])
496             return
497
498         if not loc:
499             return
500
501         if collectionUUID in p:
502             uuid = p[collectionUUID]
503             if uuid not in uuid_map:
504                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
505                     "Collection uuid %s not found" % uuid)
506             gp = collection_pdh_pattern.match(loc)
507             if gp and uuid_map[uuid] != gp.groups()[0]:
508                 # This file entry has both collectionUUID and a PDH
509                 # location. If the PDH doesn't match the one returned by
510                 # the API server, raise an error.
511                 raise SourceLine(p, "location", validate.ValidationException).makeError(
512                     "Expected collection uuid %s to be %s but API server reported %s" % (
513                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
514
515         gp = collection_uuid_pattern.match(loc)
516         if not gp:
517             # Not a uuid pattern (must be a pdh pattern)
518             addkeepref(p["location"])
519             return
520
521         uuid = gp.groups()[0]
522         if uuid not in uuid_map:
523             raise SourceLine(p, "location", validate.ValidationException).makeError(
524                 "Collection uuid %s not found" % uuid)
525         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
526         p[collectionUUID] = uuid
527
528     with Perf(metrics, "setloc"):
529         visit_class(workflowobj, ("File", "Directory"), setloc)
530         visit_class(discovered, ("File", "Directory"), setloc)
531
532     if discovered_secondaryfiles is not None:
533         for d in discovered:
534             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
535
536     if runtimeContext.copy_deps:
537         # Find referenced collections and copy them into the
538         # destination project, for easy sharing.
539         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
540                                      filters=[["portable_data_hash", "in", list(keeprefs)],
541                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
542                                      select=["uuid", "portable_data_hash", "created_at"]))
543
544         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
545         for kr in keeprefs:
546             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
547                                                    order="created_at desc",
548                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
549                                                    limit=1).execute()
550             if len(col["items"]) == 0:
551                 logger.warning("Cannot find collection with portable data hash %s", kr)
552                 continue
553             col = col["items"][0]
554             try:
555                 arvrunner.api.collections().create(body={"collection": {
556                     "owner_uuid": runtimeContext.project_uuid,
557                     "name": col["name"],
558                     "description": col["description"],
559                     "properties": col["properties"],
560                     "portable_data_hash": col["portable_data_hash"],
561                     "manifest_text": col["manifest_text"],
562                     "storage_classes_desired": col["storage_classes_desired"],
563                     "trash_at": col["trash_at"]
564                 }}, ensure_unique_name=True).execute()
565             except Exception as e:
566                 logger.warning("Unable to copy collection to destination: %s", e)
567
568     if "$schemas" in workflowobj:
569         sch = CommentedSeq()
570         for s in workflowobj["$schemas"]:
571             if s in mapper:
572                 sch.append(mapper.mapper(s).resolved)
573         workflowobj["$schemas"] = sch
574
575     return mapper
576
577
578 def upload_docker(arvrunner, tool, runtimeContext):
579     """Uploads Docker images used in CommandLineTool objects."""
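    # Tools without an explicit DockerRequirement fall back to the matching
    # arvados/jobs image; Workflows are handled by recursing into each step's
    # embedded tool.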
580
581     if isinstance(tool, CommandLineTool):
582         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
583         if docker_req:
584             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
585                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
586                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
587
588             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
589                                                        runtimeContext.project_uuid,
590                                                        runtimeContext.force_docker_pull,
591                                                        runtimeContext.tmp_outdir_prefix,
592                                                        runtimeContext.match_local_docker,
593                                                        runtimeContext.copy_deps)
594         else:
595             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
596                                                        True,
597                                                        runtimeContext.project_uuid,
598                                                        runtimeContext.force_docker_pull,
599                                                        runtimeContext.tmp_outdir_prefix,
600                                                        runtimeContext.match_local_docker,
601                                                        runtimeContext.copy_deps)
602     elif isinstance(tool, cwltool.workflow.Workflow):
603         for s in tool.steps:
604             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
605
606
607 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
608     """Create a packed workflow.
609
610     A "packed" workflow is one where all the components have been combined into a single document."""
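    # The document produced by cwltool's pack() typically carries a "$graph"
    # list holding the top-level "#main" process together with every tool it
    # references.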
611
612     rewrites = {}
613     packed = pack(arvrunner.loadingContext, tool.tool["id"],
614                   rewrite_out=rewrites,
615                   loader=tool.doc_loader)
616
617     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
618
619     def visit(v, cur_id):
620         if isinstance(v, dict):
621             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
622                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
623                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
624                 if "id" in v:
625                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
626             if "path" in v and "location" not in v:
627                 v["location"] = v["path"]
628                 del v["path"]
629             if "location" in v and cur_id in merged_map:
630                 if v["location"] in merged_map[cur_id].resolved:
631                     v["location"] = merged_map[cur_id].resolved[v["location"]]
632                 if v["location"] in merged_map[cur_id].secondaryFiles:
633                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
634             if v.get("class") == "DockerRequirement":
635                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
636                                                                                                              runtimeContext.project_uuid,
637                                                                                                              runtimeContext.force_docker_pull,
638                                                                                                              runtimeContext.tmp_outdir_prefix,
639                                                                                                              runtimeContext.match_local_docker,
640                                                                                                              runtimeContext.copy_deps)
641             for l in v:
642                 visit(v[l], cur_id)
643         if isinstance(v, list):
644             for l in v:
645                 visit(l, cur_id)
646     visit(packed, None)
647     return packed
648
649
650 def tag_git_version(packed, tool):
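    # Stamp the packed document with the git commit hash of the directory that
    # contains the workflow, when the workflow was loaded from a local file.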
651     if tool.tool["id"].startswith("file://"):
652         path = os.path.dirname(tool.tool["id"][7:])
653         try:
654             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
655         except (OSError, subprocess.CalledProcessError):
656             pass
657         else:
658             packed["http://schema.org/version"] = githash
659
660
661 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
662     """Upload local files referenced in the input object and return the updated
663     input object with 'location' fields pointing to the proper Keep references.
664     """
665
666     # Make a copy of the job order and set defaults.
667     builder_job_order = copy.copy(job_order)
668
669     # fill_in_defaults throws an error if there are any missing
670     # required parameters; we don't want that here, so make them
671     # all optional.
672     inputs_copy = copy.deepcopy(tool.tool["inputs"])
673     for i in inputs_copy:
674         if "null" not in i["type"]:
675             i["type"] = ["null"] + aslist(i["type"])
676
677     fill_in_defaults(inputs_copy,
678                      builder_job_order,
679                      arvrunner.fs_access)
680     # Need to create a builder object to evaluate expressions.
681     builder = make_builder(builder_job_order,
682                            tool.hints,
683                            tool.requirements,
684                            ArvRuntimeContext(),
685                            tool.metadata)
686     # Now update job_order with secondaryFiles
687     discover_secondary_files(arvrunner.fs_access,
688                              builder,
689                              tool.tool["inputs"],
690                              job_order)
691
692     jobmapper = upload_dependencies(arvrunner,
693                                     name,
694                                     tool.doc_loader,
695                                     job_order,
696                                     job_order.get("id", "#"),
697                                     False,
698                                     runtimeContext)
699
700     if "id" in job_order:
701         del job_order["id"]
702
703     # Need to filter this out; it gets added by cwltool when parameters
704     # are provided on the command line.
705     if "job_order" in job_order:
706         del job_order["job_order"]
707
708     return job_order
709
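# For each tool document id, "resolved" maps original file locations to their
# keep: references, and "secondaryFiles" maps a primary file's location to the
# secondaryFiles discovered for it.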
710 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
711
712 def upload_workflow_deps(arvrunner, tool, runtimeContext):
713     # Ensure that Docker images needed by this workflow are available
714
715     with Perf(metrics, "upload_docker"):
716         upload_docker(arvrunner, tool, runtimeContext)
717
718     document_loader = tool.doc_loader
719
720     merged_map = {}
721     tool_dep_cache = {}
722     def upload_tool_deps(deptool):
723         if "id" in deptool:
724             discovered_secondaryfiles = {}
725             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
726                 pm = upload_dependencies(arvrunner,
727                                          "%s dependencies" % (shortname(deptool["id"])),
728                                          document_loader,
729                                          deptool,
730                                          deptool["id"],
731                                          False,
732                                          runtimeContext,
733                                          include_primary=False,
734                                          discovered_secondaryfiles=discovered_secondaryfiles,
735                                          cache=tool_dep_cache)
736             document_loader.idx[deptool["id"]] = deptool
737             toolmap = {}
738             for k,v in pm.items():
739                 toolmap[k] = v.resolved
740             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
741
742     tool.visit(upload_tool_deps)
743
744     return merged_map
745
746 def arvados_jobs_image(arvrunner, img, runtimeContext):
747     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
748
749     try:
750         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
751                                                           True,
752                                                           runtimeContext.project_uuid,
753                                                           runtimeContext.force_docker_pull,
754                                                           runtimeContext.tmp_outdir_prefix,
755                                                           runtimeContext.match_local_docker,
756                                                           runtimeContext.copy_deps)
757     except Exception as e:
758         raise Exception("Docker image %s is not available\n%s" % (img, e) )
759
760
761 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
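    # Save the packed workflow as "workflow.cwl" in a Keep collection, reusing an
    # existing collection with the same portable data hash and name prefix if one
    # is already present, and return the collection's portable data hash.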
762     collection = arvados.collection.Collection(api_client=arvrunner.api,
763                                                keep_client=arvrunner.keep_client,
764                                                num_retries=arvrunner.num_retries)
765     with collection.open("workflow.cwl", "w") as f:
766         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
767
768     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
769                ["name", "like", name+"%"]]
770     if runtimeContext.project_uuid:
771         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
772     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
773
774     if exists["items"]:
775         logger.info("Using collection %s", exists["items"][0]["uuid"])
776     else:
777         collection.save_new(name=name,
778                             owner_uuid=runtimeContext.project_uuid,
779                             ensure_unique_name=True,
780                             num_retries=arvrunner.num_retries)
781         logger.info("Uploaded to %s", collection.manifest_locator())
782
783     return collection.portable_data_hash()
784
785
786 class Runner(Process):
787     """Base class for runner processes, which submit an instance of
788     arvados-cwl-runner and wait for the final result."""
789
790     def __init__(self, runner, updated_tool,
791                  tool, loadingContext, enable_reuse,
792                  output_name, output_tags, submit_runner_ram=0,
793                  name=None, on_error=None, submit_runner_image=None,
794                  intermediate_output_ttl=0, merged_map=None,
795                  priority=None, secret_store=None,
796                  collection_cache_size=256,
797                  collection_cache_is_default=True):
798
799         loadingContext = loadingContext.copy()
800         loadingContext.metadata = updated_tool.metadata.copy()
801
802         super(Runner, self).__init__(updated_tool.tool, loadingContext)
803
804         self.arvrunner = runner
805         self.embedded_tool = tool
806         self.job_order = None
807         self.running = False
808         if enable_reuse:
809             # If reuse is permitted by command line arguments but
810             # disabled by the workflow itself, disable it.
811             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
812             if reuse_req:
813                 enable_reuse = reuse_req["enableReuse"]
814         self.enable_reuse = enable_reuse
815         self.uuid = None
816         self.final_output = None
817         self.output_name = output_name
818         self.output_tags = output_tags
819         self.name = name
820         self.on_error = on_error
821         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
822         self.intermediate_output_ttl = intermediate_output_ttl
823         self.priority = priority
824         self.secret_store = secret_store
825         self.enable_dev = loadingContext.enable_dev
826
827         self.submit_runner_cores = 1
828         self.submit_runner_ram = 1024  # default 1 GiB
829         self.collection_cache_size = collection_cache_size
830
831         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
832         if runner_resource_req:
833             if runner_resource_req.get("coresMin"):
834                 self.submit_runner_cores = runner_resource_req["coresMin"]
835             if runner_resource_req.get("ramMin"):
836                 self.submit_runner_ram = runner_resource_req["ramMin"]
837             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
838                 self.collection_cache_size = runner_resource_req["keep_cache"]
839
840         if submit_runner_ram:
841             # Command line / initializer overrides default and/or spec from workflow
842             self.submit_runner_ram = submit_runner_ram
843
844         if self.submit_runner_ram <= 0:
845             raise Exception("Value of submit-runner-ram must be greater than zero")
846
847         if self.submit_runner_cores <= 0:
848             raise Exception("Value of submit-runner-cores must be greater than zero")
849
850         self.merged_map = merged_map or {}
851
852     def job(self,
853             job_order,         # type: Mapping[Text, Text]
854             output_callbacks,  # type: Callable[[Any, Any], Any]
855             runtimeContext     # type: RuntimeContext
856            ):  # type: (...) -> Generator[Any, None, None]
857         self.job_order = job_order
858         self._init_job(job_order, runtimeContext)
859         yield self
860
861     def update_pipeline_component(self, record):
862         pass
863
864     def done(self, record):
865         """Base method for handling a completed runner."""
866
867         try:
868             if record["state"] == "Complete":
869                 if record.get("exit_code") is not None:
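                    # Exit code 33 from the runner container is treated as an
                    # UnsupportedRequirement failure.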
870                     if record["exit_code"] == 33:
871                         processStatus = "UnsupportedRequirement"
872                     elif record["exit_code"] == 0:
873                         processStatus = "success"
874                     else:
875                         processStatus = "permanentFail"
876                 else:
877                     processStatus = "success"
878             else:
879                 processStatus = "permanentFail"
880
881             outputs = {}
882
883             if processStatus == "permanentFail":
884                 logc = arvados.collection.CollectionReader(record["log"],
885                                                            api_client=self.arvrunner.api,
886                                                            keep_client=self.arvrunner.keep_client,
887                                                            num_retries=self.arvrunner.num_retries)
888                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
889
890             self.final_output = record["output"]
891             outc = arvados.collection.CollectionReader(self.final_output,
892                                                        api_client=self.arvrunner.api,
893                                                        keep_client=self.arvrunner.keep_client,
894                                                        num_retries=self.arvrunner.num_retries)
895             if "cwl.output.json" in outc:
896                 with outc.open("cwl.output.json", "rb") as f:
897                     if f.size() > 0:
898                         outputs = json.loads(f.read().decode())
899             def keepify(fileobj):
900                 path = fileobj["location"]
901                 if not path.startswith("keep:"):
902                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
903             adjustFileObjs(outputs, keepify)
904             adjustDirObjs(outputs, keepify)
905         except Exception:
906             logger.exception("[%s] While getting final output object", self.name)
907             self.arvrunner.output_callback({}, "permanentFail")
908         else:
909             self.arvrunner.output_callback(outputs, processStatus)