18994: Fixing quoting WIP
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future import standard_library
6 standard_library.install_aliases()
7 from future.utils import  viewvalues, viewitems
8 from past.builtins import basestring
9
10 import os
11 import sys
12 import re
13 import urllib.parse
14 from functools import partial
15 import logging
16 import json
17 import copy
18 from collections import namedtuple
19 from io import StringIO
20 from typing import Mapping, Sequence
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 from schema_salad.sourceline import SourceLine, cmap
28
29 from cwltool.command_line_tool import CommandLineTool
30 import cwltool.workflow
31 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
32                              shortname, Process, fill_in_defaults)
33 from cwltool.load_tool import fetch_document
34 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
35 from cwltool.builder import substitute
36 from cwltool.pack import pack
37 from cwltool.update import INTERNAL_VERSION
38 from cwltool.builder import Builder
39 import schema_salad.validate as validate
40
41 import arvados.collection
42 from .util import collectionUUID
43 from ruamel.yaml import YAML
44 from ruamel.yaml.comments import CommentedMap, CommentedSeq
45
46 import arvados_cwl.arvdocker
47 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
48 from ._version import __version__
49 from . import done
50 from . context import ArvRuntimeContext
51
52 logger = logging.getLogger('arvados.cwl-runner')
53
def trim_anonymous_location(obj):
    """Strip the placeholder 'location' from File and Directory literals.

    Literals are given a random "_:" identifier as their 'location' to
    simplify internal handling.  Writing that identifier back out would
    hurt reproducibility, and a literal is allowed to have no 'location'
    field at all, so the field is dropped.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
66
67
def remove_redundant_fields(obj):
    """Delete the 'path', 'nameext', 'nameroot' and 'dirname' keys from obj when present."""
    redundant = ("path", "nameext", "nameroot", "dirname")
    for key in [k for k in redundant if k in obj]:
        del obj[key]
72
73
def find_defaults(d, op):
    """Walk nested lists and dicts, calling op on every dict that has a "default" key.

    A dict that is handed to op is not searched any further; values that are
    neither lists nor dicts are ignored.
    """
    if isinstance(d, dict):
        if "default" in d:
            op(d)
            return
        children = d.values()
    elif isinstance(d, list):
        children = d
    else:
        return

    for child in children:
        find_defaults(child, op)
84
def make_builder(joborder, hints, requirements, runtimeContext, metadata):
    """Create a cwltool Builder that can be used to evaluate expressions.

    The job object, hints and requirements are passed through; the script
    provider, timeout, debug, js_console and force_docker_pull settings are
    taken from runtimeContext.  The CWL version comes from metadata,
    preferring the "original_cwlVersion" entry over "cwlVersion".  All
    remaining Builder fields are given empty placeholder values.
    """
    cwl_version = (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                   or metadata.get("cwlVersion"))

    builder_args = dict(
        # Values supplied by the caller.
        job=joborder,
        requirements=requirements,
        hints=hints,
        # Placeholders for fields this builder does not make use of.
        files=[],
        bindings=[],
        schemaDefs={},
        names=None,
        resources={},
        mutation_manager=None,
        formatgraph=None,
        make_fs_access=None,
        fs_access=None,
        loadListing="",
        outdir="",
        tmpdir="",
        stagedir="",
        # Settings copied from the runtime context.
        job_script_provider=runtimeContext.job_script_provider,
        timeout=runtimeContext.eval_timeout,
        debug=runtimeContext.debug,
        js_console=runtimeContext.js_console,
        force_docker_pull=runtimeContext.force_docker_pull,
        cwlVersion=cwl_version,
        container_engine="docker",
    )
    return Builder(**builder_args)
111
def search_schemadef(name, reqs):
    """Return the type definition called name from a SchemaDefRequirement in reqs.

    reqs is searched in order and the first match wins; None is returned
    when no SchemaDefRequirement defines a type with that name.
    """
    for req in reqs:
        if req["class"] != "SchemaDefRequirement":
            continue
        for typedef in req["types"]:
            if typedef["name"] == name:
                return typedef
    return None
119
# Type names that set_secondary() does not try to resolve any further: it
# only recurses into a type name when that name is absent from this set.
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
123
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Fill in 'secondaryFiles' on File values by walking a schema and its value together.

    Recurses through inputschema (unions, named schema references, records
    and arrays) alongside the matching parts of primary.  When a "File"
    schema with an applicable secondaryFiles spec meets a File object that
    has no 'secondaryFiles' yet, the spec is evaluated with the builder,
    each candidate is checked with fsaccess.exists(), and the ones found are
    stored in primary["secondaryFiles"].

    fsaccess -- object providing exists(location)
    builder -- used for do_eval(), cwlVersion, hints and requirements
    inputschema -- schema dict, type name string, or sequence (union) of these
    secondaryspec -- secondaryFiles spec inherited from an enclosing schema, or None
    primary -- the input value corresponding to inputschema; updated in place
    discovered -- optional dict; when not None, maps each File's location to
                  the secondaryFiles that were set on it

    Errors (bad expression result, File entry without 'location', missing
    required secondary file) are raised via SourceLine(...).makeError(),
    constructed with validate.ValidationException.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        # A bare type name: look it up among the SchemaDefRequirement types.
        # Nothing to do when it is not a user-defined schema.
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
        not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        specs = []
        # Temporarily store the raw spec on the File; it is what the
        # SourceLine error reports below refer to.  It is replaced with the
        # list of files actually found at the end of this branch.
        primary["secondaryFiles"] = secondaryspec
        # Pass 1: evaluate every entry of the spec into a flat list of
        # patterns / File objects.
        for i, sf in enumerate(aslist(secondaryspec)):
            if builder.cwlVersion == "v1.0":
                # In v1.0 the spec entry itself is the pattern/expression.
                pattern = builder.do_eval(sf, context=primary)
            else:
                # Otherwise the entry is an object with a "pattern" field.
                pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                if builder.cwlVersion == "v1.0":
                    specs.append({"pattern": pattern, "required": True})
                else:
                    specs.append({"pattern": pattern, "required": sf.get("required")})
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

        # Pass 2: turn each entry into a location and keep the ones that exist.
        found = []
        for i, sf in enumerate(specs):
            if isinstance(sf, dict):
                if sf.get("class") == "File":
                    # Already a File object: use its location as-is.
                    pattern = None
                    if sf.get("location") is None:
                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                            "File object is missing 'location': %s" % sf)
                    sfpath = sf["location"]
                    required = True
                else:
                    pattern = sf["pattern"]
                    required = sf.get("required")
            elif isinstance(sf, str):
                pattern = sf
                required = True
            else:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Expression must return list, object, string or null")

            if pattern is not None:
                # Derive the secondary file location from the primary's location.
                sfpath = substitute(primary["location"], pattern)

            # "required" is passed through do_eval, so it may be an expression.
            required = builder.do_eval(required, context=primary)

            if fsaccess.exists(sfpath):
                if pattern is not None:
                    found.append({"location": sfpath, "class": "File"})
                else:
                    found.append(sf)
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(found)
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # A type name outside the primitive set: recurse so that it can be
        # resolved as a named schema.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
234
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run set_secondary() for every input whose job_order value is a mapping or sequence.

    Each input schema in inputs is matched to its value in job_order by the
    short name of the schema's "id"; other kinds of values are skipped.
    """
    for schema in inputs:
        value = job_order.get(shortname(schema["id"]))
        if not isinstance(value, (Mapping, Sequence)):
            continue
        set_secondary(fsaccess, builder, schema, None, value, discovered)
