14510: Performance fixes
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
41 from cwltool.command_line_tool import compute_checksums
42
43 logger = logging.getLogger('arvados.cwl-runner')
44 metrics = logging.getLogger('arvados.cwl-runner.metrics')
45
46 DEFAULT_PRIORITY = 500
47
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and report them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callable with signature (kind, message, detail=None) that records
        # the status on the runner container.
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        """Forward an ERROR/WARNING log record as a runtime status update.

        Records below WARNING are ignored.  For multi-line messages the
        first line becomes the status summary and the remainder the detail.
        """
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is None:
            return
        # Render the message once; the original called getMessage() twice.
        log_msg = record.getMessage()
        if '\n' in log_msg:
            # If the logged message is multi-line, use its first line as status
            # and the rest as detail.
            status, detail = log_msg.split('\n', 1)
            self.runtime_status_update(
                kind,
                "%s: %s" % (record.name, status),
                detail
            )
        else:
            self.runtime_status_update(
                kind,
                "%s: %s" % (record.name, log_msg)
            )
79
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):
        """Initialize the executor: API clients, caches, and CWL contexts.

        :param api_client: Arvados API client used for all server calls.
        :param arvargs: argparse.Namespace of command line options; when
            None, a minimal stand-in namespace is constructed.
        :param keep_client: optional KeepClient; created from api_client
            when not supplied.
        :param num_retries: retry count for Arvados API/Keep requests.
        :param thread_count: NOTE(review): accepted but never read here;
            the effective thread count comes from arvargs.thread_count.

        Raises Exception if the requested work API ("jobs" or
        "containers") is not offered by the server.
        """

        if arvargs is None:
            # Build a minimal stand-in options namespace.
            # NOTE(review): this stand-in does not set collection_cache,
            # which is read below when constructing CollectionCache, so
            # calling with arvargs=None would raise AttributeError -- confirm.
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        # Map of uuid -> in-flight process object, guarded by workflow_eval_lock.
        self.processes = {}
        # Single lock/condition coordinating the eval loop, task queue and poller.
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        # Event used to tell the polling thread to exit.
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        # Seconds between state-poll cycles (see poll_states).
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        # Shared, size-capped cache of Keep collections.
        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=arvargs.collection_cache)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        # Probe the server's API discovery document to pick "jobs" or
        # "containers"; honor an explicit arvargs.work_api preference.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                # This API is not offered by the server; try the next one.
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)
184
185     def arv_make_tool(self, toolpath_object, loadingContext):
186         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
187             return ArvadosCommandTool(self, toolpath_object, loadingContext)
188         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
189             return ArvadosWorkflow(self, toolpath_object, loadingContext)
190         else:
191             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
192
193     def output_callback(self, out, processStatus):
194         with self.workflow_eval_lock:
195             if processStatus == "success":
196                 logger.info("Overall process status is %s", processStatus)
197                 state = "Complete"
198             else:
199                 logger.error("Overall process status is %s", processStatus)
200                 state = "Failed"
201             if self.pipeline:
202                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
203                                                         body={"state": state}).execute(num_retries=self.num_retries)
204             self.final_status = processStatus
205             self.final_output = out
206             self.workflow_eval_lock.notifyAll()
207
208
209     def start_run(self, runnable, runtimeContext):
210         self.task_queue.add(partial(runnable.run, runtimeContext),
211                             self.workflow_eval_lock, self.stop_polling)
212
213     def process_submitted(self, container):
214         with self.workflow_eval_lock:
215             self.processes[container.uuid] = container
216
217     def process_done(self, uuid, record):
218         with self.workflow_eval_lock:
219             j = self.processes[uuid]
220             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
221             self.task_queue.add(partial(j.done, record),
222                                 self.workflow_eval_lock, self.stop_polling)
223             del self.processes[uuid]
224
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.

        :param kind: 'error', 'warning' or 'activity'; anything else is ignored.
        :param message: one-line status summary.
        :param detail: optional multi-line detail stored under '<kind>Detail'.
        """
        with self.workflow_eval_lock:
            # No-op when not running inside a container (nothing to update).
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        # Bump the existing "(and N more)" counter.
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        # Second error seen: start the counter at 1.
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard of
                # previous occurences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: a failed status update should not kill the run.
                logger.info("Couldn't update runtime_status: %s", e)
286
287     def wrapped_callback(self, cb, obj, st):
288         with self.workflow_eval_lock:
289             cb(obj, st)
290             self.workflow_eval_lock.notifyAll()
291
292     def get_wrapped_callback(self, cb):
293         return partial(self.wrapped_callback, cb)
294
295     def on_message(self, event):
296         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
297             uuid = event["object_uuid"]
298             if event["properties"]["new_attributes"]["state"] == "Running":
299                 with self.workflow_eval_lock:
300                     j = self.processes[uuid]
301                     if j.running is False:
302                         j.running = True
303                         j.update_pipeline_component(event["properties"]["new_attributes"])
304                         logger.info("%s %s is Running", self.label(j), uuid)
305             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
306                 self.process_done(uuid, event["properties"]["new_attributes"])
307
308     def label(self, obj):
309         return "[%s %s]" % (self.work_api[0:-1], obj.name)
310
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        Loops until stop_polling is set, fetching the state of every
        watched process in pages and feeding each record to on_message()
        as a synthetic "update" event.  On a fatal error the process
        table is cleared and waiters are woken so the main loop can exit.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                # Snapshot the watched uuids while holding the lock.
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                # Page requests to the server's advertised maximum.
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        # NOTE(review): this 'continue' resumes the inner
                        # paging loop, so the remaining pages are still
                        # tried this cycle -- confirm that is intended.
                        logger.warn("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        # Reuse the websocket event shape so on_message()
                        # handles polled and pushed updates identically.
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                # Subtract time spent polling from the next wait interval.
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            # Always signal shutdown so the main loop never waits forever.
            self.stop_polling.set()
365
366     def add_intermediate_output(self, uuid):
367         if uuid:
368             self.intermediate_output_collections.append(uuid)
369
370     def trash_intermediate_output(self):
371         logger.info("Cleaning up intermediate output collections")
372         for i in self.intermediate_output_collections:
373             try:
374                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
375             except:
376                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
377             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
378                 break
379
    def check_features(self, obj):
        """Recursively verify that a CWL document fragment only uses
        features supported by the selected work API.

        Raises UnsupportedRequirement or validate.ValidationException
        (wrapped with source-line context) for containers-only features
        requested while running with --api=jobs.
        """
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            # NOTE(review): itervalues() is Python 2 only; change to
            # values() when porting this file to Python 3.
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                # Attach list-index source info to errors raised below.
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
400
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        """Assemble the workflow's final output into a new Keep collection.

        :param name: collection name (made unique on save).
        :param storage_classes: storage classes for the saved collection.
        :param tagsString: comma-separated tag names to attach via links.
        :param outputObj: CWL output object; a deep copy is rewritten so
            file/directory locations point into the new collection.
        :returns: tuple of (rewritten output object, saved Collection).
        """
        # Work on a copy so the caller's object is not mutated.
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        # Collect every File/Directory object referenced by the output.
        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            # "_:" keys are literals: directories need no content, and
            # CreateFile literals are written directly into the collection.
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # k looks like "keep:<pdh-or-uuid>/path/..."; copy that path
            # out of the source collection into the final one.
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                # Missing source path: warn and keep building the rest.
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point locations at the new collection layout and strip
            # fields that are no longer valid after the move.
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Embed the CWL output description alongside the data.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach each requested tag to the saved collection via link records.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
             self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            # Rewrite locations to absolute keep references into the
            # finished collection.
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
472
    def set_crunch_output(self):
        """Report the final output collection back to the crunch dispatcher.

        Containers API: set the current container's 'output' to the final
        collection's portable data hash.  Jobs API: update the current
        job task's output/success/progress fields.  No-op when not
        running under either dispatcher.
        """
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                # Not running inside a container; nothing to report.
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # NOTE(review): the output collection record is trashed
                # here, presumably because the container's own output copy
                # supersedes it once 'output' is set -- confirm intent.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best-effort: failing to set container output is logged only.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress':1.0
                                   }).execute(num_retries=self.num_retries)
496
497     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
498         self.debug = runtimeContext.debug
499
500         tool.visit(self.check_features)
501
502         self.project_uuid = runtimeContext.project_uuid
503         self.pipeline = None
504         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
505         self.secret_store = runtimeContext.secret_store
506
507         self.trash_intermediate = runtimeContext.trash_intermediate
508         if self.trash_intermediate and self.work_api != "containers":
509             raise Exception("--trash-intermediate is only supported with --api=containers.")
510
511         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
512         if self.intermediate_output_ttl and self.work_api != "containers":
513             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
514         if self.intermediate_output_ttl < 0:
515             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
516
517         if runtimeContext.submit_request_uuid and self.work_api != "containers":
518             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
519
520         if not runtimeContext.name:
521             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
522
523         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
524         # Also uploads docker images.
525         merged_map = upload_workflow_deps(self, tool)
526
527         # Reload tool object which may have been updated by
528         # upload_workflow_deps
529         # Don't validate this time because it will just print redundant errors.
530         loadingContext = self.loadingContext.copy()
531         loadingContext.loader = tool.doc_loader
532         loadingContext.avsc_names = tool.doc_schema
533         loadingContext.metadata = tool.metadata
534         loadingContext.do_validate = False
535
536         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
537                                   loadingContext)
538
539         # Upload local file references in the job order.
540         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
541                                      tool, job_order)
542
543         existing_uuid = runtimeContext.update_workflow
544         if existing_uuid or runtimeContext.create_workflow:
545             # Create a pipeline template or workflow record and exit.
546             if self.work_api == "jobs":
547                 tmpl = RunnerTemplate(self, tool, job_order,
548                                       runtimeContext.enable_reuse,
549                                       uuid=existing_uuid,
550                                       submit_runner_ram=runtimeContext.submit_runner_ram,
551                                       name=runtimeContext.name,
552                                       merged_map=merged_map)
553                 tmpl.save()
554                 # cwltool.main will write our return value to stdout.
555                 return (tmpl.uuid, "success")
556             elif self.work_api == "containers":
557                 return (upload_workflow(self, tool, job_order,
558                                         self.project_uuid,
559                                         uuid=existing_uuid,
560                                         submit_runner_ram=runtimeContext.submit_runner_ram,
561                                         name=runtimeContext.name,
562                                         merged_map=merged_map),
563                         "success")
564
565         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
566         self.eval_timeout = runtimeContext.eval_timeout
567
568         runtimeContext = runtimeContext.copy()
569         runtimeContext.use_container = True
570         runtimeContext.tmpdir_prefix = "tmp"
571         runtimeContext.work_api = self.work_api
572
573         if self.work_api == "containers":
574             if self.ignore_docker_for_reuse:
575                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
576             runtimeContext.outdir = "/var/spool/cwl"
577             runtimeContext.docker_outdir = "/var/spool/cwl"
578             runtimeContext.tmpdir = "/tmp"
579             runtimeContext.docker_tmpdir = "/tmp"
580         elif self.work_api == "jobs":
581             if runtimeContext.priority != DEFAULT_PRIORITY:
582                 raise Exception("--priority not implemented for jobs API.")
583             runtimeContext.outdir = "$(task.outdir)"
584             runtimeContext.docker_outdir = "$(task.outdir)"
585             runtimeContext.tmpdir = "$(task.tmpdir)"
586
587         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
588             raise Exception("--priority must be in the range 1..1000.")
589
590         runnerjob = None
591         if runtimeContext.submit:
592             # Submit a runner job to run the workflow for us.
593             if self.work_api == "containers":
594                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
595                     runtimeContext.runnerjob = tool.tool["id"]
596                     runnerjob = tool.job(job_order,
597                                          self.output_callback,
598                                          runtimeContext).next()
599                 else:
600                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
601                                                 self.output_name,
602                                                 self.output_tags,
603                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
604                                                 name=runtimeContext.name,
605                                                 on_error=runtimeContext.on_error,
606                                                 submit_runner_image=runtimeContext.submit_runner_image,
607                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
608                                                 merged_map=merged_map,
609                                                 priority=runtimeContext.priority,
610                                                 secret_store=self.secret_store)
611             elif self.work_api == "jobs":
612                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
613                                       self.output_name,
614                                       self.output_tags,
615                                       submit_runner_ram=runtimeContext.submit_runner_ram,
616                                       name=runtimeContext.name,
617                                       on_error=runtimeContext.on_error,
618                                       submit_runner_image=runtimeContext.submit_runner_image,
619                                       merged_map=merged_map)
620         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
621             # Create pipeline for local run
622             self.pipeline = self.api.pipeline_instances().create(
623                 body={
624                     "owner_uuid": self.project_uuid,
625                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
626                     "components": {},
627                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
628             logger.info("Pipeline instance %s", self.pipeline["uuid"])
629
630         if runnerjob and not runtimeContext.wait:
631             submitargs = runtimeContext.copy()
632             submitargs.submit = False
633             runnerjob.run(submitargs)
634             return (runnerjob.uuid, "success")
635
636         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
637         if current_container:
638             logger.info("Running inside container %s", current_container.get("uuid"))
639
640         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
641         self.polling_thread = threading.Thread(target=self.poll_states)
642         self.polling_thread.start()
643
644         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
645
646         try:
647             self.workflow_eval_lock.acquire()
648             if runnerjob:
649                 jobiter = iter((runnerjob,))
650             else:
651                 if runtimeContext.cwl_runner_job is not None:
652                     self.uuid = runtimeContext.cwl_runner_job.get('uuid')
653                 jobiter = tool.job(job_order,
654                                    self.output_callback,
655                                    runtimeContext)
656
657             # Holds the lock while this code runs and releases it when
658             # it is safe to do so in self.workflow_eval_lock.wait(),
659             # at which point on_message can update job state and
660             # process output callbacks.
661
662             loopperf = Perf(metrics, "jobiter")
663             loopperf.__enter__()
664             for runnable in jobiter:
665                 loopperf.__exit__()
666
667                 if self.stop_polling.is_set():
668                     break
669
670                 if self.task_queue.error is not None:
671                     raise self.task_queue.error
672
673                 if runnable:
674                     with Perf(metrics, "run"):
675                         self.start_run(runnable, runtimeContext)
676                 else:
677                     if (self.task_queue.in_flight + len(self.processes)) > 0:
678                         self.workflow_eval_lock.wait(3)
679                     else:
680                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
681                         break
682
683                 if self.stop_polling.is_set():
684                     break
685
686                 loopperf.__enter__()
687             loopperf.__exit__()
688
689             while (self.task_queue.in_flight + len(self.processes)) > 0:
690                 if self.task_queue.error is not None:
691                     raise self.task_queue.error
692                 self.workflow_eval_lock.wait(3)
693
694         except UnsupportedRequirement:
695             raise
696         except:
697             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
698                 logger.error("Interrupted, workflow will be cancelled")
699             else:
700                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
701             if self.pipeline:
702                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
703                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
704             if runnerjob and runnerjob.uuid and self.work_api == "containers":
705                 self.api.container_requests().update(uuid=runnerjob.uuid,
706                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
707         finally:
708             self.workflow_eval_lock.release()
709             self.task_queue.drain()
710             self.stop_polling.set()
711             self.polling_thread.join()
712             self.task_queue.join()
713
714         if self.final_status == "UnsupportedRequirement":
715             raise UnsupportedRequirement("Check log for details.")
716
717         if self.final_output is None:
718             raise WorkflowException("Workflow did not return a result.")
719
720         if runtimeContext.submit and isinstance(runnerjob, Runner):
721             logger.info("Final output collection %s", runnerjob.final_output)
722         else:
723             if self.output_name is None:
724                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
725             if self.output_tags is None:
726                 self.output_tags = ""
727
728             storage_classes = runtimeContext.storage_classes.strip().split(",")
729             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
730             self.set_crunch_output()
731
732         if runtimeContext.compute_checksum:
733             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
734             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
735
736         if self.trash_intermediate and self.final_status == "success":
737             self.trash_intermediate_output()
738
739         return (self.final_output, self.final_status)