# sdk/cwl/arvados_cwl/executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

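# Default priority for submitted work (containers API); arv_executor()
# rejects values outside the range 1..1000.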
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
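    # Minimal usage sketch (it mirrors the wiring in ArvCwlExecutor.__init__
    # below; the executor name is illustrative):
    #
    #   handler = RuntimeStatusLoggingHandler(executor.runtime_status_update)
    #   logging.getLogger('').addHandler(handler)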
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as
                # the status and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """
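    # A minimal construction sketch (argument names are illustrative, not
    # defaults):
    #
    #   api = arvados.api('v1')
    #   executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #   (output, status) = executor.arv_executor(tool, job_order, runtimeContext)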

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

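        # arvargs.collection_cache_size is in MiB, as the conversion below
        # assumes; the default cap is 256 MiB.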
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

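        # Choose the work API by probing the server's discovery document:
        # take the first entry in expected_api that the server exposes and
        # that matches the --api option (if one was given).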
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting, if running inside a container.
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called to report errors, warnings, or other activity statuses, for
        example from the RuntimeStatusLoggingHandler.
        """
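        # The resulting runtime_status looks like (illustrative values):
        #   {'error': 'arvados.cwl-runner: step1 failed (and 2 more)',
        #    'errorDetail': '...'}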
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
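                    # e.g. "step1 failed (and 3 more)" yields error_msg
                    # "step1 failed", with more_failures capturing "3".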
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
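        # Handle a state-change event for a tracked process; events are
        # synthesized from polled records by poll_states() below.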
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

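        # Pacing: each pass waits up to poll_interval seconds, minus however
        # long the previous poll took, so states are checked roughly every
        # poll_interval (12) seconds.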
        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warn("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

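        # Mapper keys beginning with "_:" are anonymous ids for literals
        # generated during the run, rather than references into Keep.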
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

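            # Everything else must be a Keep reference of the form
            # "keep:<portable data hash>/<path>".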
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
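                # The container's output is recorded by portable data hash
                # above, so the separately saved output collection is trashed
                # (presumably to avoid keeping a redundant copy).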
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress': 1.0
                                   }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, and get back a mapping
        # of files to keep references. Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps. Don't validate this time, because it would
        # just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
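                # A single CommandLineTool run in wait mode can be submitted
                # directly as one container, skipping the intermediate
                # runner container.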
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store,
                                                collection_cache_size=runtimeContext.collection_cache_size)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create a pipeline instance for a local run.
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()
            if runnerjob:
                jobiter = iter((runnerjob,))
            else:
                if runtimeContext.cwl_runner_job is not None:
                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   runtimeContext)

            # Hold the lock while this code runs; it is released when it is
            # safe to do so, in self.workflow_eval_lock.wait(), at which
            # point on_message can update job state and process output
            # callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)