240
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    arvrunner -- provides api, num_retries and fs_access
    name -- name passed on to the ArvPathMapper
    document_loader -- loader used to fetch and resolve documents
    workflowobj -- the document to scan; its locations are rewritten in place
    uri -- base URI used for scanning and resolving
    loadref_run -- when true, "run" references are followed as well as "$import"
    include_primary -- when true and workflowobj has an "id", the document
                       itself is added to the files to upload
    discovered_secondaryfiles -- optional dict; filled with the discovered
                       secondaryFiles keyed by the resolved primary location

    Raises (via SourceLine(...).makeError with validate.ValidationException)
    when a referenced collection uuid cannot be found, or when a File has
    both a collection uuid and a portable data hash that disagree.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            yamlloader = YAML(typ='safe', pure=True)
            return yamlloader.load(textIO)
        else:
            # Already loaded once; return an empty document so it is not
            # scanned again.
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    metadata = scanobj

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin,
                         nestdirs=False)

    optional_deps = scandeps(uri, scanobj,
                             loadref_fields,
                             set(("$schemas",)),
                             loadref, urljoin=document_loader.fetcher.urljoin,
                             nestdirs=False)

    sc_result.extend(optional_deps)

    sc = []
    uuids = {}

    def collect_uuids(obj):
        loc = obj.get("location", "")
        sp = loc.split(":")
        if sp[0] == "keep":
            # Collect collection uuids that need to be resolved to
            # portable data hashes
            gp = collection_uuid_pattern.match(loc)
            if gp:
                uuids[gp.groups()[0]] = obj
            if collectionUUID in obj:
                uuids[obj[collectionUUID]] = obj

    def collect_uploads(obj):
        loc = obj.get("location", "")
        # str.split() always returns at least one element, so sp[0] is safe.
        sp = loc.split(":")
        if sp[0] in ("file", "http", "https"):
            # Record local files that need to be uploaded,
            # don't include file literals, keep references, etc.
            sc.append(obj)
        collect_uuids(obj)

    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
    visit_class(sc_result, ("File", "Directory"), collect_uploads)

    # Resolve any collection uuids we found to portable data hashes
    # and assign them to uuid_map
    uuid_map = {}
    fetch_uuids = list(uuids.keys())
    while fetch_uuids:
        # For a large number of fetch_uuids, API server may limit
        # response size, so keep fetching until the API server has
        # nothing more to give us.
        lookups = arvrunner.api.collections().list(
            filters=[["uuid", "in", fetch_uuids]],
            count="none",
            select=["uuid", "portable_data_hash"]).execute(
                num_retries=arvrunner.num_retries)

        if not lookups["items"]:
            break

        for l in lookups["items"]:
            uuid_map[l["uuid"]] = l["portable_data_hash"]

        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    def visit_default(obj):
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            # Files and directories that only appear in a "default" are
            # handed to the path mapper as optional dependencies.
            optional_deps.append(f)
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        builder_job_order = {}
        for t in obj["inputs"]:
            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
        # Need to create a builder object to evaluate expressions.
        builder = make_builder(builder_job_order,
                               obj.get("hints", []),
                               obj.get("requirements", []),
                               ArvRuntimeContext(),
                               metadata)
        discover_secondary_files(arvrunner.fs_access,
                                 builder,
                                 obj["inputs"],
                                 builder_job_order,
                                 discovered)

    # Work on a resolved deep copy so that discovering secondary files of
    # defaults does not modify workflowobj itself.
    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    # Diagnostics go through the module logger at debug level rather than
    # being printed to stdout.
    logger.debug("upload_dependencies: files to upload: %s", sc)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True,
                           optional_deps=optional_deps)

    logger.debug("upload_dependencies: path map: %s", mapper._pathmap)

    def setloc(p):
        loc = p.get("location")
        if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
            # Uploaded file: point it at its new place in Keep.
            p["location"] = mapper.mapper(p["location"]).resolved
            return

        if not loc:
            return

        if collectionUUID in p:
            uuid = p[collectionUUID]
            if uuid not in uuid_map:
                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
                    "Collection uuid %s not found" % uuid)
            gp = collection_pdh_pattern.match(loc)
            if gp and uuid_map[uuid] != gp.groups()[0]:
                # This file entry has both collectionUUID and a PDH
                # location. If the PDH doesn't match the one returned by
                # the API server, raise an error.
                raise SourceLine(p, "location", validate.ValidationException).makeError(
                    "Expected collection uuid %s to be %s but API server reported %s" % (
                        uuid, gp.groups()[0], uuid_map[uuid]))

        gp = collection_uuid_pattern.match(loc)
        if not gp:
            return
        uuid = gp.groups()[0]
        if uuid not in uuid_map:
            raise SourceLine(p, "location", validate.ValidationException).makeError(
                "Collection uuid %s not found" % uuid)
        # Rewrite the uuid-based location to use the portable data hash and
        # remember the uuid it came from.
        p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
        p[collectionUUID] = uuid

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        # Keep only the schemas known to the path mapper, rewritten to their
        # resolved locations.
        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
            if s in mapper:
                sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
