# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

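# Default priority for container requests; --priority must be within
# 1..1000 (checked in arv_executor below).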
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
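        # runtime_status_update() may itself log, which would re-enter
        # emit(); the updatingRuntimeStatus flag guards against that
        # recursion.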
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

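        # Use the requested collection cache size (given in MiB), or
        # default to 256 MiB and refine the estimate later from the job
        # order (see arv_executor).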
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

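        # Pick the work API by probing the API server's discovery
        # document; only the containers API is supported.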
        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
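                    # e.g. 'step failed' becomes 'step failed (and 1 more)',
                    # then 'step failed (and 2 more)', and so on.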
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

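    # Run a callback while holding workflow_eval_lock, then wake up the
    # arv_executor main loop waiting on the lock.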
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
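        # Handle a container request update delivered by the polling
        # thread (see poll_states below).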
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
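        # Singularize the work API name, e.g. "containers" -> "[container ...]".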
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

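                # Query states in batches no larger than the API server's
                # advertised page size.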
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

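        # Skip literal directories, write literal file contents directly
        # into the output collection, and copy everything else from its
        # source collection in Keep.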
        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
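                # The container output set above preserves the data, so
                # the separately saved collection record is apparently
                # redundant and gets trashed here.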
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                return

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

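        # Are we creating/updating a stored workflow record or submitting
        # a runner container, rather than running the workflow directly
        # from this process?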
        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.do_update = False
        if submitting:
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the original document, not the auto-updated one.
            tool = load_tool(tool.tool["id"], loadingContext)

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        tool = load_tool(tool.tool, loadingContext)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a workflow record and exit.
            if self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

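        # Estimate the collection cache size from the job order: a Keep
        # locator is md5+size, where size is the length of the manifest
        # text, and the 192x multiplier appears to be a heuristic
        # allowance for the in-memory overhead per manifest byte.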
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner container to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store,
                                                collection_cache_size=runtimeContext.collection_cache_size,
                                                collection_cache_is_default=self.should_estimate_cache_size)

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)