arvados.git / sdk/cwl/arvados_cwl/runner.py @ commit a307cf7f666b6e6f5bdda0203abea6f081a7ef86
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import (
21     Any,
22     Callable,
23     Dict,
24     Iterable,
25     Iterator,
26     List,
27     Mapping,
28     MutableMapping,
29     Sequence,
30     MutableSequence,
31     Optional,
32     Set,
33     Sized,
34     Tuple,
35     Type,
36     Union,
37     cast,
38 )
39 from cwltool.utils import (
40     CWLObjectType,
41     CWLOutputAtomType,
42     CWLOutputType,
43 )
44
45 if os.name == "posix" and sys.version_info[0] < 3:
46     import subprocess32 as subprocess
47 else:
48     import subprocess
49
50 from schema_salad.sourceline import SourceLine, cmap
51
52 from cwltool.command_line_tool import CommandLineTool
53 import cwltool.workflow
54 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
55                              shortname, Process, fill_in_defaults)
56 from cwltool.load_tool import fetch_document, jobloaderctx
57 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
58 from cwltool.builder import substitute
59 from cwltool.pack import pack
60 from cwltool.update import INTERNAL_VERSION
61 from cwltool.builder import Builder
62 import schema_salad.validate as validate
63 import schema_salad.ref_resolver
64
65 import arvados.collection
66 import arvados.util
67 from .util import collectionUUID
68 from ruamel.yaml import YAML
69 from ruamel.yaml.comments import CommentedMap, CommentedSeq
70
71 import arvados_cwl.arvdocker
72 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
73 from ._version import __version__
74 from . import done
75 from .context import ArvRuntimeContext
76 from .perf import Perf
77
78 logger = logging.getLogger('arvados.cwl-runner')
79 metrics = logging.getLogger('arvados.cwl-runner.metrics')
80
81 def trim_anonymous_location(obj):
82     """Remove 'location' field from File and Directory literals.
83
84     To make internal handling easier, literals are assigned a random id for
85     'location'.  However, when writing the record back out, this can break
86     reproducibility.  Since it is valid for literals not to have a 'location'
87     field, remove it.
88
89     """
90
91     if obj.get("location", "").startswith("_:"):
92         del obj["location"]
93
94
95 def remove_redundant_fields(obj):
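    """Strip the redundant 'path', 'nameext', 'nameroot' and 'dirname'
    fields from a File or Directory object."""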
96     for field in ("path", "nameext", "nameroot", "dirname"):
97         if field in obj:
98             del obj[field]
99
100
101 def find_defaults(d, op):
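    """Recursively walk 'd' and apply op() to every mapping that contains a 'default' field."""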
102     if isinstance(d, list):
103         for i in d:
104             find_defaults(i, op)
105     elif isinstance(d, dict):
106         if "default" in d:
107             op(d)
108         else:
109             for i in viewvalues(d):
110                 find_defaults(i, op)
111
112 def make_builder(joborder, hints, requirements, runtimeContext, metadata):
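    """Construct a cwltool Builder with just enough state to evaluate
    expressions (for example secondaryFiles patterns) outside of a full job."""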
113     return Builder(
114                  job=joborder,
115                  files=[],               # type: List[Dict[Text, Text]]
116                  bindings=[],            # type: List[Dict[Text, Any]]
117                  schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
118                  names=None,               # type: Names
119                  requirements=requirements,        # type: List[Dict[Text, Any]]
120                  hints=hints,               # type: List[Dict[Text, Any]]
121                  resources={},           # type: Dict[str, int]
122                  mutation_manager=None,    # type: Optional[MutationManager]
123                  formatgraph=None,         # type: Optional[Graph]
124                  make_fs_access=None,      # type: Type[StdFsAccess]
125                  fs_access=None,           # type: StdFsAccess
126                  job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
127                  timeout=runtimeContext.eval_timeout,             # type: float
128                  debug=runtimeContext.debug,               # type: bool
129                  js_console=runtimeContext.js_console,          # type: bool
130                  force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
131                  loadListing="",         # type: Text
132                  outdir="",              # type: Text
133                  tmpdir="",              # type: Text
134                  stagedir="",            # type: Text
135                  cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
136                  container_engine="docker"
137                 )
138
139 def search_schemadef(name, reqs):
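    """Return the type definition named 'name' from any SchemaDefRequirement in 'reqs', or None if not found."""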
140     for r in reqs:
141         if r["class"] == "SchemaDefRequirement":
142             for sd in r["types"]:
143                 if sd["name"] == name:
144                     return sd
145     return None
146
147 primitive_types_set = frozenset(("null", "boolean", "int", "long",
148                                  "float", "double", "string", "record",
149                                  "array", "enum"))
150
151 def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
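    """Resolve the secondaryFiles of 'primary' according to 'inputschema'.

    Recurses through union, record and array types, evaluates secondaryFiles
    patterns against the primary File, checks that required secondary files
    exist, and (when 'discovered' is a dict) records the result keyed by the
    primary file's location.
    """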
152     if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
153         # union type, collect all possible secondaryFiles
154         for i in inputschema:
155             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
156         return
157
158     if inputschema == "File":
159         inputschema = {"type": "File"}
160
161     if isinstance(inputschema, basestring):
162         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
163         if sd:
164             inputschema = sd
165         else:
166             return
167
168     if "secondaryFiles" in inputschema:
169         # set secondaryFiles, may be inherited by compound types.
170         secondaryspec = inputschema["secondaryFiles"]
171
172     if (isinstance(inputschema["type"], (Mapping, Sequence)) and
173         not isinstance(inputschema["type"], basestring)):
174         # compound type (union, array, record)
175         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
176
177     elif (inputschema["type"] == "record" and
178           isinstance(primary, Mapping)):
179         #
180         # record type, find secondary files associated with fields.
181         #
182         for f in inputschema["fields"]:
183             p = primary.get(shortname(f["name"]))
184             if p:
185                 set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
186
187     elif (inputschema["type"] == "array" and
188           isinstance(primary, Sequence)):
189         #
190         # array type, find secondary files of elements
191         #
192         for p in primary:
193             set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
194
195     elif (inputschema["type"] == "File" and
196           isinstance(primary, Mapping) and
197           primary.get("class") == "File"):
198
199         if "secondaryFiles" in primary or not secondaryspec:
200             # Nothing to do.
201             return
202
203         #
204         # Found a file, check for secondaryFiles
205         #
206         specs = []
207         primary["secondaryFiles"] = secondaryspec
208         for i, sf in enumerate(aslist(secondaryspec)):
209             if builder.cwlVersion == "v1.0":
210                 pattern = sf
211             else:
212                 pattern = sf["pattern"]
213             if pattern is None:
214                 continue
215             if isinstance(pattern, list):
216                 specs.extend(pattern)
217             elif isinstance(pattern, dict):
218                 specs.append(pattern)
219             elif isinstance(pattern, str):
220                 if builder.cwlVersion == "v1.0":
221                     specs.append({"pattern": pattern, "required": True})
222                 else:
223                     specs.append({"pattern": pattern, "required": sf.get("required")})
224             else:
225                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
226                     "Expression must return list, object, string or null")
227
228         found = []
229         for i, sf in enumerate(specs):
230             if isinstance(sf, dict):
231                 if sf.get("class") == "File":
232                     pattern = None
233                     if sf.get("location") is None:
234                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
235                             "File object is missing 'location': %s" % sf)
236                     sfpath = sf["location"]
237                     required = True
238                 else:
239                     pattern = sf["pattern"]
240                     required = sf.get("required")
241             elif isinstance(sf, str):
242                 pattern = sf
243                 required = True
244             else:
245                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
246                     "Expression must return list, object, string or null")
247
248             if pattern is not None:
249                 if "${" in pattern or "$(" in pattern:
250                     sfname = builder.do_eval(pattern, context=primary)
251                 else:
252                     sfname = substitute(primary["basename"], pattern)
253
254                 if sfname is None:
255                     continue
256
257                 if isinstance(sfname, str):
258                     p_location = primary["location"]
259                     if "/" in p_location:
260                         sfpath = (
261                             p_location[0 : p_location.rindex("/") + 1]
262                             + sfname
263                         )
264
265             required = builder.do_eval(required, context=primary)
266
267             if isinstance(sfname, list) or isinstance(sfname, dict):
268                 each = aslist(sfname)
269                 for e in each:
270                     if required and not fsaccess.exists(e.get("location")):
271                         raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
272                             "Required secondary file '%s' does not exist" % e.get("location"))
273                 found.extend(each)
274
275             if isinstance(sfname, str):
276                 if fsaccess.exists(sfpath):
277                     if pattern is not None:
278                         found.append({"location": sfpath, "class": "File"})
279                     else:
280                         found.append(sf)
281                 elif required:
282                     raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
283                         "Required secondary file '%s' does not exist" % sfpath)
284
285         primary["secondaryFiles"] = cmap(found)
286         if discovered is not None:
287             discovered[primary["location"]] = primary["secondaryFiles"]
288     elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
289         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
290
291 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
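    """Apply set_secondary() to each input parameter that has a value in 'job_order'."""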
292     for inputschema in inputs:
293         primary = job_order.get(shortname(inputschema["id"]))
294         if isinstance(primary, (Mapping, Sequence)):
295             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
296
297 def upload_dependencies(arvrunner, name, document_loader,
298                         workflowobj, uri, loadref_run, runtimeContext,
299                         include_primary=True, discovered_secondaryfiles=None,
300                         cache=None):
301     """Upload the dependencies of the workflowobj document to Keep.
302
303     Returns a pathmapper object mapping local paths to keep references.  Also
304     does an in-place update of references in "workflowobj".
305
306     Use scandeps to find $import, $include, $schemas, run, File and Directory
307     fields that represent external references.
308
309     If workflowobj has an "id" field, this will reload the document to ensure
310     it is scanning the raw document prior to preprocessing.
311     """
312
313     loaded = set()
314     def loadref(b, u):
315         joined = document_loader.fetcher.urljoin(b, u)
316         defrg, _ = urllib.parse.urldefrag(joined)
317         if defrg not in loaded:
318             loaded.add(defrg)
319             if cache is not None and defrg in cache:
320                 return cache[defrg]
321             # Use fetch_text to get raw file (before preprocessing).
322             text = document_loader.fetch_text(defrg)
323             if isinstance(text, bytes):
324                 textIO = StringIO(text.decode('utf-8'))
325             else:
326                 textIO = StringIO(text)
327             yamlloader = YAML(typ='safe', pure=True)
328             result = yamlloader.load(textIO)
329             if cache is not None:
330                 cache[defrg] = result
331             return result
332         else:
333             return {}
334
335     if loadref_run:
336         loadref_fields = set(("$import", "run"))
337     else:
338         loadref_fields = set(("$import",))
339
340     scanobj = workflowobj
341     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
342         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
343         if cache is not None and defrg not in cache:
344             # if we haven't seen this file before, we want the raw file
345             # content (before preprocessing) to ensure that external
346             # references like $include haven't already been inlined.
347             scanobj = loadref("", workflowobj["id"])
348
349     metadata = scanobj
350
351     with Perf(metrics, "scandeps include, location"):
352         sc_result = scandeps(uri, scanobj,
353                              loadref_fields,
354                              set(("$include", "location")),
355                              loadref, urljoin=document_loader.fetcher.urljoin,
356                              nestdirs=False)
357
358     with Perf(metrics, "scandeps $schemas"):
359         optional_deps = scandeps(uri, scanobj,
360                                       loadref_fields,
361                                       set(("$schemas",)),
362                                       loadref, urljoin=document_loader.fetcher.urljoin,
363                                       nestdirs=False)
364
365     if sc_result is None:
366         sc_result = []
367
368     if optional_deps is None:
369         optional_deps = []
370
371     if optional_deps:
372         sc_result.extend(optional_deps)
373
374     sc = []
375     uuids = {}
376
377     def collect_uuids(obj):
378         loc = obj.get("location", "")
379         sp = loc.split(":")
380         if sp[0] == "keep":
381             # Collect collection uuids that need to be resolved to
382             # portable data hashes
383             gp = collection_uuid_pattern.match(loc)
384             if gp:
385                 uuids[gp.groups()[0]] = obj
386             if collectionUUID in obj:
387                 uuids[obj[collectionUUID]] = obj
388
389     def collect_uploads(obj):
390         loc = obj.get("location", "")
391         sp = loc.split(":")
392         if len(sp) < 1:
393             return
394         if sp[0] in ("file", "http", "https"):
395             # Record local files that need to be uploaded,
396             # don't include file literals, keep references, etc.
397             sc.append(obj)
398         collect_uuids(obj)
399
400     with Perf(metrics, "collect uuids"):
401         visit_class(workflowobj, ("File", "Directory"), collect_uuids)
402
403     with Perf(metrics, "collect uploads"):
404         visit_class(sc_result, ("File", "Directory"), collect_uploads)
405
406     # Resolve any collection uuids we found to portable data hashes
407     # and assign them to uuid_map
408     uuid_map = {}
409     fetch_uuids = list(uuids.keys())
410     with Perf(metrics, "fetch_uuids"):
411         while fetch_uuids:
412             # For a large number of fetch_uuids, the API server may limit
413             # the response size, so keep fetching until the API server has
414             # nothing more to give us.
415             lookups = arvrunner.api.collections().list(
416                 filters=[["uuid", "in", fetch_uuids]],
417                 count="none",
418                 select=["uuid", "portable_data_hash"]).execute(
419                     num_retries=arvrunner.num_retries)
420
421             if not lookups["items"]:
422                 break
423
424             for l in lookups["items"]:
425                 uuid_map[l["uuid"]] = l["portable_data_hash"]
426
427             fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
428
429     normalizeFilesDirs(sc)
430
431     if "id" in workflowobj:
432         defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
433         if include_primary:
434             # make sure it's included
435             sc.append({"class": "File", "location": defrg})
436         else:
437             # make sure it's excluded
438             sc = [d for d in sc if d.get("location") != defrg]
439
440     def visit_default(obj):
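        """Treat File/Directory values appearing in 'default' fields as optional dependencies."""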
441         def defaults_are_optional(f):
442             if "location" not in f and "path" in f:
443                 f["location"] = f["path"]
444                 del f["path"]
445             normalizeFilesDirs(f)
446             optional_deps.append(f)
447         visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
448
449     find_defaults(workflowobj, visit_default)
450
451     discovered = {}
452     def discover_default_secondary_files(obj):
453         builder_job_order = {}
454         for t in obj["inputs"]:
455             builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
456         # Need to create a builder object to evaluate expressions.
457         builder = make_builder(builder_job_order,
458                                obj.get("hints", []),
459                                obj.get("requirements", []),
460                                ArvRuntimeContext(),
461                                metadata)
462         discover_secondary_files(arvrunner.fs_access,
463                                  builder,
464                                  obj["inputs"],
465                                  builder_job_order,
466                                  discovered)
467
468     _jobloaderctx = jobloaderctx.copy()
469     loader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=document_loader.fetcher_constructor)
470
471     copied, _ = loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
472     visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
473
474     for d in list(discovered):
475         # Only interested in discovered secondaryFiles which are local
476         # files that need to be uploaded.
477         if d.startswith("file:"):
478             sc.extend(discovered[d])
479         else:
480             del discovered[d]
481
482     with Perf(metrics, "mapper"):
483         mapper = ArvPathMapper(arvrunner, sc, "",
484                                "keep:%s",
485                                "keep:%s/%s",
486                                name=name,
487                                single_collection=True,
488                                optional_deps=optional_deps)
489
490     keeprefs = set()
491     def addkeepref(k):
492         if k.startswith("keep:"):
493             keeprefs.add(collection_pdh_pattern.match(k).group(1))
494
495     def setloc(p):
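        """Rewrite a File/Directory 'location' to a keep: reference.

        Local and http(s) locations are rewritten through the path mapper;
        collection UUIDs are resolved to portable data hashes via uuid_map.
        """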
496         loc = p.get("location")
497         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
498             p["location"] = mapper.mapper(p["location"]).resolved
499             addkeepref(p["location"])
500             return
501
502         if not loc:
503             return
504
505         if collectionUUID in p:
506             uuid = p[collectionUUID]
507             if uuid not in uuid_map:
508                 raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
509                     "Collection uuid %s not found" % uuid)
510             gp = collection_pdh_pattern.match(loc)
511             if gp and uuid_map[uuid] != gp.groups()[0]:
512                 # This file entry has both collectionUUID and a PDH
513                 # location. If the PDH doesn't match the one returned by
514                 # the API server, raise an error.
515                 raise SourceLine(p, "location", validate.ValidationException).makeError(
516                     "Expected collection uuid %s to be %s but API server reported %s" % (
517                         uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
518
519         gp = collection_uuid_pattern.match(loc)
520         if not gp:
521             # Not a uuid pattern (must be a pdh pattern)
522             addkeepref(p["location"])
523             return
524
525         uuid = gp.groups()[0]
526         if uuid not in uuid_map:
527             raise SourceLine(p, "location", validate.ValidationException).makeError(
528                 "Collection uuid %s not found" % uuid)
529         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
530         p[collectionUUID] = uuid
531
532     with Perf(metrics, "setloc"):
533         visit_class(workflowobj, ("File", "Directory"), setloc)
534         visit_class(discovered, ("File", "Directory"), setloc)
535
536     if discovered_secondaryfiles is not None:
537         for d in discovered:
538             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
539
540     if runtimeContext.copy_deps:
541         # Find referenced collections and copy them into the
542         # destination project, for easy sharing.
543         already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
544                                      filters=[["portable_data_hash", "in", list(keeprefs)],
545                                               ["owner_uuid", "=", runtimeContext.project_uuid]],
546                                      select=["uuid", "portable_data_hash", "created_at"]))
547
548         keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
549         for kr in keeprefs:
550             col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
551                                                   order="created_at desc",
552                                                    select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
553                                                    limit=1).execute()
554             if len(col["items"]) == 0:
555                 logger.warning("Cannot find collection with portable data hash %s", kr)
556                 continue
557             col = col["items"][0]
558             try:
559                 arvrunner.api.collections().create(body={"collection": {
560                     "owner_uuid": runtimeContext.project_uuid,
561                     "name": col["name"],
562                     "description": col["description"],
563                     "properties": col["properties"],
564                     "portable_data_hash": col["portable_data_hash"],
565                     "manifest_text": col["manifest_text"],
566                     "storage_classes_desired": col["storage_classes_desired"],
567                     "trash_at": col["trash_at"]
568                 }}, ensure_unique_name=True).execute()
569             except Exception as e:
570                 logger.warning("Unable to copy collection to destination: %s", e)
571
572     if "$schemas" in workflowobj:
573         sch = CommentedSeq()
574         for s in workflowobj["$schemas"]:
575             if s in mapper:
576                 sch.append(mapper.mapper(s).resolved)
577         workflowobj["$schemas"] = sch
578
579     return mapper
580
581
582 def upload_docker(arvrunner, tool, runtimeContext):
583     """Uploads Docker images used in CommandLineTool objects."""
584
585     if isinstance(tool, CommandLineTool):
586         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
587         if docker_req:
588             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
589                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
590                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
591
592             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
593                                                        runtimeContext.project_uuid,
594                                                        runtimeContext.force_docker_pull,
595                                                        runtimeContext.tmp_outdir_prefix,
596                                                        runtimeContext.match_local_docker,
597                                                        runtimeContext.copy_deps)
598         else:
599             arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
600                                                        True,
601                                                        runtimeContext.project_uuid,
602                                                        runtimeContext.force_docker_pull,
603                                                        runtimeContext.tmp_outdir_prefix,
604                                                        runtimeContext.match_local_docker,
605                                                        runtimeContext.copy_deps)
606     elif isinstance(tool, cwltool.workflow.Workflow):
607         for s in tool.steps:
608             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
609
610
611 def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
612     """Create a packed workflow.
613
614     A "packed" workflow is one where all the components have been combined into a single document."""
615
616     rewrites = {}
617     packed = pack(arvrunner.loadingContext, tool.tool["id"],
618                   rewrite_out=rewrites,
619                   loader=tool.doc_loader)
620
621     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
622
623     def visit(v, cur_id):
624         if isinstance(v, dict):
625             if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
626                 if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
627                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
628                 if "id" in v:
629                     cur_id = rewrite_to_orig.get(v["id"], v["id"])
630             if "path" in v and "location" not in v:
631                 v["location"] = v["path"]
632                 del v["path"]
633             if "location" in v and cur_id in merged_map:
634                 if v["location"] in merged_map[cur_id].resolved:
635                     v["location"] = merged_map[cur_id].resolved[v["location"]]
636                 if v["location"] in merged_map[cur_id].secondaryFiles:
637                     v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
638             if v.get("class") == "DockerRequirement":
639                 v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
640                                                                                                              runtimeContext.project_uuid,
641                                                                                                              runtimeContext.force_docker_pull,
642                                                                                                              runtimeContext.tmp_outdir_prefix,
643                                                                                                              runtimeContext.match_local_docker,
644                                                                                                              runtimeContext.copy_deps)
645             for l in v:
646                 visit(v[l], cur_id)
647         if isinstance(v, list):
648             for l in v:
649                 visit(l, cur_id)
650     visit(packed, None)
651
652     if git_info:
653         for g in git_info:
654             packed[g] = git_info[g]
655
656     return packed
657
658
659 def tag_git_version(packed, tool):
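    """Record the git commit hash of the tool's source directory in the
    packed document under 'http://schema.org/version'."""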
660     if tool.tool["id"].startswith("file://"):
661         path = os.path.dirname(tool.tool["id"][7:])
662         try:
663             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
664         except (OSError, subprocess.CalledProcessError):
665             pass
666         else:
667             packed["http://schema.org/version"] = githash
668
669
670 def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
671     """Upload local files referenced in the input object and return an updated
672     input object with 'location' fields rewritten to the proper keep references.
673     """
674
675     # Make a copy of the job order and set defaults.
676     builder_job_order = copy.copy(job_order)
677
678     # fill_in_defaults throws an error if there are any missing required
679     # parameters.  We don't want it to do that here, so make them all
680     # optional.
681     inputs_copy = copy.deepcopy(tool.tool["inputs"])
682     for i in inputs_copy:
683         if "null" not in i["type"]:
684             i["type"] = ["null"] + aslist(i["type"])
685
686     fill_in_defaults(inputs_copy,
687                      builder_job_order,
688                      arvrunner.fs_access)
689     # Need to create a builder object to evaluate expressions.
690     builder = make_builder(builder_job_order,
691                            tool.hints,
692                            tool.requirements,
693                            ArvRuntimeContext(),
694                            tool.metadata)
695     # Now update job_order with secondaryFiles
696     discover_secondary_files(arvrunner.fs_access,
697                              builder,
698                              tool.tool["inputs"],
699                              job_order)
700
701     jobmapper = upload_dependencies(arvrunner,
702                                     name,
703                                     tool.doc_loader,
704                                     job_order,
705                                     job_order.get("id", "#"),
706                                     False,
707                                     runtimeContext)
708
709     if "id" in job_order:
710         del job_order["id"]
711
712     # Need to filter this out; it gets added by cwltool when providing
713     # parameters on the command line.
714     if "job_order" in job_order:
715         del job_order["job_order"]
716
717     return job_order
718
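# FileUpdates records, for one tool document, the mapping of original file
# locations to resolved keep: locations and the secondaryFiles discovered
# for them.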
719 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
720
721 def upload_workflow_deps(arvrunner, tool, runtimeContext):
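    """Upload Docker images and file dependencies for every tool in the workflow.

    Returns a map of tool document ids to FileUpdates recording how their
    file references were rewritten.
    """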
722     # Ensure that Docker images needed by this workflow are available
723
724     with Perf(metrics, "upload_docker"):
725         upload_docker(arvrunner, tool, runtimeContext)
726
727     document_loader = tool.doc_loader
728
729     merged_map = {}
730     tool_dep_cache = {}
731     def upload_tool_deps(deptool):
732         if "id" in deptool:
733             discovered_secondaryfiles = {}
734             with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
735                 pm = upload_dependencies(arvrunner,
736                                          "%s dependencies" % (shortname(deptool["id"])),
737                                          document_loader,
738                                          deptool,
739                                          deptool["id"],
740                                          False,
741                                          runtimeContext,
742                                          include_primary=False,
743                                          discovered_secondaryfiles=discovered_secondaryfiles,
744                                          cache=tool_dep_cache)
745             document_loader.idx[deptool["id"]] = deptool
746             toolmap = {}
747             for k,v in pm.items():
748                 toolmap[k] = v.resolved
749             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
750
751     tool.visit(upload_tool_deps)
752
753     return merged_map
754
755 def arvados_jobs_image(arvrunner, img, runtimeContext):
756     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
757
758     try:
759         return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
760                                                           True,
761                                                           runtimeContext.project_uuid,
762                                                           runtimeContext.force_docker_pull,
763                                                           runtimeContext.tmp_outdir_prefix,
764                                                           runtimeContext.match_local_docker,
765                                                           runtimeContext.copy_deps)
766     except Exception as e:
767         raise Exception("Docker image %s is not available\n%s" % (img, e) )
768
769
770 def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
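    """Save the packed workflow document to a Keep collection.

    Reuses an existing collection with the same content and name prefix when
    one is already present in the target project; returns the collection's
    portable data hash.
    """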
771     collection = arvados.collection.Collection(api_client=arvrunner.api,
772                                                keep_client=arvrunner.keep_client,
773                                                num_retries=arvrunner.num_retries)
774     with collection.open("workflow.cwl", "w") as f:
775         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
776
777     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
778                ["name", "like", name+"%"]]
779     if runtimeContext.project_uuid:
780         filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
781     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
782
783     if exists["items"]:
784         logger.info("Using collection %s", exists["items"][0]["uuid"])
785     else:
786         collection.save_new(name=name,
787                             owner_uuid=runtimeContext.project_uuid,
788                             ensure_unique_name=True,
789                             num_retries=arvrunner.num_retries)
790         logger.info("Uploaded to %s", collection.manifest_locator())
791
792     return collection.portable_data_hash()
793
794
795 class Runner(Process):
796     """Base class for runner processes, which submit an instance of
797     arvados-cwl-runner and wait for the final result."""
798
799     def __init__(self, runner, updated_tool,
800                  tool, loadingContext, enable_reuse,
801                  output_name, output_tags, submit_runner_ram=0,
802                  name=None, on_error=None, submit_runner_image=None,
803                  intermediate_output_ttl=0, merged_map=None,
804                  priority=None, secret_store=None,
805                  collection_cache_size=256,
806                  collection_cache_is_default=True,
807                  git_info=None):
808
809         loadingContext = loadingContext.copy()
810         loadingContext.metadata = updated_tool.metadata.copy()
811
812         super(Runner, self).__init__(updated_tool.tool, loadingContext)
813
814         self.arvrunner = runner
815         self.embedded_tool = tool
816         self.job_order = None
817         self.running = False
818         if enable_reuse:
819             # If reuse is permitted by command line arguments but
820             # disabled by the workflow itself, disable it.
821             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
822             if reuse_req:
823                 enable_reuse = reuse_req["enableReuse"]
824         self.enable_reuse = enable_reuse
825         self.uuid = None
826         self.final_output = None
827         self.output_name = output_name
828         self.output_tags = output_tags
829         self.name = name
830         self.on_error = on_error
831         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
832         self.intermediate_output_ttl = intermediate_output_ttl
833         self.priority = priority
834         self.secret_store = secret_store
835         self.enable_dev = loadingContext.enable_dev
836         self.git_info = git_info
837
838         self.submit_runner_cores = 1
839         self.submit_runner_ram = 1024  # default 1 GiB
840         self.collection_cache_size = collection_cache_size
841
842         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
843         if runner_resource_req:
844             if runner_resource_req.get("coresMin"):
845                 self.submit_runner_cores = runner_resource_req["coresMin"]
846             if runner_resource_req.get("ramMin"):
847                 self.submit_runner_ram = runner_resource_req["ramMin"]
848             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
849                 self.collection_cache_size = runner_resource_req["keep_cache"]
850
851         if submit_runner_ram:
852             # Command line / initializer overrides default and/or spec from workflow
853             self.submit_runner_ram = submit_runner_ram
854
855         if self.submit_runner_ram <= 0:
856             raise Exception("Value of submit-runner-ram must be greater than zero")
857
858         if self.submit_runner_cores <= 0:
859             raise Exception("Value of submit-runner-cores must be greater than zero")
860
861         self.merged_map = merged_map or {}
862
863     def job(self,
864             job_order,         # type: Mapping[Text, Text]
865             output_callbacks,  # type: Callable[[Any, Any], Any]
866             runtimeContext     # type: RuntimeContext
867            ):  # type: (...) -> Generator[Any, None, None]
868         self.job_order = job_order
869         self._init_job(job_order, runtimeContext)
870         yield self
871
872     def update_pipeline_component(self, record):
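        """No-op in the base class; subclasses may override."""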
873         pass
874
875     def done(self, record):
876         """Base method for handling a completed runner."""
877
878         try:
879             if record["state"] == "Complete":
880                 if record.get("exit_code") is not None:
881                     if record["exit_code"] == 33:
882                         processStatus = "UnsupportedRequirement"
883                     elif record["exit_code"] == 0:
884                         processStatus = "success"
885                     else:
886                         processStatus = "permanentFail"
887                 else:
888                     processStatus = "success"
889             else:
890                 processStatus = "permanentFail"
891
892             outputs = {}
893
894             if processStatus == "permanentFail":
895                 logc = arvados.collection.CollectionReader(record["log"],
896                                                            api_client=self.arvrunner.api,
897                                                            keep_client=self.arvrunner.keep_client,
898                                                            num_retries=self.arvrunner.num_retries)
899                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
900
901             self.final_output = record["output"]
902             outc = arvados.collection.CollectionReader(self.final_output,
903                                                        api_client=self.arvrunner.api,
904                                                        keep_client=self.arvrunner.keep_client,
905                                                        num_retries=self.arvrunner.num_retries)
906             if "cwl.output.json" in outc:
907                 with outc.open("cwl.output.json", "rb") as f:
908                     if f.size() > 0:
909                         outputs = json.loads(f.read().decode())
910             def keepify(fileobj):
911                 path = fileobj["location"]
912                 if not path.startswith("keep:"):
913                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
914             adjustFileObjs(outputs, keepify)
915             adjustDirObjs(outputs, keepify)
916         except Exception:
917             logger.exception("[%s] While getting final output object", self.name)
918             self.arvrunner.output_callback({}, "permanentFail")
919         else:
920             self.arvrunner.output_callback(outputs, processStatus)