465
466
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects.

    For a CommandLineTool, fetch the image named by its DockerRequirement,
    or the arvados/jobs image matching this package version when it has
    none.  For a Workflow, do the same for the embedded tool of every step.
    Raises (via SourceLine) UnsupportedRequirement when
    'dockerOutputDirectory' is used and the work API is not "containers".
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = tool.get_requirement("DockerRequirement")
        if not docker_req:
            image_spec = {"dockerPull": "arvados/jobs:"+__version__}
        else:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            image_spec = docker_req

        arvados_cwl.arvdocker.arv_docker_get_image(
            arvrunner.api, image_spec, True, arvrunner.project_uuid,
            arvrunner.runtimeContext.force_docker_pull,
            arvrunner.runtimeContext.tmp_outdir_prefix,
            arvrunner.runtimeContext.match_local_docker)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
490
491
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  After packing, the document is walked to apply
    the location and secondaryFiles updates recorded in merged_map (keyed by
    the original id of the enclosing process) and to record the Docker image
    collection on every DockerRequirement.  Returns the packed document.
    """

    rewrites = {}
    packed = pack(arvrunner.loadingContext, tool.tool["id"],
                  rewrite_out=rewrites,
                  loader=tool.doc_loader)

    # Invert the id rewrites made by pack(): packed id -> original id.
    original_ids = {}
    for orig_id, packed_id in viewitems(rewrites):
        original_ids[packed_id] = orig_id

    process_classes = ("CommandLineTool", "Workflow", "ExpressionTool")

    def visit(node, cur_id):
        if isinstance(node, list):
            for item in node:
                visit(item, cur_id)
            return
        if not isinstance(node, dict):
            return

        if node.get("class") in process_classes:
            if tool.metadata["cwlVersion"] == "v1.0" and "id" not in node:
                raise SourceLine(node, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
            if "id" in node:
                # Entering a process: everything below belongs to this id.
                cur_id = original_ids.get(node["id"], node["id"])

        if "path" in node and "location" not in node:
            node["location"] = node["path"]
            del node["path"]

        if "location" in node and cur_id in merged_map:
            updates = merged_map[cur_id]
            if node["location"] in updates.resolved:
                node["location"] = updates.resolved[node["location"]]
            if node["location"] in updates.secondaryFiles:
                node["secondaryFiles"] = updates.secondaryFiles[node["location"]]

        if node.get("class") == "DockerRequirement":
            node["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(
                arvrunner.api, node, True,
                arvrunner.project_uuid,
                arvrunner.runtimeContext.force_docker_pull,
                arvrunner.runtimeContext.tmp_outdir_prefix,
                arvrunner.runtimeContext.match_local_docker)

        for key in node:
            visit(node[key], cur_id)

    visit(packed, None)
    return packed
532
533
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in packed.

    When the workflow id is a "file://" URI, run "git log" in the directory
    containing that file and, on success, store the commit hash (as text)
    under the "http://schema.org/version" key of packed.  Failing to run git
    is not an error: packed is simply left unchanged.

    packed -- the packed workflow document (a dict), updated in place
    tool -- optional object whose .tool["id"] names the workflow file; when
            omitted, the "id" entry of packed (if any) is used instead
    """
    # The id used to be read from an undefined name ("tool"), so every call
    # raised NameError; it is now an explicit, optional argument.
    if tool is not None:
        tool_id = tool.tool["id"]
    else:
        tool_id = packed.get("id") or ""

    if not tool_id.startswith("file://"):
        return

    # Strip the "file://" prefix to get the local path of the document.
    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # Best effort: git missing, directory missing, or not a repository.
        return

    # check_output returns bytes; store text so that packed remains
    # JSON-serializable.
    if isinstance(githash, bytes):
        githash = githash.decode("utf-8")
    packed["http://schema.org/version"] = githash
