18773: --match-submitter-images to compare the local docker image with Arvados
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
35 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
36 from .arvworkflow import ArvadosWorkflow, upload_workflow
37 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
38 from .perf import Perf
39 from .pathmapper import NoFollowPathMapper
40 from cwltool.task_queue import TaskQueue
41 from .context import ArvLoadingContext, ArvRuntimeContext
42 from ._version import __version__
43
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
46 from cwltool.command_line_tool import compute_checksums
47 from cwltool.load_tool import load_tool
48
# Logger for arvados-cwl-runner's own messages.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger, presumably for metrics output; not referenced in the code
# visible here.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# NOTE(review): presumably the default for --priority (which arv_executor
# requires to be in 1..1000); not referenced in the code visible here.
DEFAULT_PRIORITY = 500
53
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards warnings and errors to a runtime status
    callback, so they can be reported as runtime statuses on runner
    containers.

    Records at ERROR level or above are reported with kind 'error', records
    at WARNING level with kind 'warning'; lower levels are ignored.  For a
    multi-line message the first line becomes the status and the remaining
    text is passed as the detail argument.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback invoked as (kind, "<logger name>: <text>"[, detail]).
        self.runtime_status_update = runtime_status_update_func
        # Re-entrancy guard: True while the callback is running, so that
        # anything the callback itself logs is not forwarded again.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        if self.updatingRuntimeStatus is True:
            return

        self.updatingRuntimeStatus = True
        try:
            text = record.getMessage()
            head, newline, rest = text.partition('\n')
            summary = "%s: %s" % (record.name, head)
            if newline:
                self.runtime_status_update(kind, summary, rest)
            else:
                self.runtime_status_update(kind, summary)
        finally:
            self.updatingRuntimeStatus = False
