sdk/cwl/arvados_cwl/runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 import arvados.util
43 from .util import collectionUUID
44 from ruamel.yaml import YAML
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52 from .perf import Perf
53
54 logger = logging.getLogger('arvados.cwl-runner')
55 metrics = logging.getLogger('arvados.cwl-runner.metrics')
56
57 def trim_anonymous_location(obj):
58     """Remove 'location' field from File and Directory literals.
59
60     To make internal handling easier, literals are assigned a random id for
61     'location'.  However, when writing the record back out, this can break
62     reproducibility.  Since it is valid for literals not to have a 'location'
63     field, remove it.
64
65     """
66
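    # Illustrative example (hypothetical values): a File literal such as
    # {"class": "File", "basename": "a.txt", "contents": "x", "location": "_:b0d1..."}
    # has its autogenerated "location" dropped before the record is written out.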
67     if obj.get("location", "").startswith("_:"):
68         del obj["location"]
69
70
71 def remove_redundant_fields(obj):
72     for field in ("path", "nameext", "nameroot", "dirname"):
73         if field in obj:
74             del obj[field]
75
76
77 def find_defaults(d, op):
78     if isinstance(d, list):
79         for i in d:
80             find_defaults(i, op)
81     elif isinstance(d, dict):
82         if "default" in d:
83             op(d)
84         else:
85             for i in viewvalues(d):
86                 find_defaults(i, op)
87
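# Note on make_builder: as the comments at its call sites say, the Builder is only
# created so that CWL expressions can be evaluated (builder.do_eval), so most fields
# are stubbed out; only the job order, hints/requirements, a few runtime flags and
# the cwlVersion are populated.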
88 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
89     return Builder(
90                  job=joborder,
91                  files=[],               # type: List[Dict[Text, Text]]
92                  bindings=[],            # type: List[Dict[Text, Any]]
93                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
94                  names=None,               # type: Names
95                  requirements=requirements,        # type: List[Dict[Text, Any]]
96                  hints=hints,               # type: List[Dict[Text, Any]]
97                  resources={},           # type: Dict[str, int]
98                  mutation_manager=None,    # type: Optional[MutationManager]
99                  formatgraph=None,         # type: Optional[Graph]
100                  make_fs_access=None,      # type: Type[StdFsAccess]
101                  fs_access=None,           # type: StdFsAccess
102                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
103                  timeout=runtimeContext.eval_timeout,             # type: float
104                  debug=runtimeContext.debug,               # type: bool
105                  js_console=runtimeContext.js_console,          # type: bool
106                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
107                  loadListing="",         # type: Text
108                  outdir="",              # type: Text
109                  tmpdir="",              # type: Text
110                  stagedir="",            # type: Text
111                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
112                  container_engine="docker"
113                 )
114
115 def search_schemadef(name, reqs):
116     for r in reqs:
117         if r["class"] == "SchemaDefRequirement":
118             for sd in r["types"]:
119                 if sd["name"] == name:
120                     return sd
121     return None
122
123 primitive_types_set = frozenset(("null", "boolean", "int", "long",
124                                  "float", "double", "string", "record",
125                                  "array", "enum"))
126
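# Background for set_secondary, summarizing CWL secondaryFiles pattern semantics
# (general CWL behavior, not specific to Arvados): a plain pattern such as ".bai"
# is appended to the primary basename ("reads.bam" -> "reads.bam.bai"), a leading
# "^" strips an extension first ("^.bai" -> "reads.bai"), and patterns containing
# "$(...)" or "${...}" are expressions evaluated against the primary File, which
# is what builder.do_eval() handles below.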
127 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
128     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
129         # union type, collect all possible secondaryFiles
130         for i in inputschema:
131             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
132         return
133
134     if inputschema == "File":
135         inputschema = {"type": "File"}
136
137     if isinstance(inputschema, basestring):
138         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
139         if sd:
140             inputschema = sd
141         else:
142             return
143
144     if "secondaryFiles" in inputschema:
145         # Set secondaryFiles, which may be inherited by compound types.
146         secondaryspec = inputschema["secondaryFiles"]
147
148     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
149         not isinstance(inputschema["type"], basestring)):
150         # compound type (union, array, record)
151         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
152
153     elif (inputschema["type"] == "record" and
154           isinstance(primary, Mapping)):
155         #
156         # record type, find secondary files associated with fields.
157         #
158         for f in inputschema["fields"]:
159             p = primary.get(shortname(f["name"]))
160             if p:
161                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
162
163     elif (inputschema["type"] == "array" and
164           isinstance(primary, Sequence)):
165         #
166         # array type, find secondary files of elements
167         #
168         for p in primary:
169             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
170
171     elif (inputschema["type"] == "File" and
172           isinstance(primary, Mapping) and
173           primary.get("class") == "File"):
174
175         if "secondaryFiles" in primary or not secondaryspec:
176             # Nothing to do.
177             return
178
179         #
180         # Found a file, check for secondaryFiles
181         #
182         specs = []
183         primary["secondaryFiles"] = secondaryspec
184         for i, sf in enumerate(aslist(secondaryspec)):
185             if builder.cwlVersion == "v1.0":
186                 pattern = sf
187             else:
188                 pattern = sf["pattern"]
189             if pattern is None:
190                 continue
191             if isinstance(pattern, list):
192                 specs.extend(pattern)
193             elif isinstance(pattern, dict):
194                 specs.append(pattern)
195             elif isinstance(pattern, str):
196                 if builder.cwlVersion == "v1.0":
197                     specs.append({"pattern": pattern, "required": True})
198                 else:
199                     specs.append({"pattern": pattern, "required": sf.get("required")})
200             else:
201                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
202                     "Expression must return list, object, string or null")
203
204         found = []
205         for i, sf in enumerate(specs):
206             if isinstance(sf, dict):
207                 if sf.get("class") == "File":
208                     pattern = None
209                     if sf.get("location") is None:
210                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
211                             "File object is missing 'location': %s" % sf)
212                     sfpath = sf["location"]
213                     required = True
214                 else:
215                     pattern = sf["pattern"]
216                     required = sf.get("required")
217             elif isinstance(sf, str):
218                 pattern = sf
219                 required = True
220             else:
221                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
222                     "Expression must return list, object, string or null")
223
224             if pattern is not None:
225                 if "${" in pattern or "$(" in pattern:
226                     sfname = builder.do_eval(pattern, context=primary)
227                 else:
228                     sfname = substitute(primary["basename"], pattern)
229
230                 if sfname is None:
231                     continue
232
233                 p_location = primary["location"]
234                 if "/" in p_location:
235                     sfpath = (
236                         p_location[0 : p_location.rindex("/") + 1]
237                         + sfname
238                     )
239
240             required = builder.do_eval(required, context=primary)
241
242             if fsaccess.exists(sfpath):
243                 if pattern is not None:
244                     found.append({"location": sfpath, "class": "File"})
245                 else:
246                     found.append(sf)
247             elif required:
248                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
249                     "Required secondary file '%s' does not exist" % sfpath)
250
251         primary["secondaryFiles"] = cmap(found)
252         if discovered is not None:
253             discovered[primary["location"]] = primary["secondaryFiles"]
254     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
255         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
256
257 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
258     for inputschema in inputs:
259         primary = job_order.get(shortname(inputschema["id"]))
260         if isinstance(primary, (Mapping, Sequence)):
261             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
262
263 def upload_dependencies(arvrunner, name, document_loader,
264                         workflowobj, uri, loadref_run, runtimeContext,
265                         include_primary=True, discovered_secondaryfiles=None,
266                         cache=None):
267     """Upload the dependencies of the workflowobj document to Keep.
268
269     Returns a pathmapper object mapping local paths to keep references.  Also
270     does an in-place update of references in "workflowobj".
271
272     Use scandeps to find $import, $include, $schemas, run, File and Directory
273     fields that represent external references.
274
275     If workflowobj has an "id" field, this will reload the document to ensure
276     it is scanning the raw document prior to preprocessing.
277     """
278
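    # Rough sketch of the effect (hypothetical values): a reference such as
    #   {"class": "File", "location": "file:///home/me/reads.fastq"}
    # is rewritten in place to something like
    #   {"class": "File", "location": "keep:99999999999999999999999999999999+123/reads.fastq"}
    # and the returned ArvPathMapper can map the original path to that Keep
    # reference via mapper.mapper(path).resolved.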
279     loaded = set()
280     def loadref(b, u):
281         joined = document_loader.fetcher.urljoin(b, u)
282         defrg, _ = urllib.parse.urldefrag(joined)
283         if defrg not in loaded:
284             loaded.add(defrg)
285             if cache is not None and defrg in cache:
286                 return cache[defrg]
287             # Use fetch_text to get raw file (before preprocessing).
288             text = document_loader.fetch_text(defrg)
289             if isinstance(text, bytes):
290                 textIO = StringIO(text.decode('utf-8'))
291             else:
292                 textIO = StringIO(text)
293             yamlloader = YAML(typ='safe', pure=True)
294             result = yamlloader.load(textIO)
295             if cache is not None:
296                 cache[defrg] = result
297             return result
298         else:
299             return {}
300
301     if loadref_run:
302         loadref_fields = set(("$import", "run"))
303     else:
304         loadref_fields = set(("$import",))
305
306     scanobj = workflowobj
307     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
308         # Need raw file content (before preprocessing) to ensure
309         # that external references in $include and $mixin are captured.
310         scanobj = loadref("", workflowobj["id"])
311
312     metadata = scanobj
313
314     with Perf(metrics, "scandeps include, location"):
315         sc_result = scandeps(uri, scanobj,
316                              loadref_fields,
317                              set(("$include", "location")),
318                              loadref, urljoin=document_loader.fetcher.urljoin,
319                              nestdirs=False)
320
321     with Perf(metrics, "scandeps $schemas"):
322         optional_deps = scandeps(uri, scanobj,
323                                       loadref_fields,
324                                       set(("$schemas",)),
325                                       loadref, urljoin=document_loader.fetcher.urljoin,
326                                       nestdirs=False)
327
328     sc_result.extend(optional_deps)
329
330     sc = []
331     uuids = {}
332
333     def collect_uuids(obj):
334         loc = obj.get("location", "")
335         sp = loc.split(":")
336         if sp[0] == "keep":
337             # Collect collection uuids that need to be resolved to
338             # portable data hashes
339             gp = collection_uuid_pattern.match(loc)
340             if gp:
341                 uuids[gp.groups()[0]] = obj
342             if collectionUUID in obj:
343                 uuids[obj[collectionUUID]] = obj
344
345     def collect_uploads(obj):
346         loc = obj.get("location", "")
347         sp = loc.split(":")
348         if len(sp) < 1:
349             return
350         if sp[0] in ("file", "http", "https"):
351             # Record local files that need to be uploaded,
352             # don't include file literals, keep references, etc.
353             sc.append(obj)
354         collect_uuids(obj)
355
356     with Perf(metrics, "collect uuids"):
357         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
358
359     with Perf(metrics, "collect uploads"):
360         visit_class(sc_result, ("File", "Directory"), collect_uploads)
361
362     # Resolve any collection uuids we found to portable data hashes
363     # and assign them to uuid_map
364     uuid_map = {}
365     fetch_uuids = list(uuids.keys())
366     with Perf(metrics, "fetch_uuids"):
367         while fetch_uuids:
368             # For a large number of fetch_uuids, the API server may limit
369             # the response size, so keep fetching until the API server has
370             # nothing more to give us.
371             lookups = arvrunner.api.collections().list(
372                 filters=[["uuid", "in", fetch_uuids]],
373                 count="none",
374                 select=["uuid", "portable_data_hash"]).execute(
375                     num_retries=arvrunner.num_retries)
376
377             if not lookups["items"]:
378                 break
379
380             for l in lookups["items"]:
381                 uuid_map[l["uuid"]] = l["portable_data_hash"]
382
383             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
384
385     normalizeFilesDirs(sc)
386
387     if include_primary and "id" in workflowobj:
388         sc.append({"class": "File", "location": workflowobj["id"]})
389
390     def visit_default(obj):
391         def defaults_are_optional(f):
392             if "location" not in f and "path" in f:
393                 f["location"] = f["path"]
394                 del f["path"]
395             normalizeFilesDirs(f)
396             optional_deps.append(f)
397         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
398
399     find_defaults(workflowobj, visit_default)
400
401     discovered = {}
402     def discover_default_secondary_files(obj):
403         builder_job_order = {}
404         for t in obj["inputs"]:
405             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
406         # Need to create a builder object to evaluate expressions.
407         builder = make_builder(builder_job_order,
408                                obj.get("hints", []),
409                                obj.get("requirements", []),
410                                ArvRuntimeContext(),
411                                metadata)
412         discover_secondary_files(arvrunner.fs_access,
413                                  builder,
414                                  obj["inputs"],
415                                  builder_job_order,
416                                  discovered)
417
418     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
419     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
420
421     for d in list(discovered):
422         # Only interested in discovered secondaryFiles which are local
423         # files that need to be uploaded.
424         if d.startswith("file:"):
425             sc.extend(discovered[d])
426         else:
427             del discovered[d]
428
429     with Perf(metrics, "mapper"):
430         mapper = ArvPathMapper(arvrunner, sc, "",
431                                "keep:%s",
432                                "keep:%s/%s",
433                                name=name,
434                                single_collection=True,
435                                optional_deps=optional_deps)
436
437     keeprefs = set()
438     def addkeepref(k):
439         if k.startswith("keep:"):
440             keeprefs.add(collection_pdh_pattern.match(k).group(1))
441
442     def setloc(p):
443         loc = p.get("location")
444         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
445             p["location"] = mapper.mapper(p["location"]).resolved
446             addkeepref(p["location"])
447             return
448
449         if not loc:
450             return
451
452         if collectionUUID in p:
453             uuid = p[collectionUUID]
454             if uuid not in uuid_map:
455                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
456                     "Collection uuid %s not found" % uuid)
457             gp = collection_pdh_pattern.match(loc)
458             if gp and uuid_map[uuid] != gp.groups()[0]:
459                 # This file entry has both collectionUUID and a PDH
460                 # location. If the PDH doesn't match the one returned by
461                 # the API server, raise an error.
462                 raise SourceLine(p, "location", validate.ValidationException).makeError(
463                     "Expected collection uuid %s to be %s but API server reported %s" % (
464                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
465
466         gp = collection_uuid_pattern.match(loc)
467         if not gp:
468             # Not a uuid pattern (must be a pdh pattern)
469             addkeepref(p["location"])
470             return
471
472         uuid = gp.groups()[0]
473         if uuid not in uuid_map:
474             raise SourceLine(p, "location", validate.ValidationException).makeError(
475                 "Collection uuid %s not found" % uuid)
476         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
477         p[collectionUUID] = uuid
478
479     with Perf(metrics, "setloc"):
480         visit_class(workflowobj, ("File", "Directory"), setloc)
481         visit_class(discovered, ("File", "Directory"), setloc)
482
483     if discovered_secondaryfiles is not None:
484         for d in discovered:
485             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
486
487     if runtimeContext.copy_deps:
488         # Find referenced collections and copy them into the
489         # destination project, for easy sharing.
490         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
491                                      filters=[["portable_data_hash", "in", list(keeprefs)],
492                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
493                                      select=["uuid", "portable_data_hash", "created_at"]))
494
495         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
496         for kr in keeprefs:
497             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
498                                                   order="created_at desc",
499                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
500                                                    limit=1).execute()
501             if len(col["items"]) == 0:
502                 logger.warning("Cannot find collection with portable data hash %s", kr)
503                 continue
504             col = col["items"][0]
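            # Creating a new collection record that reuses the source manifest_text
            # only copies metadata; because Keep blocks are content-addressed, the
            # underlying data is not duplicated (general Arvados behavior, noted here
            # for context).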
505             try:
506                 arvrunner.api.collections().create(body={"collection": {
507                     "owner_uuid": runtimeContext.project_uuid,
508                     "name": col["name"],
509                     "description": col["description"],
510                     "properties": col["properties"],
511                     "portable_data_hash": col["portable_data_hash"],
512                     "manifest_text": col["manifest_text"],
513                     "storage_classes_desired": col["storage_classes_desired"],
514                     "trash_at": col["trash_at"]
515                 }}, ensure_unique_name=True).execute()
516             except Exception as e:
517                 logger.warning("Unable to copy collection to destination: %s", e)
518
519     if "$schemas" in workflowobj:
520         sch = CommentedSeq()
521         for s in workflowobj["$schemas"]:
522             if s in mapper:
523                 sch.append(mapper.mapper(s).resolved)
524         workflowobj["$schemas"] = sch
525
526     return mapper
527
528
529 def upload_docker(arvrunner, tool, runtimeContext):
530     """Uploads Docker images used in CommandLineTool objects."""
531
532     if isinstance(tool, CommandLineTool):
533         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
534         if docker_req:
535             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
536                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
537                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
538
539             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
540                                                        runtimeContext.project_uuid,
541                                                        runtimeContext.force_docker_pull,
542                                                        runtimeContext.tmp_outdir_prefix,
543                                                        runtimeContext.match_local_docker,
544                                                        runtimeContext.copy_deps)
545         else:
546             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
547                                                        True,
548                                                        runtimeContext.project_uuid,
549                                                        runtimeContext.force_docker_pull,
550                                                        runtimeContext.tmp_outdir_prefix,
551                                                        runtimeContext.match_local_docker,
552                                                        runtimeContext.copy_deps)
553     elif isinstance(tool, cwltool.workflow.Workflow):
554         for s in tool.steps:
555             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
556
557
558 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
559     """Create a packed workflow.
560
561     A "packed" workflow is one where all the components have been combined into a single document."""
562
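    # For reference: cwltool's pack() normally produces a single document with a
    # "$graph" list holding every embedded process (the top-level workflow becomes
    # "#main"), and records any id renames in `rewrites`, which is why the mapping
    # is inverted below to recover the original ids.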
563     rewrites = {}
564     packed = pack(arvrunner.loadingContext, tool.tool["id"],
565                   rewrite_out=rewrites,
566                   loader=tool.doc_loader)
567
568     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
569
570     def visit(v, cur_id):
571         if isinstance(v, dict):
572             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
573                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
574                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or update to cwlVersion: v1.1")
575                 if "id" in v:
576                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
577             if "path" in v and "location" not in v:
578                 v["location"] = v["path"]
579                 del v["path"]
580             if "location" in v and cur_id in merged_map:
581                 if v["location"] in merged_map[cur_id].resolved:
582                     v["location"] = merged_map[cur_id].resolved[v["location"]]
583                 if v["location"] in merged_map[cur_id].secondaryFiles:
584                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
585             if v.get("class") == "DockerRequirement":
586                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
587                                                                                                              runtimeContext.project_uuid,
588                                                                                                              runtimeContext.force_docker_pull,
589                                                                                                              runtimeContext.tmp_outdir_prefix,
590                                                                                                              runtimeContext.match_local_docker,
591                                                                                                              runtimeContext.copy_deps)
592             for l in v:
593                 visit(v[l], cur_id)
594         if isinstance(v, list):
595             for l in v:
596                 visit(l, cur_id)
597     visit(packed, None)
598     return packed
599
600
601 def tag_git_version(packed, tool):
602     if tool.tool["id"].startswith("file://"):
603         path = os.path.dirname(tool.tool["id"][7:])
604         try:
605             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
606         except (OSError, subprocess.CalledProcessError):
607             pass
608         else:
609             packed["http://schema.org/version"] = githash
610
611
612 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
613     """Upload local files referenced in the input object and return an updated
614     input object with 'location' rewritten to the proper Keep references.
615     """
616
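    # Sketch of the transformation (hypothetical paths): an input object like
    #   {"fastq": {"class": "File", "location": "/home/me/reads.fastq"}}
    # comes back with the location rewritten to a Keep reference along the lines of
    #   {"fastq": {"class": "File", "location": "keep:<pdh>/reads.fastq"}}
    # plus any discovered secondaryFiles attached.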
617     # Make a copy of the job order and set defaults.
618     builder_job_order = copy.copy(job_order)
619
620     # fill_in_defaults throws an error if there are any missing
621     # required parameters; we don't want that here, so make them
622     # all optional.
623     inputs_copy = copy.deepcopy(tool.tool["inputs"])
624     for i in inputs_copy:
625         if "null" not in i["type"]:
626             i["type"] = ["null"] + aslist(i["type"])
627
628     fill_in_defaults(inputs_copy,
629                      builder_job_order,
630                      arvrunner.fs_access)
631     # Need to create a builder object to evaluate expressions.
632     builder = make_builder(builder_job_order,
633                            tool.hints,
634                            tool.requirements,
635                            ArvRuntimeContext(),
636                            tool.metadata)
637     # Now update job_order with secondaryFiles
638     discover_secondary_files(arvrunner.fs_access,
639                              builder,
640                              tool.tool["inputs"],
641                              job_order)
642
643     jobmapper = upload_dependencies(arvrunner,
644                                     name,
645                                     tool.doc_loader,
646                                     job_order,
647                                     job_order.get("id", "#"),
648                                     False,
649                                     runtimeContext)
650
651     if "id" in job_order:
652         del job_order["id"]
653
654     # Need to filter this out; it gets added by cwltool when parameters
655     # are provided on the command line.
656     if "job_order" in job_order:
657         del job_order["job_order"]
658
659     return job_order
660
661 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
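# merged_map, built in upload_workflow_deps() below, maps each tool document id to a
# FileUpdates tuple: "resolved" maps original file locations to their Keep locations,
# and "secondaryFiles" maps locations to the secondaryFiles discovered for them.
# packed_workflow() consumes this structure when rewriting the packed document.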
662
663 def upload_workflow_deps(arvrunner, tool, runtimeContext):
664     # Ensure that Docker images needed by this workflow are available
665
666     with Perf(metrics, "upload_docker"):
667         upload_docker(arvrunner, tool, runtimeContext)
668
669     document_loader = tool.doc_loader
670
671     merged_map = {}
672     tool_dep_cache = {}
673     def upload_tool_deps(deptool):
674         if "id" in deptool:
675             discovered_secondaryfiles = {}
676             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
677                 pm = upload_dependencies(arvrunner,
678                                          "%s dependencies" % (shortname(deptool["id"])),
679                                          document_loader,
680                                          deptool,
681                                          deptool["id"],
682                                          False,
683                                          runtimeContext,
684                                          include_primary=False,
685                                          discovered_secondaryfiles=discovered_secondaryfiles,
686                                          cache=tool_dep_cache)
687             document_loader.idx[deptool["id"]] = deptool
688             toolmap = {}
689             for k,v in pm.items():
690                 toolmap[k] = v.resolved
691             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
692
693     tool.visit(upload_tool_deps)
694
695     return merged_map
696
697 def arvados_jobs_image(arvrunner, img, runtimeContext):
698     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
699
700     try:
701         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
702                                                           True,
703                                                           runtimeContext.project_uuid,
704                                                           runtimeContext.force_docker_pull,
705                                                           runtimeContext.tmp_outdir_prefix,
706                                                           runtimeContext.match_local_docker,
707                                                           runtimeContext.copy_deps)
708     except Exception as e:
709         raise Exception("Docker image %s is not available\n%s" % (img, e))
710
711
712 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
713     collection = arvados.collection.Collection(api_client=arvrunner.api,
714                                                keep_client=arvrunner.keep_client,
715                                                num_retries=arvrunner.num_retries)
716     with collection.open("workflow.cwl", "w") as f:
717         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
718
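    # Reuse check: if a collection with the same portable data hash and a matching
    # name prefix already exists (optionally within the target project), use it
    # rather than saving a duplicate copy of the packed workflow.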
719     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
720                ["name", "like", name+"%"]]
721     if runtimeContext.project_uuid:
722         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
723     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
724
725     if exists["items"]:
726         logger.info("Using collection %s", exists["items"][0]["uuid"])
727     else:
728         collection.save_new(name=name,
729                             owner_uuid=runtimeContext.project_uuid,
730                             ensure_unique_name=True,
731                             num_retries=arvrunner.num_retries)
732         logger.info("Uploaded to %s", collection.manifest_locator())
733
734     return collection.portable_data_hash()
735
736
737 class Runner(Process):
738     """Base class for runner processes, which submit an instance of
739     arvados-cwl-runner and wait for the final result."""
740
741     def __init__(self, runner, updated_tool,
742                  tool, loadingContext, enable_reuse,
743                  output_name, output_tags, submit_runner_ram=0,
744                  name=None, on_error=None, submit_runner_image=None,
745                  intermediate_output_ttl=0, merged_map=None,
746                  priority=None, secret_store=None,
747                  collection_cache_size=256,
748                  collection_cache_is_default=True):
749
750         loadingContext = loadingContext.copy()
751         loadingContext.metadata = updated_tool.metadata.copy()
752
753         super(Runner, self).__init__(updated_tool.tool, loadingContext)
754
755         self.arvrunner = runner
756         self.embedded_tool = tool
757         self.job_order = None
758         self.running = False
759         if enable_reuse:
760             # If reuse is permitted by command line arguments but
761             # disabled by the workflow itself, disable it.
762             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
763             if reuse_req:
764                 enable_reuse = reuse_req["enableReuse"]
765         self.enable_reuse = enable_reuse
766         self.uuid = None
767         self.final_output = None
768         self.output_name = output_name
769         self.output_tags = output_tags
770         self.name = name
771         self.on_error = on_error
772         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
773         self.intermediate_output_ttl = intermediate_output_ttl
774         self.priority = priority
775         self.secret_store = secret_store
776         self.enable_dev = loadingContext.enable_dev
777
778         self.submit_runner_cores = 1
779         self.submit_runner_ram = 1024  # default 1 GiB
780         self.collection_cache_size = collection_cache_size
781
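        # A workflow can suggest runner resources with the
        # http://arvados.org/cwl#WorkflowRunnerResources hint; an illustrative
        # snippet (hypothetical values, assuming the usual arv: namespace prefix):
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 1024
        # Explicit command line / initializer values still take precedence below.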
782         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
783         if runner_resource_req:
784             if runner_resource_req.get("coresMin"):
785                 self.submit_runner_cores = runner_resource_req["coresMin"]
786             if runner_resource_req.get("ramMin"):
787                 self.submit_runner_ram = runner_resource_req["ramMin"]
788             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
789                 self.collection_cache_size = runner_resource_req["keep_cache"]
790
791         if submit_runner_ram:
792             # Command line / initializer overrides default and/or spec from workflow
793             self.submit_runner_ram = submit_runner_ram
794
795         if self.submit_runner_ram <= 0:
796             raise Exception("Value of submit-runner-ram must be greater than zero")
797
798         if self.submit_runner_cores <= 0:
799             raise Exception("Value of submit-runner-cores must be greater than zero")
800
801         self.merged_map = merged_map or {}
802
803     def job(self,
804             job_order,         # type: Mapping[Text, Text]
805             output_callbacks,  # type: Callable[[Any, Any], Any]
806             runtimeContext     # type: RuntimeContext
807            ):  # type: (...) -> Generator[Any, None, None]
808         self.job_order = job_order
809         self._init_job(job_order, runtimeContext)
810         yield self
811
812     def update_pipeline_component(self, record):
813         pass
814
815     def done(self, record):
816         """Base method for handling a completed runner."""
817
818         try:
819             if record["state"] == "Complete":
820                 if record.get("exit_code") is not None:
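                    # Exit code 33 is treated as the runner's signal that the
                    # workflow failed due to an UnsupportedRequirement rather than
                    # an ordinary error (as reflected in the mapping below).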
821                     if record["exit_code"] == 33:
822                         processStatus = "UnsupportedRequirement"
823                     elif record["exit_code"] == 0:
824                         processStatus = "success"
825                     else:
826                         processStatus = "permanentFail"
827                 else:
828                     processStatus = "success"
829             else:
830                 processStatus = "permanentFail"
831
832             outputs = {}
833
834             if processStatus == "permanentFail":
835                 logc = arvados.collection.CollectionReader(record["log"],
836                                                            api_client=self.arvrunner.api,
837                                                            keep_client=self.arvrunner.keep_client,
838                                                            num_retries=self.arvrunner.num_retries)
839                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
840
841             self.final_output = record["output"]
842             outc = arvados.collection.CollectionReader(self.final_output,
843                                                        api_client=self.arvrunner.api,
844                                                        keep_client=self.arvrunner.keep_client,
845                                                        num_retries=self.arvrunner.num_retries)
846             if "cwl.output.json" in outc:
847                 with outc.open("cwl.output.json", "rb") as f:
848                     if f.size() > 0:
849                         outputs = json.loads(f.read().decode())
850             def keepify(fileobj):
851                 path = fileobj["location"]
852                 if not path.startswith("keep:"):
853                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
854             adjustFileObjs(outputs, keepify)
855             adjustDirObjs(outputs, keepify)
856         except Exception:
857             logger.exception("[%s] While getting final output object", self.name)
858             self.arvrunner.output_callback({}, "permanentFail")
859         else:
860             self.arvrunner.output_callback(outputs, processStatus)