543
544
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload the local files referenced by the input object.

    job_order is updated in place: secondaryFiles are filled in, every
    'location' is rewritten to the proper keep reference, and the "id" and
    "job_order" bookkeeping keys are removed.  The same object is returned.
    """

    # Defaults are filled into a shallow copy, which is only used to build
    # the expression evaluator below.
    defaults_job_order = copy.copy(job_order)

    # fill_in_defaults throws an error if there are any missing required
    # parameters; we don't want that, so make every parameter optional.
    optional_inputs = copy.deepcopy(tool.tool["inputs"])
    for param in optional_inputs:
        if "null" in param["type"]:
            continue
        param["type"] = ["null"] + aslist(param["type"])

    fill_in_defaults(optional_inputs, defaults_job_order, arvrunner.fs_access)

    # Need to create a builder object to evaluate expressions.
    builder = make_builder(defaults_job_order,
                           tool.hints,
                           tool.requirements,
                           ArvRuntimeContext(),
                           tool.metadata)

    # Now update job_order with secondaryFiles
    discover_secondary_files(arvrunner.fs_access, builder, tool.tool["inputs"], job_order)

    # Called for its effect on job_order; the returned path mapper is not needed.
    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "job_order" gets added by cwltool when parameters are provided on the
    # command line; neither it nor "id" belongs in the result.
    for key in ("id", "job_order"):
        if key in job_order:
            del job_order[key]

    return job_order
592
# Per-tool result of uploading dependencies, as built by upload_workflow_deps():
#   resolved       -- dict built from the path mapper's items, mapping each key
#                     to that entry's resolved location
#   secondaryFiles -- dict of discovered secondaryFiles filled in by
#                     upload_dependencies()
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
594
def upload_workflow_deps(arvrunner, tool):
    """Upload the Docker images and file dependencies of tool and its sub-tools.

    Returns a dict mapping the id of every visited tool document to a
    FileUpdates record holding its resolved locations and discovered
    secondaryFiles.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    loader = tool.doc_loader
    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return
        secondary = {}
        pathmap = upload_dependencies(arvrunner,
                                      "%s dependencies" % (shortname(deptool["id"])),
                                      loader,
                                      deptool,
                                      deptool["id"],
                                      False,
                                      include_primary=False,
                                      discovered_secondaryfiles=secondary)
        # Store the updated tool document back in the loader's index.
        loader.idx[deptool["id"]] = deptool
        resolved = {src: entry.resolved for src, entry in pathmap.items()}
        merged_map[deptool["id"]] = FileUpdates(resolved, secondary)

    tool.visit(upload_tool_deps)

    return merged_map