90
91
92 class ArvCwlExecutor(object):
93     """Execute a CWL tool or workflow, submit work (using containers API),
94     wait for them to complete, and report output.
95
96     """
97
98     def __init__(self, api_client,
99                  arvargs=None,
100                  keep_client=None,
101                  num_retries=4,
102                  thread_count=4,
103                  stdout=sys.stdout):
104
105         if arvargs is None:
106             arvargs = argparse.Namespace()
107             arvargs.work_api = None
108             arvargs.output_name = None
109             arvargs.output_tags = None
110             arvargs.thread_count = 1
111             arvargs.collection_cache_size = None
112
113         self.api = api_client
114         self.processes = {}
115         self.workflow_eval_lock = threading.Condition(threading.RLock())
116         self.final_output = None
117         self.final_status = None
118         self.num_retries = num_retries
119         self.uuid = None
120         self.stop_polling = threading.Event()
121         self.poll_api = None
122         self.pipeline = None
123         self.final_output_collection = None
124         self.output_name = arvargs.output_name
125         self.output_tags = arvargs.output_tags
126         self.project_uuid = None
127         self.intermediate_output_ttl = 0
128         self.intermediate_output_collections = []
129         self.trash_intermediate = False
130         self.thread_count = arvargs.thread_count
131         self.poll_interval = 12
132         self.loadingContext = None
133         self.should_estimate_cache_size = True
134         self.fs_access = None
135         self.secret_store = None
136         self.stdout = stdout
137
138         if keep_client is not None:
139             self.keep_client = keep_client
140         else:
141             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
142
143         if arvargs.collection_cache_size:
144             collection_cache_size = arvargs.collection_cache_size*1024*1024
145             self.should_estimate_cache_size = False
146         else:
147             collection_cache_size = 256*1024*1024
148
149         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
150                                                 cap=collection_cache_size)
151
152         self.fetcher_constructor = partial(CollectionFetcher,
153                                            api_client=self.api,
154                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
155                                            num_retries=self.num_retries)
156
157         self.work_api = None
158         expected_api = ["containers"]
159         for api in expected_api:
160             try:
161                 methods = self.api._rootDesc.get('resources')[api]['methods']
162                 if ('httpMethod' in methods['create'] and
163                     (arvargs.work_api == api or arvargs.work_api is None)):
164                     self.work_api = api
165                     break
166             except KeyError:
167                 pass
168
169         if not self.work_api:
170             if arvargs.work_api is None:
171                 raise Exception("No supported APIs")
172             else:
173                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
174
175         if self.work_api == "jobs":
176             logger.error("""
177 *******************************
178 The 'jobs' API is no longer supported.
179 *******************************""")
180             exit(1)
181
182         self.loadingContext = ArvLoadingContext(vars(arvargs))
183         self.loadingContext.fetcher_constructor = self.fetcher_constructor
184         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
185         self.loadingContext.construct_tool_object = self.arv_make_tool
186
187         # Add a custom logging handler to the root logger for runtime status reporting
188         # if running inside a container
189         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
190             root_logger = logging.getLogger('')
191
192             # Remove existing RuntimeStatusLoggingHandlers if they exist
193             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
194             root_logger.handlers = handlers
195
196             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
197             root_logger.addHandler(handler)
198
199         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
200         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
201                                                      collection_cache=self.collection_cache)
202
203         validate_cluster_target(self, self.runtimeContext)
204
205
206     def arv_make_tool(self, toolpath_object, loadingContext):
207         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
208             return ArvadosCommandTool(self, toolpath_object, loadingContext)
209         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
210             return ArvadosWorkflow(self, toolpath_object, loadingContext)
211         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
212             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
213         else:
214             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
215
216     def output_callback(self, out, processStatus):
217         with self.workflow_eval_lock:
218             if processStatus == "success":
219                 logger.info("Overall process status is %s", processStatus)
220                 state = "Complete"
221             else:
222                 logger.error("Overall process status is %s", processStatus)
223                 state = "Failed"
224             if self.pipeline:
225                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
226                                                         body={"state": state}).execute(num_retries=self.num_retries)
227             self.final_status = processStatus
228             self.final_output = out
229             self.workflow_eval_lock.notifyAll()
230
231
232     def start_run(self, runnable, runtimeContext):
233         self.task_queue.add(partial(runnable.run, runtimeContext),
234                             self.workflow_eval_lock, self.stop_polling)
235
236     def process_submitted(self, container):
237         with self.workflow_eval_lock:
238             self.processes[container.uuid] = container
239
240     def process_done(self, uuid, record):
241         with self.workflow_eval_lock:
242             j = self.processes[uuid]
243             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
244             self.task_queue.add(partial(j.done, record),
245                                 self.workflow_eval_lock, self.stop_polling)
246             del self.processes[uuid]
247
    def runtime_status_update(self, kind, message, detail=None):
        """
        Update the runtime_status field of the current (runner) container.

        Called when there's a need to report errors, warnings or just
        activity statuses, for example from the RuntimeStatusLoggingHandler.

        Does nothing when no current container is returned (e.g. when not
        running inside a container), or when fetching it fails (the failure
        is logged at INFO level).

        :param kind: 'error', 'warning' or 'activity'; any other kind is
            ignored.
        :param message: status text.  For 'error', only the first message is
            stored; later errors only bump an "(and N more)" suffix, and their
            message and detail are discarded.  For 'warning' and 'activity',
            the latest message replaces the previous one.
        :param detail: optional extra text, stored under 'errorDetail' or
            '<kind>Detail'.

        The whole fetch-modify-update sequence runs while holding
        workflow_eval_lock.  A failure of the final API update is logged at
        INFO level and swallowed.
        """
        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            # Dict taken from the fetched container record; it is modified in
            # place below and then sent back in full.
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # re.match raises TypeError when the stored error is not
                        # a string (e.g. when it is stubbed in tests); give up.
                        return
                    if more_failures:
                        # Already has a counter: increment it.
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        # Second error overall: start the counter at 1.
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
313
314     def wrapped_callback(self, cb, obj, st):
315         with self.workflow_eval_lock:
316             cb(obj, st)
317             self.workflow_eval_lock.notifyAll()
318
319     def get_wrapped_callback(self, cb):
320         return partial(self.wrapped_callback, cb)
321
322     def on_message(self, event):
323         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
324             uuid = event["object_uuid"]
325             if event["properties"]["new_attributes"]["state"] == "Running":
326                 with self.workflow_eval_lock:
327                     j = self.processes[uuid]
328                     if j.running is False:
329                         j.running = True
330                         j.update_pipeline_component(event["properties"]["new_attributes"])
331                         logger.info("%s %s is Running", self.label(j), uuid)
332             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
333                 self.process_done(uuid, event["properties"]["new_attributes"])
334
335     def label(self, obj):
336         return "[%s %s]" % (self.work_api[0:-1], obj.name)
337
338     def poll_states(self):
339         """Poll status of containers listed in the processes dict.
340
341         Runs in a separate thread.
342         """
343
344         try:
345             remain_wait = self.poll_interval
346             while True:
347                 if remain_wait > 0:
348                     self.stop_polling.wait(remain_wait)
349                 if self.stop_polling.is_set():
350                     break
351                 with self.workflow_eval_lock:
352                     keys = list(self.processes)
353                 if not keys:
354                     remain_wait = self.poll_interval
355                     continue
356
357                 begin_poll = time.time()
358                 if self.work_api == "containers":
359                     table = self.poll_api.container_requests()
360
361                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
362
363                 while keys:
364                     page = keys[:pageSize]
365                     try:
366                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
367                     except Exception:
368                         logger.exception("Error checking states on API server: %s")
369                         remain_wait = self.poll_interval
370                         continue
371
372                     for p in proc_states["items"]:
373                         self.on_message({
374                             "object_uuid": p["uuid"],
375                             "event_type": "update",
376                             "properties": {
377                                 "new_attributes": p
378                             }
379                         })
380                     keys = keys[pageSize:]
381
382                 finish_poll = time.time()
383                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
384         except:
385             logger.exception("Fatal error in state polling thread.")
386             with self.workflow_eval_lock:
387                 self.processes.clear()
388                 self.workflow_eval_lock.notifyAll()
389         finally:
390             self.stop_polling.set()
391
392     def add_intermediate_output(self, uuid):
393         if uuid:
394             self.intermediate_output_collections.append(uuid)
395
396     def trash_intermediate_output(self):
397         logger.info("Cleaning up intermediate output collections")
398         for i in self.intermediate_output_collections:
399             try:
400                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
401             except Exception:
402                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
403             except (KeyboardInterrupt, SystemExit):
404                 break
405
406     def check_features(self, obj, parentfield=""):
407         if isinstance(obj, dict):
408             if obj.get("class") == "DockerRequirement":
409                 if obj.get("dockerOutputDirectory"):
410                     if not obj.get("dockerOutputDirectory").startswith('/'):
411                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
412                             "Option 'dockerOutputDirectory' must be an absolute path.")
413             if obj.get("class") == "InplaceUpdateRequirement":
414                 if obj["inplaceUpdate"] and parentfield == "requirements":
415                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
416             for k,v in viewitems(obj):
417                 self.check_features(v, parentfield=k)
418         elif isinstance(obj, list):
419             for i,v in enumerate(obj):
420                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
421                     self.check_features(v, parentfield=parentfield)
422
423     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
424         outputObj = copy.deepcopy(outputObj)
425
426         files = []
427         def capture(fileobj):
428             files.append(fileobj)
429
430         adjustDirObjs(outputObj, capture)
431         adjustFileObjs(outputObj, capture)
432
433         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
434
435         final = arvados.collection.Collection(api_client=self.api,
436                                               keep_client=self.keep_client,
437                                               num_retries=self.num_retries)
438
439         for k,v in generatemapper.items():
440             if v.type == "Directory" and v.resolved.startswith("_:"):
441                     continue
442             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
443                 with final.open(v.target, "wb") as f:
444                     f.write(v.resolved.encode("utf-8"))
445                     continue
446
447             if not v.resolved.startswith("keep:"):
448                 raise Exception("Output source is not in keep or a literal")
449             sp = v.resolved.split("/")
450             srccollection = sp[0][5:]
451             try:
452                 reader = self.collection_cache.get(srccollection)
453                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
454                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
455             except arvados.errors.ArgumentError as e:
456                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
457                 raise
458             except IOError as e:
459                 logger.error("While preparing output collection: %s", e)
460                 raise
461
462         def rewrite(fileobj):
463             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
464             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
465                 if k in fileobj:
466                     del fileobj[k]
467
468         adjustDirObjs(outputObj, rewrite)
469         adjustFileObjs(outputObj, rewrite)
470
471         with final.open("cwl.output.json", "w") as f:
472             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
473             f.write(res)
474
475         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
476
477         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
478                     final.api_response()["name"],
479                     final.manifest_locator())
480
481         final_uuid = final.manifest_locator()
482         tags = tagsString.split(',')
483         for tag in tags:
484              self.api.links().create(body={
485                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
486                 }).execute(num_retries=self.num_retries)
487
488         def finalcollection(fileobj):
489             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
490
491         adjustDirObjs(outputObj, finalcollection)
492         adjustFileObjs(outputObj, finalcollection)
493
494         return (outputObj, final)
495
496     def set_crunch_output(self):
497         if self.work_api == "containers":
498             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
499             if current is None:
500                 return
501             try:
502                 self.api.containers().update(uuid=current['uuid'],
503                                              body={
504                                                  'output': self.final_output_collection.portable_data_hash(),
505                                              }).execute(num_retries=self.num_retries)
506                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
507                                               body={
508                                                   'is_trashed': True
509                                               }).execute(num_retries=self.num_retries)
510             except Exception:
511                 logger.exception("Setting container output")
512                 raise
513
514     def apply_reqs(self, job_order_object, tool):
515         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
516             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
517                 raise WorkflowException(
518                     "`cwl:requirements` in the input object is not part of CWL "
519                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
520                     "can set the cwlVersion to v1.1 or greater and re-run with "
521                     "--enable-dev.")
522             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
523             for req in job_reqs:
524                 tool.requirements.append(req)
525
526     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
527         self.debug = runtimeContext.debug
528
529         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
530         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
531         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
532         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
533
534         updated_tool.visit(self.check_features)
535
536         self.project_uuid = runtimeContext.project_uuid
537         self.pipeline = None
538         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
539         self.secret_store = runtimeContext.secret_store
540
541         self.trash_intermediate = runtimeContext.trash_intermediate
542         if self.trash_intermediate and self.work_api != "containers":
543             raise Exception("--trash-intermediate is only supported with --api=containers.")
544
545         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
546         if self.intermediate_output_ttl and self.work_api != "containers":
547             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
548         if self.intermediate_output_ttl < 0:
549             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
550
551         if runtimeContext.submit_request_uuid and self.work_api != "containers":
552             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
553
554         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
555         if runtimeContext.storage_classes == "default":
556             runtimeContext.storage_classes = default_storage_classes
557         if runtimeContext.intermediate_storage_classes == "default":
558             runtimeContext.intermediate_storage_classes = default_storage_classes
559
560         if not runtimeContext.name:
561             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
562
563         # Upload local file references in the job order.
564         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
565                                      updated_tool, job_order)
566
567         # the last clause means: if it is a command line tool, and we
568         # are going to wait for the result, and always_submit_runner
569         # is false, then we don't submit a runner process.
570
571         submitting = (runtimeContext.update_workflow or
572                       runtimeContext.create_workflow or
573                       (runtimeContext.submit and not
574                        (updated_tool.tool["class"] == "CommandLineTool" and
575                         runtimeContext.wait and
576                         not runtimeContext.always_submit_runner)))
577
578         loadingContext = self.loadingContext.copy()
579         loadingContext.do_validate = False
580         if submitting:
581             loadingContext.do_update = False
582             # Document may have been auto-updated. Reload the original
583             # document with updating disabled because we want to
584             # submit the document with its original CWL version, not
585             # the auto-updated one.
586             tool = load_tool(updated_tool.tool["id"], loadingContext)
587         else:
588             tool = updated_tool
589
590         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
591         # Also uploads docker images.
592         merged_map = upload_workflow_deps(self, tool)
593
594         # Recreate process object (ArvadosWorkflow or
595         # ArvadosCommandTool) because tool document may have been
596         # updated by upload_workflow_deps in ways that modify
597         # inheritance of hints or requirements.
598         loadingContext.loader = tool.doc_loader
599         loadingContext.avsc_names = tool.doc_schema
600         loadingContext.metadata = tool.metadata
601         tool = load_tool(tool.tool, loadingContext)
602
603         existing_uuid = runtimeContext.update_workflow
604         if existing_uuid or runtimeContext.create_workflow:
605             # Create a pipeline template or workflow record and exit.
606             if self.work_api == "containers":
607                 uuid = upload_workflow(self, tool, job_order,
608                                         self.project_uuid,
609                                         uuid=existing_uuid,
610                                         submit_runner_ram=runtimeContext.submit_runner_ram,
611                                         name=runtimeContext.name,
612                                         merged_map=merged_map,
613                                         submit_runner_image=runtimeContext.submit_runner_image)
614                 self.stdout.write(uuid + "\n")
615                 return (None, "success")
616
617         self.apply_reqs(job_order, tool)
618
619         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
620         self.eval_timeout = runtimeContext.eval_timeout
621
622         runtimeContext = runtimeContext.copy()
623         runtimeContext.use_container = True
624         runtimeContext.tmpdir_prefix = "tmp"
625         runtimeContext.work_api = self.work_api
626
627         if self.work_api == "containers":
628             if self.ignore_docker_for_reuse:
629                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
630             runtimeContext.outdir = "/var/spool/cwl"
631             runtimeContext.docker_outdir = "/var/spool/cwl"
632             runtimeContext.tmpdir = "/tmp"
633             runtimeContext.docker_tmpdir = "/tmp"
634
635         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
636             raise Exception("--priority must be in the range 1..1000.")
637
638         if self.should_estimate_cache_size:
639             visited = set()
640             estimated_size = [0]
641             def estimate_collection_cache(obj):
642                 if obj.get("location", "").startswith("keep:"):
643                     m = pdh_size.match(obj["location"][5:])
644                     if m and m.group(1) not in visited:
645                         visited.add(m.group(1))
646                         estimated_size[0] += int(m.group(2))
647             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
648             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
649             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
650
651         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
652
653         runnerjob = None
654         if runtimeContext.submit:
655             # Submit a runner job to run the workflow for us.
656             if self.work_api == "containers":
657                 if submitting:
658                     tool = RunnerContainer(self, updated_tool,
659                                            tool, loadingContext, runtimeContext.enable_reuse,
660                                            self.output_name,
661                                            self.output_tags,
662                                            submit_runner_ram=runtimeContext.submit_runner_ram,
663                                            name=runtimeContext.name,
664                                            on_error=runtimeContext.on_error,
665                                            submit_runner_image=runtimeContext.submit_runner_image,
666                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
667                                            merged_map=merged_map,
668                                            priority=runtimeContext.priority,
669                                            secret_store=self.secret_store,
670                                            collection_cache_size=runtimeContext.collection_cache_size,
671                                            collection_cache_is_default=self.should_estimate_cache_size)
672                 else:
673                     runtimeContext.runnerjob = tool.tool["id"]
674
675         if runtimeContext.cwl_runner_job is not None:
676             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
677
678         jobiter = tool.job(job_order,
679                            self.output_callback,
680                            runtimeContext)
681
682         if runtimeContext.submit and not runtimeContext.wait:
683             runnerjob = next(jobiter)
684             runnerjob.run(runtimeContext)
685             self.stdout.write(runnerjob.uuid+"\n")
686             return (None, "success")
687
688         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
689         if current_container:
690             logger.info("Running inside container %s", current_container.get("uuid"))
691
692         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
693         self.polling_thread = threading.Thread(target=self.poll_states)
694         self.polling_thread.start()
695
696         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
697
698         try:
699             self.workflow_eval_lock.acquire()
700
701             # Holds the lock while this code runs and releases it when
702             # it is safe to do so in self.workflow_eval_lock.wait(),
703             # at which point on_message can update job state and
704             # process output callbacks.
705
706             loopperf = Perf(metrics, "jobiter")
707             loopperf.__enter__()
708             for runnable in jobiter:
709                 loopperf.__exit__()
710
711                 if self.stop_polling.is_set():
712                     break
713
714                 if self.task_queue.error is not None:
715                     raise self.task_queue.error
716
717                 if runnable:
718                     with Perf(metrics, "run"):
719                         self.start_run(runnable, runtimeContext)
720                 else:
721                     if (self.task_queue.in_flight + len(self.processes)) > 0:
722                         self.workflow_eval_lock.wait(3)
723                     else:
724                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
725                         break
726
727                 if self.stop_polling.is_set():
728                     break
729
730                 loopperf.__enter__()
731             loopperf.__exit__()
732
733             while (self.task_queue.in_flight + len(self.processes)) > 0:
734                 if self.task_queue.error is not None:
735                     raise self.task_queue.error
736                 self.workflow_eval_lock.wait(3)
737
738         except UnsupportedRequirement:
739             raise
740         except:
741             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
742                 logger.error("Interrupted, workflow will be cancelled")
743             elif isinstance(sys.exc_info()[1], WorkflowException):
744                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
745             else:
746                 logger.exception("Workflow execution failed")
747
748             if self.pipeline:
749                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
750                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
751
752             if self.work_api == "containers" and not current_container:
753                 # Not running in a crunch container, so cancel any outstanding processes.
754                 for p in self.processes:
755                     try:
756                         self.api.container_requests().update(uuid=p,
757                                                              body={"priority": "0"}
758                         ).execute(num_retries=self.num_retries)
759                     except Exception:
760                         pass
761         finally:
762             self.workflow_eval_lock.release()
763             self.task_queue.drain()
764             self.stop_polling.set()
765             self.polling_thread.join()
766             self.task_queue.join()
767
768         if self.final_status == "UnsupportedRequirement":
769             raise UnsupportedRequirement("Check log for details.")
770
771         if self.final_output is None:
772             raise WorkflowException("Workflow did not return a result.")
773
774         if runtimeContext.submit and isinstance(tool, Runner):
775             logger.info("Final output collection %s", tool.final_output)
776             if workbench2 or workbench1:
777                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
778         else:
779             if self.output_name is None:
780                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
781             if self.output_tags is None:
782                 self.output_tags = ""
783
784             storage_classes = ""
785             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
786             if storage_class_req and storage_class_req.get("finalStorageClass"):
787                 storage_classes = aslist(storage_class_req["finalStorageClass"])
788             else:
789                 storage_classes = runtimeContext.storage_classes.strip().split(",")
790
791             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
792             self.set_crunch_output()
793
794         if runtimeContext.compute_checksum:
795             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
796             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
797
798         if self.trash_intermediate and self.final_status == "success":
799             self.trash_intermediate_output()
800
801         return (self.final_output, self.final_status)