# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
from past.builtins import basestring

import os
import sys
import re
import urllib.parse
from functools import partial
import logging
import json
import copy
from collections import namedtuple
from io import StringIO
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    MutableMapping,
    Sequence,
    MutableSequence,
    Optional,
    Set,
    Sized,
    Tuple,
    Type,
    Union,
    cast,
)
from cwltool.utils import (
    CWLObjectType,
    CWLOutputAtomType,
    CWLOutputType,
)

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                             shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
from cwltool.builder import Builder
import schema_salad.validate as validate

import arvados.collection
import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from .context import ArvRuntimeContext
from .perf import Perf

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

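# Illustrative sketch, not part of the original source: a File literal such as
#   {"class": "File", "basename": "a.txt", "contents": "hi", "location": "_:0b9d5e..."}
# has its synthetic "_:..." location dropped by trim_anonymous_location(), while
# real references such as "keep:..." or "file:..." locations are left untouched.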

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    return Builder(
                 job=joborder,
                 files=[],               # type: List[Dict[Text, Text]]
                 bindings=[],            # type: List[Dict[Text, Any]]
                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
                 names=None,               # type: Names
                 requirements=requirements,        # type: List[Dict[Text, Any]]
                 hints=hints,               # type: List[Dict[Text, Any]]
                 resources={},           # type: Dict[str, int]
                 mutation_manager=None,    # type: Optional[MutationManager]
                 formatgraph=None,         # type: Optional[Graph]
                 make_fs_access=None,      # type: Type[StdFsAccess]
                 fs_access=None,           # type: StdFsAccess
                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
                 timeout=runtimeContext.eval_timeout,             # type: float
                 debug=runtimeContext.debug,               # type: bool
                 js_console=runtimeContext.js_console,          # type: bool
                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
                 loadListing="",         # type: Text
                 outdir="",              # type: Text
                 tmpdir="",              # type: Text
                 stagedir="",            # type: Text
                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
                 container_engine="docker"
                )

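# Illustrative sketch of make_builder() usage with assumed inputs (not from the
# original source): the Builder it returns is only used for expression evaluation,
# e.g.
#   builder = make_builder({"threads": 4}, [], [], ArvRuntimeContext(), {"cwlVersion": "v1.2"})
#   builder.do_eval("$(inputs.threads)")   # -> 4
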
def search_schemadef(name, reqs):
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None

primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))

def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if inputschema == "File":
        inputschema = {"type": "File"}

    if isinstance(inputschema, basestring):
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          isinstance(primary, Mapping) and
          primary.get("class") == "File"):

        if "secondaryFiles" in primary or not secondaryspec:
            # Nothing to do.
            return

        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        primary["secondaryFiles"] = secondaryspec
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                pattern = sf
            else:
                pattern = sf["pattern"]
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                if "${" in pattern or "$(" in pattern:
                    sfname = builder.do_eval(pattern, context=primary)
                else:
                    sfname = substitute(primary["basename"], pattern)

                if sfname is None:
                    continue

                p_location = primary["location"]
                if "/" in p_location:
                    sfpath = (
                        p_location[0 : p_location.rindex("/") + 1]
                        + sfname
                    )

            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

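# Illustrative note on set_secondary() above (not from the original source): simple
# secondaryFiles patterns are resolved against the primary file's basename with
# cwltool's substitute(), so for a primary "sample.bam"
#   pattern ".bai"  -> "sample.bam.bai"   (suffix appended)
#   pattern "^.bai" -> "sample.bai"       (each leading "^" strips one extension)
# while patterns containing "$(...)"/"${...}" are evaluated with builder.do_eval().
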
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, runtimeContext,
                        include_primary=True, discovered_secondaryfiles=None,
                        cache=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            if cache is not None and defrg in cache:
                return cache[defrg]
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            result = yamlloader.load(textIO)
            if cache is not None:
                cache[defrg] = result
            return result
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if cache is not None and defrg not in cache:
            # If we haven't seen this file before, we want the raw file
            # content (before preprocessing) to ensure that external
            # references like $include haven't already been inlined.
            scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    with Perf(metrics, "scandeps include, location"):
        sc_result = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$include", "location")),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    with Perf(metrics, "scandeps $schemas"):
        optional_deps = scandeps(uri, scanobj,
                                 loadref_fields,
                                 set(("$schemas",)),
                                 loadref, urljoin=document_loader.fetcher.urljoin,
                                 nestdirs=False)

    if sc_result is None:
        sc_result = []

    if optional_deps is None:
        optional_deps = []

    if optional_deps:
        sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if len(sp) < 1:
            return
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded;
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    with Perf(metrics, "collect uuids"):
        visit_class(workflowobj, ("File", "Directory"), collect_uuids)

    with Perf(metrics, "collect uploads"):
        visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    with Perf(metrics, "fetch_uuids"):
        while fetch_uuids:
            # For a large number of fetch_uuids, the API server may limit the
            # response size, so keep fetching until the API server has nothing
            # more to give us.
            lookups = arvrunner.api.collections().list(
                filters=[["uuid", "in", fetch_uuids]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=arvrunner.num_retries)

            if not lookups["items"]:
                break

            for l in lookups["items"]:
                uuid_map[l["uuid"]] = l["portable_data_hash"]

            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
        if include_primary:
            # make sure it's included
            sc.append({"class": "File", "location": defrg})
        else:
            # make sure it's excluded
            sc = [d for d in sc if d.get("location") != defrg]

    def visit_default(obj):
        def defaults_are_optional(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            normalizeFilesDirs(f)
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    with Perf(metrics, "mapper"):
        mapper = ArvPathMapper(arvrunner, sc, "",
                               "keep:%s",
                               "keep:%s/%s",
                               name=name,
                               single_collection=True,
                               optional_deps=optional_deps)

    keeprefs = set()
    def addkeepref(k):
        if k.startswith("keep:"):
            keeprefs.add(collection_pdh_pattern.match(k).group(1))

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
            addkeepref(p["location"])
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned
                # by the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            # Not a uuid pattern (must be a pdh pattern)
            addkeepref(p["location"])
            return

        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    with Perf(metrics, "setloc"):
        visit_class(workflowobj, ("File", "Directory"), setloc)
        visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if runtimeContext.copy_deps:
        # Find referenced collections and copy them into the
        # destination project, for easy sharing.
        already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
                                     filters=[["portable_data_hash", "in", list(keeprefs)],
                                              ["owner_uuid", "=", runtimeContext.project_uuid]],
                                     select=["uuid", "portable_data_hash", "created_at"]))

        keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
        for kr in keeprefs:
            col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
                                                   order="created_at desc",
                                                   select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
                                                   limit=1).execute()
            if len(col["items"]) == 0:
                logger.warning("Cannot find collection with portable data hash %s", kr)
                continue
            col = col["items"][0]
            try:
                arvrunner.api.collections().create(body={"collection": {
                    "owner_uuid": runtimeContext.project_uuid,
                    "name": col["name"],
                    "description": col["description"],
                    "properties": col["properties"],
                    "portable_data_hash": col["portable_data_hash"],
                    "manifest_text": col["manifest_text"],
                    "storage_classes_desired": col["storage_classes_desired"],
                    "trash_at": col["trash_at"]
                }}, ensure_unique_name=True).execute()
            except Exception as e:
                logger.warning("Unable to copy collection to destination: %s", e)

    if "$schemas" in workflowobj:
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

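# Illustrative sketch of the mapper returned by upload_dependencies(), using
# hypothetical paths and a placeholder portable data hash (not from the original
# source):
#   mapper.mapper("file:///home/user/wf/inputs/sample.fastq").resolved
#   # -> "keep:99999999999999999999999999999999+118/sample.fastq"
# The "location" fields inside workflowobj are rewritten to the same keep: references.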

def upload_docker(arvrunner, tool, runtimeContext):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
                                                       True,
                                                       runtimeContext.project_uuid,
                                                       runtimeContext.force_docker_pull,
                                                       runtimeContext.tmp_outdir_prefix,
                                                       runtimeContext.match_local_docker,
                                                       runtimeContext.copy_deps)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool, runtimeContext)


def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
                if "id" in v:
                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "path" in v and "location" not in v:
                v["location"] = v["path"]
                del v["path"]
            if "location" in v and cur_id in merged_map:
                if v["location"] in merged_map[cur_id].resolved:
                    v["location"] = merged_map[cur_id].resolved[v["location"]]
                if v["location"] in merged_map[cur_id].secondaryFiles:
                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
                                                                                                             runtimeContext.project_uuid,
                                                                                                             runtimeContext.force_docker_pull,
                                                                                                             runtimeContext.tmp_outdir_prefix,
                                                                                                             runtimeContext.match_local_docker,
                                                                                                             runtimeContext.copy_deps)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    # 'tool' was previously read from an enclosing scope even though it is not
    # defined here; make it an explicit parameter so the function is self-contained.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            # check_output returns bytes; decode so the value serializes cleanly as JSON.
            packed["http://schema.org/version"] = githash.decode("utf-8")


def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # Make a copy of the job order and set defaults.
    builder_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any
    # missing required parameters; we don't want that here,
    # so make them all optional.
    inputs_copy = copy.deepcopy(tool.tool["inputs"])
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    fill_in_defaults(inputs_copy,
                     builder_job_order,
                     arvrunner.fs_access)
    # Need to create a builder object to evaluate expressions.
    builder = make_builder(builder_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)
    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access,
                             builder,
                             tool.tool["inputs"],
                             job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False,
                                    runtimeContext)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool, runtimeContext):
    # Ensure that Docker images needed by this workflow are available

    with Perf(metrics, "upload_docker"):
        upload_docker(arvrunner, tool, runtimeContext)

    document_loader = tool.doc_loader

    merged_map = {}
    tool_dep_cache = {}
    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
                pm = upload_dependencies(arvrunner,
                                         "%s dependencies" % (shortname(deptool["id"])),
                                         document_loader,
                                         deptool,
                                         deptool["id"],
                                         False,
                                         runtimeContext,
                                         include_primary=False,
                                         discovered_secondaryfiles=discovered_secondaryfiles,
                                         cache=tool_dep_cache)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

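# Illustrative sketch of the structure returned by upload_workflow_deps(), with
# hypothetical identifiers (not from the original source):
#   {"file:///home/user/wf/step-tool.cwl":
#        FileUpdates(resolved={"file:///home/user/wf/script.py": "keep:<pdh>/script.py"},
#                    secondaryFiles={})}
# packed_workflow() uses this map to rewrite "location" fields in the packed document.
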
def arvados_jobs_image(arvrunner, img, runtimeContext):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
                                                          True,
                                                          runtimeContext.project_uuid,
                                                          runtimeContext.force_docker_pull,
                                                          runtimeContext.tmp_outdir_prefix,
                                                          runtimeContext.match_local_docker,
                                                          runtimeContext.copy_deps)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if runtimeContext.project_uuid:
        filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=runtimeContext.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, updated_tool,
                 tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        loadingContext = loadingContext.copy()
        loadingContext.metadata = updated_tool.metadata.copy()

        super(Runner, self).__init__(updated_tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.enable_dev = loadingContext.enable_dev

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

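    # Illustrative sketch (not from the original source): the runner container can be
    # sized from the workflow itself via the hint read above, e.g. in a "hints" block:
    #   - class: "http://arvados.org/cwl#WorkflowRunnerResources"
    #     coresMin: 2
    #     ramMin: 2048
    #     keep_cache: 512
    # Values supplied on the command line / initializer (submit_runner_ram) still win.
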
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.loads(f.read().decode())
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception:
            logger.exception("[%s] While getting final output object", self.name)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)