19280: Add metrics checkpoints
[arvados.git] / sdk/cwl/arvados_cwl/runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 import arvados.util
43 from .util import collectionUUID
44 from ruamel.yaml import YAML
45 from ruamel.yaml.comments import CommentedMap, CommentedSeq
46
47 import arvados_cwl.arvdocker
48 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
49 from ._version import __version__
50 from . import done
51 from .context import ArvRuntimeContext
52 from .perf import Perf
53
54 logger = logging.getLogger('arvados.cwl-runner')
55 metrics = logging.getLogger('arvados.cwl-runner.metrics')
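# The 'metrics' logger is used together with the Perf context manager
# (imported from .perf above) to record timing checkpoints such as
# "upload_docker" and "upload_dependencies <tool>" while a workflow is
# being prepared for submission.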
56
57 def trim_anonymous_location(obj):
58     """Remove 'location' field from File and Directory literals.
59
60     To make internal handling easier, literals are assigned a random id for
61     'location'.  However, when writing the record back out, this can break
62 reproducibility.  Since it is valid for literals not to have a 'location'
63     field, remove it.
64
65     """
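    # Sketch (literal invented for illustration): a File like
    #   {"class": "File", "contents": "x", "location": "_:d0", "basename": "a.txt"}
    # loses its "location", while {"class": "File", "location": "keep:.../a.txt"}
    # is left untouched.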
66
67     if obj.get("location", "").startswith("_:"):
68         del obj["location"]
69
70
71 def remove_redundant_fields(obj):
72     for field in ("path", "nameext", "nameroot", "dirname"):
73         if field in obj:
74             del obj[field]
75
76
77 def find_defaults(d, op):
78     if isinstance(d, list):
79         for i in d:
80             find_defaults(i, op)
81     elif isinstance(d, dict):
82         if "default" in d:
83             op(d)
84         else:
85             for i in viewvalues(d):
86                 find_defaults(i, op)
87
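# make_builder() constructs a minimal cwltool Builder that is only used to
# evaluate expressions (for example secondaryFiles patterns); most fields are
# therefore left empty or None.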
88 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
89     return Builder(
90                  job=joborder,
91                  files=[],               # type: List[Dict[Text, Text]]
92                  bindings=[],            # type: List[Dict[Text, Any]]
93                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
94                  names=None,               # type: Names
95                  requirements=requirements,        # type: List[Dict[Text, Any]]
96                  hints=hints,               # type: List[Dict[Text, Any]]
97                  resources={},           # type: Dict[str, int]
98                  mutation_manager=None,    # type: Optional[MutationManager]
99                  formatgraph=None,         # type: Optional[Graph]
100                  make_fs_access=None,      # type: Type[StdFsAccess]
101                  fs_access=None,           # type: StdFsAccess
102                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
103                  timeout=runtimeContext.eval_timeout,             # type: float
104                  debug=runtimeContext.debug,               # type: bool
105                  js_console=runtimeContext.js_console,          # type: bool
106                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
107                  loadListing="",         # type: Text
108                  outdir="",              # type: Text
109                  tmpdir="",              # type: Text
110                  stagedir="",            # type: Text
111                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
112                  container_engine="docker"
113                 )
114
115 def search_schemadef(name, reqs):
116     for r in reqs:
117         if r["class"] == "SchemaDefRequirement":
118             for sd in r["types"]:
119                 if sd["name"] == name:
120                     return sd
121     return None
122
123 primitive_types_set = frozenset(("null", "boolean", "int", "long",
124                                  "float", "double", "string", "record",
125                                  "array", "enum"))
126
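# set_secondary() walks an input schema and the corresponding value from the
# job order in parallel: union, array and record types recurse into their
# member types, while File values have their secondaryFiles patterns evaluated
# and expanded.  Secondary files that exist (checked through fsaccess) are
# attached to the primary File and recorded in 'discovered', keyed by the
# primary file's location.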
127 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
128     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
129         # union type, collect all possible secondaryFiles
130         for i in inputschema:
131             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
132         return
133
134     if inputschema == "File":
135         inputschema = {"type": "File"}
136
137     if isinstance(inputschema, basestring):
138         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
139         if sd:
140             inputschema = sd
141         else:
142             return
143
144     if "secondaryFiles" in inputschema:
145         # set secondaryFiles, may be inherited by compound types.
146         secondaryspec = inputschema["secondaryFiles"]
147
148     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
149         not isinstance(inputschema["type"], basestring)):
150         # compound type (union, array, record)
151         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
152
153     elif (inputschema["type"] == "record" and
154           isinstance(primary, Mapping)):
155         #
156         # record type, find secondary files associated with fields.
157         #
158         for f in inputschema["fields"]:
159             p = primary.get(shortname(f["name"]))
160             if p:
161                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
162
163     elif (inputschema["type"] == "array" and
164           isinstance(primary, Sequence)):
165         #
166         # array type, find secondary files of elements
167         #
168         for p in primary:
169             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
170
171     elif (inputschema["type"] == "File" and
172           isinstance(primary, Mapping) and
173           primary.get("class") == "File"):
174
175         if "secondaryFiles" in primary or not secondaryspec:
176             # Nothing to do.
177             return
178
179         #
180         # Found a file, check for secondaryFiles
181         #
182         specs = []
183         primary["secondaryFiles"] = secondaryspec
184         for i, sf in enumerate(aslist(secondaryspec)):
185             if builder.cwlVersion == "v1.0":
186                 pattern = sf
187             else:
188                 pattern = sf["pattern"]
189             if pattern is None:
190                 continue
191             if isinstance(pattern, list):
192                 specs.extend(pattern)
193             elif isinstance(pattern, dict):
194                 specs.append(pattern)
195             elif isinstance(pattern, str):
196                 if builder.cwlVersion == "v1.0":
197                     specs.append({"pattern": pattern, "required": True})
198                 else:
199                     specs.append({"pattern": pattern, "required": sf.get("required")})
200             else:
201                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
202                     "Expression must return list, object, string or null")
203
204         found = []
205         for i, sf in enumerate(specs):
206             if isinstance(sf, dict):
207                 if sf.get("class") == "File":
208                     pattern = None
209                     if sf.get("location") is None:
210                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
211                             "File object is missing 'location': %s" % sf)
212                     sfpath = sf["location"]
213                     required = True
214                 else:
215                     pattern = sf["pattern"]
216                     required = sf.get("required")
217             elif isinstance(sf, str):
218                 pattern = sf
219                 required = True
220             else:
221                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
222                     "Expression must return list, object, string or null")
223
224             if pattern is not None:
225                 if "${" in pattern or "$(" in pattern:
226                     sfname = builder.do_eval(pattern, context=primary)
227                 else:
228                     sfname = substitute(primary["basename"], pattern)
229
230                 if sfname is None:
231                     continue
232
233                 p_location = primary["location"]
234                 if "/" in p_location:
235                     sfpath = (
236                         p_location[0 : p_location.rindex("/") + 1]
237                         + sfname
238                     )
239
240             required = builder.do_eval(required, context=primary)
241
242             if fsaccess.exists(sfpath):
243                 if pattern is not None:
244                     found.append({"location": sfpath, "class": "File"})
245                 else:
246                     found.append(sf)
247             elif required:
248                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
249                     "Required secondary file '%s' does not exist" % sfpath)
250
251         primary["secondaryFiles"] = cmap(found)
252         if discovered is not None:
253             discovered[primary["location"]] = primary["secondaryFiles"]
254     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
255         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
256
257 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
258     for inputschema in inputs:
259         primary = job_order.get(shortname(inputschema["id"]))
260         if isinstance(primary, (Mapping, Sequence)):
261             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
262
263 def upload_dependencies(arvrunner, name, document_loader,
264                         workflowobj, uri, loadref_run, runtimeContext,
265                         include_primary=True, discovered_secondaryfiles=None,
266                         cache=None):
267     """Upload the dependencies of the workflowobj document to Keep.
268
269     Returns a pathmapper object mapping local paths to keep references.  Also
270     does an in-place update of references in "workflowobj".
271
272     Use scandeps to find $import, $include, $schemas, run, File and Directory
273     fields that represent external references.
274
275     If workflowobj has an "id" field, this will reload the document to ensure
276     it is scanning the raw document prior to preprocessing.
277     """
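    # A rough sketch of typical use (the argument values here are illustrative):
    #
    #   mapper = upload_dependencies(arvrunner, "example dependencies",
    #                                tool.doc_loader, tool.tool, tool.tool["id"],
    #                                False, runtimeContext)
    #   mapper.mapper("file:///local/input.txt").resolved
    #   # -> "keep:<portable data hash>/input.txt"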
278
279     loaded = set()
280     def loadref(b, u):
281         joined = document_loader.fetcher.urljoin(b, u)
282         defrg, _ = urllib.parse.urldefrag(joined)
283         if defrg not in loaded:
284             loaded.add(defrg)
285             if cache is not None and defrg in cache:
286                 return cache[defrg]
287             # Use fetch_text to get raw file (before preprocessing).
288             text = document_loader.fetch_text(defrg)
289             if isinstance(text, bytes):
290                 textIO = StringIO(text.decode('utf-8'))
291             else:
292                 textIO = StringIO(text)
293             yamlloader = YAML(typ='safe', pure=True)
294             result = yamlloader.load(textIO)
295             if cache is not None:
296                 cache[defrg] = result
297             return result
298         else:
299             return {}
300
301     if loadref_run:
302         loadref_fields = set(("$import", "run"))
303     else:
304         loadref_fields = set(("$import",))
305
306     scanobj = workflowobj
307     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
308         # Need raw file content (before preprocessing) to ensure
309         # that external references in $include and $mixin are captured.
310         scanobj = loadref("", workflowobj["id"])
311
312     metadata = scanobj
313
314     sc_result = scandeps(uri, scanobj,
315                          loadref_fields,
316                          set(("$include", "location")),
317                          loadref, urljoin=document_loader.fetcher.urljoin,
318                          nestdirs=False)
319
320     optional_deps = scandeps(uri, scanobj,
321                                   loadref_fields,
322                                   set(("$schemas",)),
323                                   loadref, urljoin=document_loader.fetcher.urljoin,
324                                   nestdirs=False)
325
326     sc_result.extend(optional_deps)
327
328     sc = []
329     uuids = {}
330
331     def collect_uuids(obj):
332         loc = obj.get("location", "")
333         sp = loc.split(":")
334         if sp[0] == "keep":
335             # Collect collection uuids that need to be resolved to
336             # portable data hashes
337             gp = collection_uuid_pattern.match(loc)
338             if gp:
339                 uuids[gp.groups()[0]] = obj
340             if collectionUUID in obj:
341                 uuids[obj[collectionUUID]] = obj
342
343     def collect_uploads(obj):
344         loc = obj.get("location", "")
345         sp = loc.split(":")
346         if len(sp) < 1:
347             return
348         if sp[0] in ("file", "http", "https"):
349             # Record local files that need to be uploaded,
350             # don't include file literals, keep references, etc.
351             sc.append(obj)
352         collect_uuids(obj)
353
354     visit_class(workflowobj, ("File", "Directory"), collect_uuids)
355     visit_class(sc_result, ("File", "Directory"), collect_uploads)
356
357     # Resolve any collection uuids we found to portable data hashes
358     # and assign them to uuid_map
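    # The mapping ends up looking something like this (values invented for
    # illustration):
    #   uuid_map = {"zzzzz-4zz18-zzzzzzzzzzzzzzz": "99999999999999999999999999999999+99"}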
359     uuid_map = {}
360     fetch_uuids = list(uuids.keys())
361     while fetch_uuids:
362         # For a large number of fetch_uuids, the API server may limit
363         # response size, so keep fetching until the API server has nothing
364         # more to give us.
365         lookups = arvrunner.api.collections().list(
366             filters=[["uuid", "in", fetch_uuids]],
367             count="none",
368             select=["uuid", "portable_data_hash"]).execute(
369                 num_retries=arvrunner.num_retries)
370
371         if not lookups["items"]:
372             break
373
374         for l in lookups["items"]:
375             uuid_map[l["uuid"]] = l["portable_data_hash"]
376
377         fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
378
379     normalizeFilesDirs(sc)
380
381     if include_primary and "id" in workflowobj:
382         sc.append({"class": "File", "location": workflowobj["id"]})
383
384     def visit_default(obj):
385         def defaults_are_optional(f):
386             if "location" not in f and "path" in f:
387                 f["location"] = f["path"]
388                 del f["path"]
389             normalizeFilesDirs(f)
390             optional_deps.append(f)
391         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
392
393     find_defaults(workflowobj, visit_default)
394
395     discovered = {}
396     def discover_default_secondary_files(obj):
397         builder_job_order = {}
398         for t in obj["inputs"]:
399             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
400         # Need to create a builder object to evaluate expressions.
401         builder = make_builder(builder_job_order,
402                                obj.get("hints", []),
403                                obj.get("requirements", []),
404                                ArvRuntimeContext(),
405                                metadata)
406         discover_secondary_files(arvrunner.fs_access,
407                                  builder,
408                                  obj["inputs"],
409                                  builder_job_order,
410                                  discovered)
411
412     copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
413     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
414
415     for d in list(discovered):
416         # Only interested in discovered secondaryFiles which are local
417         # files that need to be uploaded.
418         if d.startswith("file:"):
419             sc.extend(discovered[d])
420         else:
421             del discovered[d]
422
423     mapper = ArvPathMapper(arvrunner, sc, "",
424                            "keep:%s",
425                            "keep:%s/%s",
426                            name=name,
427                            single_collection=True,
428                            optional_deps=optional_deps)
429
430     keeprefs = set()
431     def addkeepref(k):
432         if k.startswith("keep:"):
433             keeprefs.add(collection_pdh_pattern.match(k).group(1))
434
435     def setloc(p):
436         loc = p.get("location")
437         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
438             p["location"] = mapper.mapper(p["location"]).resolved
439             addkeepref(p["location"])
440             return
441
442         if not loc:
443             return
444
445         if collectionUUID in p:
446             uuid = p[collectionUUID]
447             if uuid not in uuid_map:
448                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
449                     "Collection uuid %s not found" % uuid)
450             gp = collection_pdh_pattern.match(loc)
451             if gp and uuid_map[uuid] != gp.groups()[0]:
452                 # This file entry has both collectionUUID and a PDH
453                 # location. If the PDH doesn't match the one returned
454                 # the API server, raise an error.
455                 raise SourceLine(p, "location", validate.ValidationException).makeError(
456                     "Expected collection uuid %s to be %s but API server reported %s" % (
457                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
458
459         gp = collection_uuid_pattern.match(loc)
460         if not gp:
461             # Not a uuid pattern (must be a pdh pattern)
462             addkeepref(p["location"])
463             return
464
465         uuid = gp.groups()[0]
466         if uuid not in uuid_map:
467             raise SourceLine(p, "location", validate.ValidationException).makeError(
468                 "Collection uuid %s not found" % uuid)
469         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
470         p[collectionUUID] = uuid
471
472     visit_class(workflowobj, ("File", "Directory"), setloc)
473     visit_class(discovered, ("File", "Directory"), setloc)
474
475     if discovered_secondaryfiles is not None:
476         for d in discovered:
477             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
478
479     if runtimeContext.copy_deps:
480         # Find referenced collections and copy them into the
481         # destination project, for easy sharing.
482         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
483                                      filters=[["portable_data_hash", "in", list(keeprefs)],
484                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
485                                      select=["uuid", "portable_data_hash", "created_at"]))
486
487         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
488         for kr in keeprefs:
489             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
490                                                   order="created_at desc",
491                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
492                                                    limit=1).execute()
493             if len(col["items"]) == 0:
494                 logger.warning("Cannot find collection with portable data hash %s", kr)
495                 continue
496             col = col["items"][0]
497             try:
498                 arvrunner.api.collections().create(body={"collection": {
499                     "owner_uuid": runtimeContext.project_uuid,
500                     "name": col["name"],
501                     "description": col["description"],
502                     "properties": col["properties"],
503                     "portable_data_hash": col["portable_data_hash"],
504                     "manifest_text": col["manifest_text"],
505                     "storage_classes_desired": col["storage_classes_desired"],
506                     "trash_at": col["trash_at"]
507                 }}, ensure_unique_name=True).execute()
508             except Exception as e:
509                 logger.warning("Unable to copy collection to destination: %s", e)
510
511     if "$schemas" in workflowobj:
512         sch = CommentedSeq()
513         for s in workflowobj["$schemas"]:
514             if s in mapper:
515                 sch.append(mapper.mapper(s).resolved)
516         workflowobj["$schemas"] = sch
517
518     return mapper
519
520
521 def upload_docker(arvrunner, tool, runtimeContext):
522     """Uploads Docker images used in CommandLineTool objects."""
523
524     if isinstance(tool, CommandLineTool):
525         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
526         if docker_req:
527             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
528                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
529                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
530
531             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
532                                                        runtimeContext.project_uuid,
533                                                        runtimeContext.force_docker_pull,
534                                                        runtimeContext.tmp_outdir_prefix,
535                                                        runtimeContext.match_local_docker,
536                                                        runtimeContext.copy_deps)
537         else:
538             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
539                                                        True,
540                                                        runtimeContext.project_uuid,
541                                                        runtimeContext.force_docker_pull,
542                                                        runtimeContext.tmp_outdir_prefix,
543                                                        runtimeContext.match_local_docker,
544                                                        runtimeContext.copy_deps)
545     elif isinstance(tool, cwltool.workflow.Workflow):
546         for s in tool.steps:
547             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
548
549
550 def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
551     """Create a packed workflow.
552
553     A "packed" workflow is one where all the components have been combined into a single document."""
554
555     rewrites = {}
556     packed = pack(arvrunner.loadingContext, tool.tool["id"],
557                   rewrite_out=rewrites,
558                   loader=tool.doc_loader)
559
560     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
561
562     def visit(v, cur_id):
563         if isinstance(v, dict):
564             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
565                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
566                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or update to cwlVersion: v1.1")
567                 if "id" in v:
568                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
569             if "path" in v and "location" not in v:
570                 v["location"] = v["path"]
571                 del v["path"]
572             if "location" in v and cur_id in merged_map:
573                 if v["location"] in merged_map[cur_id].resolved:
574                     v["location"] = merged_map[cur_id].resolved[v["location"]]
575                 if v["location"] in merged_map[cur_id].secondaryFiles:
576                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
577             if v.get("class") == "DockerRequirement":
578                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
579                                                                                                              runtimeContext.project_uuid,
580                                                                                                              runtimeContext.force_docker_pull,
581                                                                                                              runtimeContext.tmp_outdir_prefix,
582                                                                                                              runtimeContext.match_local_docker,
583                                                                                                              runtimeContext.copy_deps)
584             for l in v:
585                 visit(v[l], cur_id)
586         if isinstance(v, list):
587             for l in v:
588                 visit(l, cur_id)
589     visit(packed, None)
590     return packed
591
592
593 def tag_git_version(packed, tool):
594     if tool.tool["id"].startswith("file://"):
595         path = os.path.dirname(tool.tool["id"][7:])
596         try:
597             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).decode("utf-8").strip()
598         except (OSError, subprocess.CalledProcessError):
599             pass
600         else:
601             packed["http://schema.org/version"] = githash
602
603
604 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
605     """Upload local files referenced in the input object and return updated input
606     object with 'location' updated to the proper keep references.
607     """
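    # Rough sketch of the effect (paths and hashes are invented):
    #   {"r1": {"class": "File", "location": "file:///home/me/r1.fastq"}}
    # comes back as
    #   {"r1": {"class": "File", "location": "keep:<pdh>/r1.fastq", ...}}
    # with any secondaryFiles declared by the tool's input schema filled in.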
608
609     # Make a copy of the job order and set defaults.
610     builder_job_order = copy.copy(job_order)
611
612     # fill_in_defaults throws an error if there are any missing
613     # required parameters; we don't want it to do that here,
614     # so make them all optional.
615     inputs_copy = copy.deepcopy(tool.tool["inputs"])
616     for i in inputs_copy:
617         if "null" not in i["type"]:
618             i["type"] = ["null"] + aslist(i["type"])
619
620     fill_in_defaults(inputs_copy,
621                      builder_job_order,
622                      arvrunner.fs_access)
623     # Need to create a builder object to evaluate expressions.
624     builder = make_builder(builder_job_order,
625                            tool.hints,
626                            tool.requirements,
627                            ArvRuntimeContext(),
628                            tool.metadata)
629     # Now update job_order with secondaryFiles
630     discover_secondary_files(arvrunner.fs_access,
631                              builder,
632                              tool.tool["inputs"],
633                              job_order)
634
635     jobmapper = upload_dependencies(arvrunner,
636                                     name,
637                                     tool.doc_loader,
638                                     job_order,
639                                     job_order.get("id", "#"),
640                                     False,
641                                     runtimeContext)
642
643     if "id" in job_order:
644         del job_order["id"]
645
646     # Need to filter this out; it gets added by cwltool when providing
647     # parameters on the command line.
648     if "job_order" in job_order:
649         del job_order["job_order"]
650
651     return job_order
652
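# For each tool document "id", FileUpdates records the mapping of original
# file references to their resolved keep: locations ('resolved') plus any
# secondaryFiles discovered for them; upload_workflow_deps builds one entry
# per tool it visits.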
653 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
654
655 def upload_workflow_deps(arvrunner, tool, runtimeContext):
656     # Ensure that Docker images needed by this workflow are available
657
658     with Perf(metrics, "upload_docker"):
659         upload_docker(arvrunner, tool, runtimeContext)
660
661     document_loader = tool.doc_loader
662
663     merged_map = {}
664     tool_dep_cache = {}
665     def upload_tool_deps(deptool):
666         if "id" in deptool:
667             discovered_secondaryfiles = {}
668             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
669                 pm = upload_dependencies(arvrunner,
670                                          "%s dependencies" % (shortname(deptool["id"])),
671                                          document_loader,
672                                          deptool,
673                                          deptool["id"],
674                                          False,
675                                          runtimeContext,
676                                          include_primary=False,
677                                          discovered_secondaryfiles=discovered_secondaryfiles,
678                                          cache=tool_dep_cache)
679             document_loader.idx[deptool["id"]] = deptool
680             toolmap = {}
681             for k,v in pm.items():
682                 toolmap[k] = v.resolved
683             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
684
685     tool.visit(upload_tool_deps)
686
687     return merged_map
688
689 def arvados_jobs_image(arvrunner, img, runtimeContext):
690     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
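    # 'img' is typically "arvados/jobs:<version>" (Runner.__init__ below
    # defaults jobs_image to "arvados/jobs:"+__version__); the image is pulled
    # and uploaded if it is not already available.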
691
692     try:
693         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
694                                                           True,
695                                                           runtimeContext.project_uuid,
696                                                           runtimeContext.force_docker_pull,
697                                                           runtimeContext.tmp_outdir_prefix,
698                                                           runtimeContext.match_local_docker,
699                                                           runtimeContext.copy_deps)
700     except Exception as e:
701         raise Exception("Docker image %s is not available\n%s" % (img, e))
702
703
704 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
705     collection = arvados.collection.Collection(api_client=arvrunner.api,
706                                                keep_client=arvrunner.keep_client,
707                                                num_retries=arvrunner.num_retries)
708     with collection.open("workflow.cwl", "w") as f:
709         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
710
711     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
712                ["name", "like", name+"%"]]
713     if runtimeContext.project_uuid:
714         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
715     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
716
717     if exists["items"]:
718         logger.info("Using collection %s", exists["items"][0]["uuid"])
719     else:
720         collection.save_new(name=name,
721                             owner_uuid=runtimeContext.project_uuid,
722                             ensure_unique_name=True,
723                             num_retries=arvrunner.num_retries)
724         logger.info("Uploaded to %s", collection.manifest_locator())
725
726     return collection.portable_data_hash()
727
728
729 class Runner(Process):
730     """Base class for runner processes, which submit an instance of
731     arvados-cwl-runner and wait for the final result."""
732
733     def __init__(self, runner, updated_tool,
734                  tool, loadingContext, enable_reuse,
735                  output_name, output_tags, submit_runner_ram=0,
736                  name=None, on_error=None, submit_runner_image=None,
737                  intermediate_output_ttl=0, merged_map=None,
738                  priority=None, secret_store=None,
739                  collection_cache_size=256,
740                  collection_cache_is_default=True):
741
742         loadingContext = loadingContext.copy()
743         loadingContext.metadata = updated_tool.metadata.copy()
744
745         super(Runner, self).__init__(updated_tool.tool, loadingContext)
746
747         self.arvrunner = runner
748         self.embedded_tool = tool
749         self.job_order = None
750         self.running = False
751         if enable_reuse:
752             # If reuse is permitted by command line arguments but
753             # disabled by the workflow itself, disable it.
754             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
755             if reuse_req:
756                 enable_reuse = reuse_req["enableReuse"]
757         self.enable_reuse = enable_reuse
758         self.uuid = None
759         self.final_output = None
760         self.output_name = output_name
761         self.output_tags = output_tags
762         self.name = name
763         self.on_error = on_error
764         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
765         self.intermediate_output_ttl = intermediate_output_ttl
766         self.priority = priority
767         self.secret_store = secret_store
768         self.enable_dev = loadingContext.enable_dev
769
770         self.submit_runner_cores = 1
771         self.submit_runner_ram = 1024  # default 1 GiB
772         self.collection_cache_size = collection_cache_size
773
774         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
775         if runner_resource_req:
776             if runner_resource_req.get("coresMin"):
777                 self.submit_runner_cores = runner_resource_req["coresMin"]
778             if runner_resource_req.get("ramMin"):
779                 self.submit_runner_ram = runner_resource_req["ramMin"]
780             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
781                 self.collection_cache_size = runner_resource_req["keep_cache"]
782
783         if submit_runner_ram:
784             # Command line / initializer overrides default and/or spec from workflow
785             self.submit_runner_ram = submit_runner_ram
786
787         if self.submit_runner_ram <= 0:
788             raise Exception("Value of submit-runner-ram must be greater than zero")
789
790         if self.submit_runner_cores <= 0:
791             raise Exception("Value of submit-runner-cores must be greater than zero")
792
793         self.merged_map = merged_map or {}
794
795     def job(self,
796             job_order,         # type: Mapping[Text, Text]
797             output_callbacks,  # type: Callable[[Any, Any], Any]
798             runtimeContext     # type: RuntimeContext
799            ):  # type: (...) -> Generator[Any, None, None]
800         self.job_order = job_order
801         self._init_job(job_order, runtimeContext)
802         yield self
803
804     def update_pipeline_component(self, record):
805         pass
806
807     def done(self, record):
808         """Base method for handling a completed runner."""
809
810         try:
811             if record["state"] == "Complete":
812                 if record.get("exit_code") is not None:
813                     if record["exit_code"] == 33:
814                         processStatus = "UnsupportedRequirement"
815                     elif record["exit_code"] == 0:
816                         processStatus = "success"
817                     else:
818                         processStatus = "permanentFail"
819                 else:
820                     processStatus = "success"
821             else:
822                 processStatus = "permanentFail"
823
824             outputs = {}
825
826             if processStatus == "permanentFail":
827                 logc = arvados.collection.CollectionReader(record["log"],
828                                                            api_client=self.arvrunner.api,
829                                                            keep_client=self.arvrunner.keep_client,
830                                                            num_retries=self.arvrunner.num_retries)
831                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
832
833             self.final_output = record["output"]
834             outc = arvados.collection.CollectionReader(self.final_output,
835                                                        api_client=self.arvrunner.api,
836                                                        keep_client=self.arvrunner.keep_client,
837                                                        num_retries=self.arvrunner.num_retries)
838             if "cwl.output.json" in outc:
839                 with outc.open("cwl.output.json", "rb") as f:
840                     if f.size() > 0:
841                         outputs = json.loads(f.read().decode())
842             def keepify(fileobj):
843                 path = fileobj["location"]
844                 if not path.startswith("keep:"):
845                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
846             adjustFileObjs(outputs, keepify)
847             adjustDirObjs(outputs, keepify)
848         except Exception:
849             logger.exception("[%s] While getting final output object", self.name)
850             self.arvrunner.output_callback({}, "permanentFail")
851         else:
852             self.arvrunner.output_callback(outputs, processStatus)