624
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns whatever arv_docker_get_image() returns for img.  Any failure is
    re-raised as a plain Exception naming the image and the original error.
    """

    image_request = {"dockerPull": img}
    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(
            arvrunner.api, image_request, True, arvrunner.project_uuid,
            arvrunner.runtimeContext.force_docker_pull,
            arvrunner.runtimeContext.tmp_outdir_prefix,
            arvrunner.runtimeContext.match_local_docker)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
635
636
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow as "workflow.cwl" in a collection.

    The document is written as sorted, indented JSON.  If a collection with
    the same content and a name starting with name already exists (within
    the runner's project, when one is set) it is reused; otherwise a new
    collection is saved.  Returns the collection's portable data hash.
    """
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    content_filter = ["portable_data_hash", "=", collection.portable_data_hash()]
    name_filter = ["name", "like", name+"%"]
    filters = [content_filter, name_filter]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if not exists["items"]:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", exists["items"][0]["uuid"])

    return collection.portable_data_hash()
660
661
662 class Runner(Process):
663     """Base class for runner processes, which submit an instance of
664     arvados-cwl-runner and wait for the final result."""
665
666     def __init__(self, runner, updated_tool,
667                  tool, loadingContext, enable_reuse,
668                  output_name, output_tags, submit_runner_ram=0,
669                  name=None, on_error=None, submit_runner_image=None,
670                  intermediate_output_ttl=0, merged_map=None,
671                  priority=None, secret_store=None,
672                  collection_cache_size=256,
673                  collection_cache_is_default=True):
674
675         loadingContext = loadingContext.copy()
676         loadingContext.metadata = updated_tool.metadata.copy()
677
678         super(Runner, self).__init__(updated_tool.tool, loadingContext)
679
680         self.arvrunner = runner
681         self.embedded_tool = tool
682         self.job_order = None
683         self.running = False
684         if enable_reuse:
685             # If reuse is permitted by command line arguments but
686             # disabled by the workflow itself, disable it.
687             reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
688             if reuse_req:
689                 enable_reuse = reuse_req["enableReuse"]
690         self.enable_reuse = enable_reuse
691         self.uuid = None
692         self.final_output = None
693         self.output_name = output_name
694         self.output_tags = output_tags
695         self.name = name
696         self.on_error = on_error
697         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
698         self.intermediate_output_ttl = intermediate_output_ttl
699         self.priority = priority
700         self.secret_store = secret_store
701         self.enable_dev = loadingContext.enable_dev
702
703         self.submit_runner_cores = 1
704         self.submit_runner_ram = 1024  # defaut 1 GiB
705         self.collection_cache_size = collection_cache_size
706
707         runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
708         if runner_resource_req:
709             if runner_resource_req.get("coresMin"):
710                 self.submit_runner_cores = runner_resource_req["coresMin"]
711             if runner_resource_req.get("ramMin"):
712                 self.submit_runner_ram = runner_resource_req["ramMin"]
713             if runner_resource_req.get("keep_cache") and collection_cache_is_default:
714                 self.collection_cache_size = runner_resource_req["keep_cache"]
715
716         if submit_runner_ram:
717             # Command line / initializer overrides default and/or spec from workflow
718             self.submit_runner_ram = submit_runner_ram
719
720         if self.submit_runner_ram <= 0:
721             raise Exception("Value of submit-runner-ram must be greater than zero")
722
723         if self.submit_runner_cores <= 0:
724             raise Exception("Value of submit-runner-cores must be greater than zero")
725
726         self.merged_map = merged_map or {}
727
728     def job(self,
729             job_order,         # type: Mapping[Text, Text]
730             output_callbacks,  # type: Callable[[Any, Any], Any]
731             runtimeContext     # type: RuntimeContext
732            ):  # type: (...) -> Generator[Any, None, None]
733         self.job_order = job_order
734         self._init_job(job_order, runtimeContext)
735         yield self
736
737     def update_pipeline_component(self, record):
738         pass
739
740     def done(self, record):
741         """Base method for handling a completed runner."""
742
743         try:
744             if record["state"] == "Complete":
745                 if record.get("exit_code") is not None:
746                     if record["exit_code"] == 33:
747                         processStatus = "UnsupportedRequirement"
748                     elif record["exit_code"] == 0:
749                         processStatus = "success"
750                     else:
751                         processStatus = "permanentFail"
752                 else:
753                     processStatus = "success"
754             else:
755                 processStatus = "permanentFail"
756
757             outputs = {}
758
759             if processStatus == "permanentFail":
760                 logc = arvados.collection.CollectionReader(record["log"],
761                                                            api_client=self.arvrunner.api,
762                                                            keep_client=self.arvrunner.keep_client,
763                                                            num_retries=self.arvrunner.num_retries)
764                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
765
766             self.final_output = record["output"]
767             outc = arvados.collection.CollectionReader(self.final_output,
768                                                        api_client=self.arvrunner.api,
769                                                        keep_client=self.arvrunner.keep_client,
770                                                        num_retries=self.arvrunner.num_retries)
771             if "cwl.output.json" in outc:
772                 with outc.open("cwl.output.json", "rb") as f:
773                     if f.size() > 0:
774                         outputs = json.loads(f.read().decode())
775             def keepify(fileobj):
776                 path = fileobj["location"]
777                 if not path.startswith("keep:"):
778                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
779             adjustFileObjs(outputs, keepify)
780             adjustDirObjs(outputs, keepify)
781         except Exception:
782             logger.exception("[%s] While getting final output object", self.name)
783             self.arvrunner.output_callback({}, "permanentFail")
784         else:
785             self.arvrunner.output_callback(outputs, processStatus)