13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 from __future__ import division
5 from builtins import next
6 from builtins import object
7 from past.utils import old_div
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import copy
15 import json
16 import re
17 from functools import partial
18 import time
19
20 from cwltool.errors import WorkflowException
21 import cwltool.workflow
22 from schema_salad.sourceline import SourceLine
23 import schema_salad.validate as validate
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 import arvados_cwl.util
31 from .arvcontainer import RunnerContainer
32 from .arvjob import RunnerJob, RunnerTemplate
33 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
34 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
35 from .arvworkflow import ArvadosWorkflow, upload_workflow
36 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
37 from .perf import Perf
38 from .pathmapper import NoFollowPathMapper
39 from .task_queue import TaskQueue
40 from .context import ArvLoadingContext, ArvRuntimeContext
41 from ._version import __version__
42
43 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
44 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
45 from cwltool.command_line_tool import compute_checksums
46
47 logger = logging.getLogger('arvados.cwl-runner')
48 metrics = logging.getLogger('arvados.cwl-runner.metrics')
49
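# Default container request priority; arv_executor() below compares
# runtimeContext.priority against this and rejects values outside 1..1000.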
50 DEFAULT_PRIORITY = 500
51
52 class RuntimeStatusLoggingHandler(logging.Handler):
53     """
54     Intercepts logging calls and reports them as runtime status updates on the
55     runner container.
56     """
57     def __init__(self, runtime_status_update_func):
58         super(RuntimeStatusLoggingHandler, self).__init__()
59         self.runtime_status_update = runtime_status_update_func
60
61     def emit(self, record):
62         kind = None
63         if record.levelno >= logging.ERROR:
64             kind = 'error'
65         elif record.levelno >= logging.WARNING:
66             kind = 'warning'
67         if kind is not None:
68             log_msg = record.getMessage()
69             if '\n' in log_msg:
70                 # If the logged message is multi-line, use its first line as status
71                 # and the rest as detail.
72                 status, detail = log_msg.split('\n', 1)
73                 self.runtime_status_update(
74                     kind,
75                     "%s: %s" % (record.name, status),
76                     detail
77                 )
78             else:
79                 self.runtime_status_update(
80                     kind,
81                     "%s: %s" % (record.name, record.getMessage())
82                 )
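    # For example, with this handler installed on the root logger (see
    # ArvCwlExecutor.__init__ below), logger.error("step failed\n<traceback>")
    # is reported as runtime_status_update('error',
    # 'arvados.cwl-runner: step failed', '<traceback>').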
83
84 class ArvCwlExecutor(object):
85     """Execute a CWL tool or workflow, submit work (using either the jobs or
86     containers API), wait for it to complete, and report the output.
87
88     """
89
90     def __init__(self, api_client,
91                  arvargs=None,
92                  keep_client=None,
93                  num_retries=4,
94                  thread_count=4):
95
96         if arvargs is None:
97             arvargs = argparse.Namespace()
98             arvargs.work_api = None
99             arvargs.output_name = None
100             arvargs.output_tags = None
101             arvargs.thread_count = 1
102             arvargs.collection_cache_size = None
103
104         self.api = api_client
105         self.processes = {}
106         self.workflow_eval_lock = threading.Condition(threading.RLock())
107         self.final_output = None
108         self.final_status = None
109         self.num_retries = num_retries
110         self.uuid = None
111         self.stop_polling = threading.Event()
112         self.poll_api = None
113         self.pipeline = None
114         self.final_output_collection = None
115         self.output_name = arvargs.output_name
116         self.output_tags = arvargs.output_tags
117         self.project_uuid = None
118         self.intermediate_output_ttl = 0
119         self.intermediate_output_collections = []
120         self.trash_intermediate = False
121         self.thread_count = arvargs.thread_count
122         self.poll_interval = 12
123         self.loadingContext = None
124         self.should_estimate_cache_size = True
125
126         if keep_client is not None:
127             self.keep_client = keep_client
128         else:
129             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
130
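        # --collection-cache-size is given in MiB; when unset, default to 256 MiB
        # and leave should_estimate_cache_size enabled (see arv_executor below).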
131         if arvargs.collection_cache_size:
132             collection_cache_size = arvargs.collection_cache_size*1024*1024
133             self.should_estimate_cache_size = False
134         else:
135             collection_cache_size = 256*1024*1024
136
137         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
138                                                 cap=collection_cache_size)
139
140         self.fetcher_constructor = partial(CollectionFetcher,
141                                            api_client=self.api,
142                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
143                                            num_retries=self.num_retries)
144
145         self.work_api = None
146         expected_api = ["jobs", "containers"]
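        # Pick the first API family the cluster's discovery document actually
        # offers, constrained to --api when one was requested.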
147         for api in expected_api:
148             try:
149                 methods = self.api._rootDesc.get('resources')[api]['methods']
150                 if ('httpMethod' in methods['create'] and
151                     (arvargs.work_api == api or arvargs.work_api is None)):
152                     self.work_api = api
153                     break
154             except KeyError:
155                 pass
156
157         if not self.work_api:
158             if arvargs.work_api is None:
159                 raise Exception("No supported APIs")
160             else:
161                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
162
163         if self.work_api == "jobs":
164             logger.warning("""
165 *******************************
166 Using the deprecated 'jobs' API.
167
168 To get rid of this warning:
169
170 Users: read about migrating at
171 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
172 and use the option --api=containers
173
174 Admins: configure the cluster to disable the 'jobs' API as described at:
175 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
176 *******************************""")
177
178         self.loadingContext = ArvLoadingContext(vars(arvargs))
179         self.loadingContext.fetcher_constructor = self.fetcher_constructor
180         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
181         self.loadingContext.construct_tool_object = self.arv_make_tool
182
183         # Add a custom logging handler to the root logger for runtime status reporting
184         # if running inside a container
185         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
186             root_logger = logging.getLogger('')
187             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
188             root_logger.addHandler(handler)
189
190         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
191         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
192                                                      collection_cache=self.collection_cache)
193
194         validate_cluster_target(self, self.runtimeContext)
195
196
197     def arv_make_tool(self, toolpath_object, loadingContext):
198         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
199             return ArvadosCommandTool(self, toolpath_object, loadingContext)
200         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
201             return ArvadosWorkflow(self, toolpath_object, loadingContext)
202         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
203             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
204         else:
205             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
206
207     def output_callback(self, out, processStatus):
208         with self.workflow_eval_lock:
209             if processStatus == "success":
210                 logger.info("Overall process status is %s", processStatus)
211                 state = "Complete"
212             else:
213                 logger.error("Overall process status is %s", processStatus)
214                 state = "Failed"
215             if self.pipeline:
216                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
217                                                         body={"state": state}).execute(num_retries=self.num_retries)
218             self.final_status = processStatus
219             self.final_output = out
220             self.workflow_eval_lock.notifyAll()
221
222
223     def start_run(self, runnable, runtimeContext):
224         self.task_queue.add(partial(runnable.run, runtimeContext),
225                             self.workflow_eval_lock, self.stop_polling)
226
227     def process_submitted(self, container):
228         with self.workflow_eval_lock:
229             self.processes[container.uuid] = container
230
231     def process_done(self, uuid, record):
232         with self.workflow_eval_lock:
233             j = self.processes[uuid]
234             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
235             self.task_queue.add(partial(j.done, record),
236                                 self.workflow_eval_lock, self.stop_polling)
237             del self.processes[uuid]
238
239     def runtime_status_update(self, kind, message, detail=None):
240         """
241         Updates the runtime_status field on the runner container.
242         Called to report errors, warnings, or activity statuses, for example
243         from the RuntimeStatusLoggingHandler.
244         """
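        # runtime_status holds at most one entry per kind, e.g.
        # {'error': ..., 'errorDetail': ..., 'warning': ..., 'warningDetail': ...,
        #  'activity': ...}, assembled below.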
245         with self.workflow_eval_lock:
246             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
247             if current is None:
248                 return
249             runtime_status = current.get('runtime_status', {})
250             # When the status is an error, only the first message is reported in full.
251             if kind == 'error':
252                 if not runtime_status.get('error'):
253                     runtime_status.update({
254                         'error': message
255                     })
256                     if detail is not None:
257                         runtime_status.update({
258                             'errorDetail': detail
259                         })
260                 # Further errors are only mentioned as a count.
261                 else:
262                     # Get anything before an optional 'and N more' string.
263                     try:
264                         error_msg = re.match(
265                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
266                         more_failures = re.match(
267                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
268                     except TypeError:
269                         # Tests may stub the error with a non-string value; ignore the TypeError.
270                         return
271                     if more_failures:
272                         failure_qty = int(more_failures.groups()[0])
273                         runtime_status.update({
274                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
275                         })
276                     else:
277                         runtime_status.update({
278                             'error': "%s (and 1 more)" % error_msg
279                         })
280             elif kind in ['warning', 'activity']:
281                 # Record the last warning/activity status without regard to
282                 # previous occurrences.
283                 runtime_status.update({
284                     kind: message
285                 })
286                 if detail is not None:
287                     runtime_status.update({
288                         kind+"Detail": detail
289                     })
290             else:
291                 # Ignore any other status kind
292                 return
293             try:
294                 self.api.containers().update(uuid=current['uuid'],
295                                             body={
296                                                 'runtime_status': runtime_status,
297                                             }).execute(num_retries=self.num_retries)
298             except Exception as e:
299                 logger.info("Couldn't update runtime_status: %s", e)
300
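    # wrapped_callback() runs cwltool output callbacks while holding
    # workflow_eval_lock and wakes up the main arv_executor loop afterwards.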
301     def wrapped_callback(self, cb, obj, st):
302         with self.workflow_eval_lock:
303             cb(obj, st)
304             self.workflow_eval_lock.notifyAll()
305
306     def get_wrapped_callback(self, cb):
307         return partial(self.wrapped_callback, cb)
308
309     def on_message(self, event):
310         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
311             uuid = event["object_uuid"]
312             if event["properties"]["new_attributes"]["state"] == "Running":
313                 with self.workflow_eval_lock:
314                     j = self.processes[uuid]
315                     if j.running is False:
316                         j.running = True
317                         j.update_pipeline_component(event["properties"]["new_attributes"])
318                         logger.info("%s %s is Running", self.label(j), uuid)
319             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
320                 self.process_done(uuid, event["properties"]["new_attributes"])
321
322     def label(self, obj):
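        # self.work_api is "jobs" or "containers"; drop the trailing "s" to get a
        # singular label such as "[container foo]".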
323         return "[%s %s]" % (self.work_api[0:-1], obj.name)
324
325     def poll_states(self):
326         """Poll status of jobs or containers listed in the processes dict.
327
328         Runs in a separate thread.
329         """
330
331         try:
332             remain_wait = self.poll_interval
333             while True:
334                 if remain_wait > 0:
335                     self.stop_polling.wait(remain_wait)
336                 if self.stop_polling.is_set():
337                     break
338                 with self.workflow_eval_lock:
339                     keys = list(self.processes.keys())
340                 if not keys:
341                     remain_wait = self.poll_interval
342                     continue
343
344                 begin_poll = time.time()
345                 if self.work_api == "containers":
346                     table = self.poll_api.container_requests()
347                 elif self.work_api == "jobs":
348                     table = self.poll_api.jobs()
349
350                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
351
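                # Query in chunks of at most maxItemsPerResponse UUIDs so each
                # list() call can return all of the requested records in one page.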
352                 while keys:
353                     page = keys[:pageSize]
354                     keys = keys[pageSize:]
355                     try:
356                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
357                     except Exception as e:
358                         logger.warning("Error checking states on API server: %s", e)
359                         remain_wait = self.poll_interval
360                         continue
361
362                     for p in proc_states["items"]:
363                         self.on_message({
364                             "object_uuid": p["uuid"],
365                             "event_type": "update",
366                             "properties": {
367                                 "new_attributes": p
368                             }
369                         })
370                 finish_poll = time.time()
371                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
372         except:
373             logger.exception("Fatal error in state polling thread.")
374             with self.workflow_eval_lock:
375                 self.processes.clear()
376                 self.workflow_eval_lock.notifyAll()
377         finally:
378             self.stop_polling.set()
379
380     def add_intermediate_output(self, uuid):
381         if uuid:
382             self.intermediate_output_collections.append(uuid)
383
384     def trash_intermediate_output(self):
385         logger.info("Cleaning up intermediate output collections")
386         for i in self.intermediate_output_collections:
387             try:
388                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
389             except (KeyboardInterrupt, SystemExit):
390                 break
391             except Exception as e:
392                 logger.warning("Failed to delete intermediate output: %s", e, exc_info=(e if self.debug else False))
393
394     def check_features(self, obj):
395         if isinstance(obj, dict):
396             if obj.get("writable") and self.work_api != "containers":
397                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
398             if obj.get("class") == "DockerRequirement":
399                 if obj.get("dockerOutputDirectory"):
400                     if self.work_api != "containers":
401                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
402                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
403                     if not obj.get("dockerOutputDirectory").startswith('/'):
404                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
405                             "Option 'dockerOutputDirectory' must be an absolute path.")
406             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
407                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
408             for v in obj.values():
409                 self.check_features(v)
410         elif isinstance(obj, list):
411             for i,v in enumerate(obj):
412                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
413                     self.check_features(v)
414
415     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
416         outputObj = copy.deepcopy(outputObj)
417
418         files = []
419         def capture(fileobj):
420             files.append(fileobj)
421
422         adjustDirObjs(outputObj, capture)
423         adjustFileObjs(outputObj, capture)
424
425         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
426
427         final = arvados.collection.Collection(api_client=self.api,
428                                               keep_client=self.keep_client,
429                                               num_retries=self.num_retries)
430
431         for k,v in list(generatemapper.items()):
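            # Keys starting with "_:" are literals not backed by Keep: directories
            # need nothing copied, file literals are written out here.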
432             if k.startswith("_:"):
433                 if v.type == "Directory":
434                     continue
435                 if v.type == "CreateFile":
436                     with final.open(v.target, "wb") as f:
437                         f.write(v.resolved.encode("utf-8"))
438                     continue
439
440             if not k.startswith("keep:"):
441                 raise Exception("Output source is not in keep or a literal")
442             sp = k.split("/")
443             srccollection = sp[0][5:]
444             try:
445                 reader = self.collection_cache.get(srccollection)
446                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
447                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
448             except arvados.errors.ArgumentError as e:
449                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
450                 raise
451             except IOError as e:
452                 logger.warning("While preparing output collection: %s", e)
453
454         def rewrite(fileobj):
455             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
456             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
457                 if k in fileobj:
458                     del fileobj[k]
459
460         adjustDirObjs(outputObj, rewrite)
461         adjustFileObjs(outputObj, rewrite)
462
463         with final.open("cwl.output.json", "w") as f:
464             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
465
466         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
467
468         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
469                     final.api_response()["name"],
470                     final.manifest_locator())
471
472         final_uuid = final.manifest_locator()
473         tags = tagsString.split(',')
474         for tag in tags:
475             self.api.links().create(body={
476                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
477             }).execute(num_retries=self.num_retries)
478
479         def finalcollection(fileobj):
480             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
481
482         adjustDirObjs(outputObj, finalcollection)
483         adjustFileObjs(outputObj, finalcollection)
484
485         return (outputObj, final)
486
487     def set_crunch_output(self):
488         if self.work_api == "containers":
489             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
490             if current is None:
491                 return
492             try:
493                 self.api.containers().update(uuid=current['uuid'],
494                                              body={
495                                                  'output': self.final_output_collection.portable_data_hash(),
496                                              }).execute(num_retries=self.num_retries)
497                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
498                                               body={
499                                                   'is_trashed': True
500                                               }).execute(num_retries=self.num_retries)
501             except Exception as e:
502                 logger.info("Setting container output: %s", e)
503         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
504             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
505                                    body={
506                                        'output': self.final_output_collection.portable_data_hash(),
507                                        'success': self.final_status == "success",
508                                        'progress':1.0
509                                    }).execute(num_retries=self.num_retries)
510
511     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
512         self.debug = runtimeContext.debug
513
514         tool.visit(self.check_features)
515
516         self.project_uuid = runtimeContext.project_uuid
517         self.pipeline = None
518         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
519         self.secret_store = runtimeContext.secret_store
520
521         self.trash_intermediate = runtimeContext.trash_intermediate
522         if self.trash_intermediate and self.work_api != "containers":
523             raise Exception("--trash-intermediate is only supported with --api=containers.")
524
525         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
526         if self.intermediate_output_ttl and self.work_api != "containers":
527             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
528         if self.intermediate_output_ttl < 0:
529             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
530
531         if runtimeContext.submit_request_uuid and self.work_api != "containers":
532             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
533
534         if not runtimeContext.name:
535             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
536
537         # Upload direct dependencies of workflow steps and get back a mapping of
538         # files to Keep references.  Also uploads Docker images.
539         merged_map = upload_workflow_deps(self, tool)
540
541         # Reload tool object which may have been updated by
542         # upload_workflow_deps
543         # Don't validate this time because it will just print redundant errors.
544         loadingContext = self.loadingContext.copy()
545         loadingContext.loader = tool.doc_loader
546         loadingContext.avsc_names = tool.doc_schema
547         loadingContext.metadata = tool.metadata
548         loadingContext.do_validate = False
549
550         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
551                                   loadingContext)
552
553         # Upload local file references in the job order.
554         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
555                                      tool, job_order)
556
557         existing_uuid = runtimeContext.update_workflow
558         if existing_uuid or runtimeContext.create_workflow:
559             # Create a pipeline template or workflow record and exit.
560             if self.work_api == "jobs":
561                 tmpl = RunnerTemplate(self, tool, job_order,
562                                       runtimeContext.enable_reuse,
563                                       uuid=existing_uuid,
564                                       submit_runner_ram=runtimeContext.submit_runner_ram,
565                                       name=runtimeContext.name,
566                                       merged_map=merged_map,
567                                       loadingContext=loadingContext)
568                 tmpl.save()
569                 # cwltool.main will write our return value to stdout.
570                 return (tmpl.uuid, "success")
571             elif self.work_api == "containers":
572                 return (upload_workflow(self, tool, job_order,
573                                         self.project_uuid,
574                                         uuid=existing_uuid,
575                                         submit_runner_ram=runtimeContext.submit_runner_ram,
576                                         name=runtimeContext.name,
577                                         merged_map=merged_map),
578                         "success")
579
580         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
581         self.eval_timeout = runtimeContext.eval_timeout
582
583         runtimeContext = runtimeContext.copy()
584         runtimeContext.use_container = True
585         runtimeContext.tmpdir_prefix = "tmp"
586         runtimeContext.work_api = self.work_api
587
588         if self.work_api == "containers":
589             if self.ignore_docker_for_reuse:
590                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
591             runtimeContext.outdir = "/var/spool/cwl"
592             runtimeContext.docker_outdir = "/var/spool/cwl"
593             runtimeContext.tmpdir = "/tmp"
594             runtimeContext.docker_tmpdir = "/tmp"
595         elif self.work_api == "jobs":
596             if runtimeContext.priority != DEFAULT_PRIORITY:
597                 raise Exception("--priority not implemented for jobs API.")
598             runtimeContext.outdir = "$(task.outdir)"
599             runtimeContext.docker_outdir = "$(task.outdir)"
600             runtimeContext.tmpdir = "$(task.tmpdir)"
601
602         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
603             raise Exception("--priority must be in the range 1..1000.")
604
605         if self.should_estimate_cache_size:
606             visited = set()
607             estimated_size = [0]
608             def estimate_collection_cache(obj):
609                 if obj.get("location", "").startswith("keep:"):
610                     m = pdh_size.match(obj["location"][5:])
611                     if m and m.group(1) not in visited:
612                         visited.add(m.group(1))
613                         estimated_size[0] += int(m.group(2))
614             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
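            # The numeric suffix of a portable data hash is the collection's
            # manifest size in bytes; the 192x multiplier below is presumably a
            # heuristic for the in-memory overhead of a cached manifest, with a
            # 256 MiB floor.  old_div() (inserted by futurize) keeps Python 2
            # floor-division semantics; with integer operands it is the same as //.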
615             runtimeContext.collection_cache_size = max((old_div((estimated_size[0]*192), (1024*1024)))+1, 256)
616             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
617
618         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
619
620         runnerjob = None
621         if runtimeContext.submit:
622             # Submit a runner job to run the workflow for us.
623             if self.work_api == "containers":
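                # A bare CommandLineTool run with --wait (and without
                # --always-submit-runner) is submitted on its own rather than
                # being wrapped in a RunnerContainer.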
624                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
625                     runtimeContext.runnerjob = tool.tool["id"]
626                 else:
627                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
628                                                 self.output_name,
629                                                 self.output_tags,
630                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
631                                                 name=runtimeContext.name,
632                                                 on_error=runtimeContext.on_error,
633                                                 submit_runner_image=runtimeContext.submit_runner_image,
634                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
635                                                 merged_map=merged_map,
636                                                 priority=runtimeContext.priority,
637                                                 secret_store=self.secret_store,
638                                                 collection_cache_size=runtimeContext.collection_cache_size,
639                                                 collection_cache_is_default=self.should_estimate_cache_size)
640             elif self.work_api == "jobs":
641                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
642                                       self.output_name,
643                                       self.output_tags,
644                                       submit_runner_ram=runtimeContext.submit_runner_ram,
645                                       name=runtimeContext.name,
646                                       on_error=runtimeContext.on_error,
647                                       submit_runner_image=runtimeContext.submit_runner_image,
648                                       merged_map=merged_map)
649         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
650             # Create pipeline for local run
651             self.pipeline = self.api.pipeline_instances().create(
652                 body={
653                     "owner_uuid": self.project_uuid,
654                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
655                     "components": {},
656                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
657             logger.info("Pipeline instance %s", self.pipeline["uuid"])
658
659         if runtimeContext.cwl_runner_job is not None:
660             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
661
662         jobiter = tool.job(job_order,
663                            self.output_callback,
664                            runtimeContext)
665
666         if runtimeContext.submit and not runtimeContext.wait:
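            # When not waiting, the first item from the job iterator is the
            # RunnerContainer / RunnerJob created above; submit it and return
            # its UUID immediately.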
667             runnerjob = next(jobiter)
668             runnerjob.run(runtimeContext)
669             return (runnerjob.uuid, "success")
670
671         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
672         if current_container:
673             logger.info("Running inside container %s", current_container.get("uuid"))
674
675         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
676         self.polling_thread = threading.Thread(target=self.poll_states)
677         self.polling_thread.start()
678
679         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
680
681         try:
682             self.workflow_eval_lock.acquire()
683
684             # The lock is held while this loop runs and is released while
685             # waiting in self.workflow_eval_lock.wait(), at which point
686             # on_message can update job state and output callbacks can be
687             # processed.
688
689             loopperf = Perf(metrics, "jobiter")
690             loopperf.__enter__()
691             for runnable in jobiter:
692                 loopperf.__exit__()
693
694                 if self.stop_polling.is_set():
695                     break
696
697                 if self.task_queue.error is not None:
698                     raise self.task_queue.error
699
700                 if runnable:
701                     with Perf(metrics, "run"):
702                         self.start_run(runnable, runtimeContext)
703                 else:
704                     if (self.task_queue.in_flight + len(self.processes)) > 0:
705                         self.workflow_eval_lock.wait(3)
706                     else:
707                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
708                         break
709
710                 if self.stop_polling.is_set():
711                     break
712
713                 loopperf.__enter__()
714             loopperf.__exit__()
715
716             while (self.task_queue.in_flight + len(self.processes)) > 0:
717                 if self.task_queue.error is not None:
718                     raise self.task_queue.error
719                 self.workflow_eval_lock.wait(3)
720
721         except UnsupportedRequirement:
722             raise
723         except:
724             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
725                 logger.error("Interrupted, workflow will be cancelled")
726             else:
727                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
728             if self.pipeline:
729                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
730                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
731             if runtimeContext.submit and isinstance(tool, Runner):
732                 runnerjob = tool
733                 if runnerjob.uuid and self.work_api == "containers":
734                     self.api.container_requests().update(uuid=runnerjob.uuid,
735                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
736         finally:
737             self.workflow_eval_lock.release()
738             self.task_queue.drain()
739             self.stop_polling.set()
740             self.polling_thread.join()
741             self.task_queue.join()
742
743         if self.final_status == "UnsupportedRequirement":
744             raise UnsupportedRequirement("Check log for details.")
745
746         if self.final_output is None:
747             raise WorkflowException("Workflow did not return a result.")
748
749         if runtimeContext.submit and isinstance(tool, Runner):
750             logger.info("Final output collection %s", tool.final_output)
751         else:
752             if self.output_name is None:
753                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
754             if self.output_tags is None:
755                 self.output_tags = ""
756
757             storage_classes = runtimeContext.storage_classes.strip().split(",")
758             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
759             self.set_crunch_output()
760
761         if runtimeContext.compute_checksum:
762             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
763             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
764
765         if self.trash_intermediate and self.final_status == "success":
766             self.trash_intermediate_output()
767
768         return (self.final_output, self.final_status)