14198: Resolve to Docker images to PDH and set "http://arvados.org/cwl#dockerCollecti...
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 from .arvcontainer import RunnerContainer
27 from .arvjob import RunnerJob, RunnerTemplate
28 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
29 from .arvtool import ArvadosCommandTool
30 from .arvworkflow import ArvadosWorkflow, upload_workflow
31 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
32 from .perf import Perf
33 from .pathmapper import NoFollowPathMapper
34 from .task_queue import TaskQueue
35 from .context import ArvLoadingContext, ArvRuntimeContext
36 from .util import get_current_container
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
41 from cwltool.command_line_tool import compute_checksums
42
# Logger for this module, plus a child logger used for Perf timing output.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logger.getChild('metrics')
45
class RuntimeStatusLoggingHandler(logging.Handler):
    """Logging handler that forwards warnings and errors to a callback.

    Records at WARNING level or above are handed to the callable supplied
    at construction time; less severe records are dropped.
    """

    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        """Report ``record`` through the status callback if it is severe enough.

        The callback receives ``(kind, "<logger name>: <first line>")`` and,
        only when the message spans several lines, the remaining lines as a
        third argument.
        """
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        # First line becomes the status, everything after it the detail.
        headline, newline, remainder = record.getMessage().partition('\n')
        summary = "%s: %s" % (record.name, headline)
        if newline:
            self.runtime_status_update(kind, summary, remainder)
        else:
            self.runtime_status_update(kind, summary)
77
78 class ArvCwlExecutor(object):
79     """Execute a CWL tool or workflow, submit work (using either jobs or
80     containers API), wait for them to complete, and report output.
81
82     """
83
84     def __init__(self, api_client,
85                  arvargs=None,
86                  keep_client=None,
87                  num_retries=4,
88                  thread_count=4):
89
90         if arvargs is None:
91             arvargs = argparse.Namespace()
92             arvargs.work_api = None
93             arvargs.output_name = None
94             arvargs.output_tags = None
95             arvargs.thread_count = 1
96
97         self.api = api_client
98         self.processes = {}
99         self.workflow_eval_lock = threading.Condition(threading.RLock())
100         self.final_output = None
101         self.final_status = None
102         self.num_retries = num_retries
103         self.uuid = None
104         self.stop_polling = threading.Event()
105         self.poll_api = None
106         self.pipeline = None
107         self.final_output_collection = None
108         self.output_name = arvargs.output_name
109         self.output_tags = arvargs.output_tags
110         self.project_uuid = None
111         self.intermediate_output_ttl = 0
112         self.intermediate_output_collections = []
113         self.trash_intermediate = False
114         self.thread_count = arvargs.thread_count
115         self.poll_interval = 12
116         self.loadingContext = None
117
118         if keep_client is not None:
119             self.keep_client = keep_client
120         else:
121             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
122
123         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
124
125         self.fetcher_constructor = partial(CollectionFetcher,
126                                            api_client=self.api,
127                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
128                                            num_retries=self.num_retries)
129
130         self.work_api = None
131         expected_api = ["jobs", "containers"]
132         for api in expected_api:
133             try:
134                 methods = self.api._rootDesc.get('resources')[api]['methods']
135                 if ('httpMethod' in methods['create'] and
136                     (arvargs.work_api == api or arvargs.work_api is None)):
137                     self.work_api = api
138                     break
139             except KeyError:
140                 pass
141
142         if not self.work_api:
143             if arvargs.work_api is None:
144                 raise Exception("No supported APIs")
145             else:
146                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
147
148         if self.work_api == "jobs":
149             logger.warn("""
150 *******************************
151 Using the deprecated 'jobs' API.
152
153 To get rid of this warning:
154
155 Users: read about migrating at
156 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
157 and use the option --api=containers
158
159 Admins: configure the cluster to disable the 'jobs' API as described at:
160 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
161 *******************************""")
162
163         self.loadingContext = ArvLoadingContext(vars(arvargs))
164         self.loadingContext.fetcher_constructor = self.fetcher_constructor
165         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
166         self.loadingContext.construct_tool_object = self.arv_make_tool
167
168         # Add a custom logging handler to the root logger for runtime status reporting
169         # if running inside a container
170         if get_current_container(self.api, self.num_retries, logger):
171             root_logger = logging.getLogger('')
172             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
173             root_logger.addHandler(handler)
174
175         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
176         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
177                                                      collection_cache=self.collection_cache)
178
179
180     def arv_make_tool(self, toolpath_object, loadingContext):
181         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
182             return ArvadosCommandTool(self, toolpath_object, loadingContext)
183         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
184             return ArvadosWorkflow(self, toolpath_object, loadingContext)
185         else:
186             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
187
188     def output_callback(self, out, processStatus):
189         with self.workflow_eval_lock:
190             if processStatus == "success":
191                 logger.info("Overall process status is %s", processStatus)
192                 state = "Complete"
193             else:
194                 logger.error("Overall process status is %s", processStatus)
195                 state = "Failed"
196             if self.pipeline:
197                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
198                                                         body={"state": state}).execute(num_retries=self.num_retries)
199             self.final_status = processStatus
200             self.final_output = out
201             self.workflow_eval_lock.notifyAll()
202
203
204     def start_run(self, runnable, runtimeContext):
205         self.task_queue.add(partial(runnable.run, runtimeContext))
206
207     def process_submitted(self, container):
208         with self.workflow_eval_lock:
209             self.processes[container.uuid] = container
210
211     def process_done(self, uuid, record):
212         with self.workflow_eval_lock:
213             j = self.processes[uuid]
214             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
215             self.task_queue.add(partial(j.done, record))
216             del self.processes[uuid]
217
218     def runtime_status_update(self, kind, message, detail=None):
219         """
220         Updates the runtime_status field on the runner container.
221         Called when there's a need to report errors, warnings or just
222         activity statuses, for example in the RuntimeStatusLoggingHandler.
223         """
224         with self.workflow_eval_lock:
225             current = get_current_container(self.api, self.num_retries, logger)
226             if current is None:
227                 return
228             runtime_status = current.get('runtime_status', {})
229             # In case of status being an error, only report the first one.
230             if kind == 'error':
231                 if not runtime_status.get('error'):
232                     runtime_status.update({
233                         'error': message
234                     })
235                     if detail is not None:
236                         runtime_status.update({
237                             'errorDetail': detail
238                         })
239                 # Further errors are only mentioned as a count.
240                 else:
241                     # Get anything before an optional 'and N more' string.
242                     try:
243                         error_msg = re.match(
244                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
245                         more_failures = re.match(
246                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
247                     except TypeError:
248                         # Ignore tests stubbing errors
249                         return
250                     if more_failures:
251                         failure_qty = int(more_failures.groups()[0])
252                         runtime_status.update({
253                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
254                         })
255                     else:
256                         runtime_status.update({
257                             'error': "%s (and 1 more)" % error_msg
258                         })
259             elif kind in ['warning', 'activity']:
260                 # Record the last warning/activity status without regard of
261                 # previous occurences.
262                 runtime_status.update({
263                     kind: message
264                 })
265                 if detail is not None:
266                     runtime_status.update({
267                         kind+"Detail": detail
268                     })
269             else:
270                 # Ignore any other status kind
271                 return
272             try:
273                 self.api.containers().update(uuid=current['uuid'],
274                                             body={
275                                                 'runtime_status': runtime_status,
276                                             }).execute(num_retries=self.num_retries)
277             except Exception as e:
278                 logger.info("Couldn't update runtime_status: %s", e)
279
280     def wrapped_callback(self, cb, obj, st):
281         with self.workflow_eval_lock:
282             cb(obj, st)
283             self.workflow_eval_lock.notifyAll()
284
285     def get_wrapped_callback(self, cb):
286         return partial(self.wrapped_callback, cb)
287
288     def on_message(self, event):
289         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
290             uuid = event["object_uuid"]
291             if event["properties"]["new_attributes"]["state"] == "Running":
292                 with self.workflow_eval_lock:
293                     j = self.processes[uuid]
294                     if j.running is False:
295                         j.running = True
296                         j.update_pipeline_component(event["properties"]["new_attributes"])
297                         logger.info("%s %s is Running", self.label(j), uuid)
298             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
299                 self.process_done(uuid, event["properties"]["new_attributes"])
300
301     def label(self, obj):
302         return "[%s %s]" % (self.work_api[0:-1], obj.name)
303
304     def poll_states(self):
305         """Poll status of jobs or containers listed in the processes dict.
306
307         Runs in a separate thread.
308         """
309
310         try:
311             remain_wait = self.poll_interval
312             while True:
313                 if remain_wait > 0:
314                     self.stop_polling.wait(remain_wait)
315                 if self.stop_polling.is_set():
316                     break
317                 with self.workflow_eval_lock:
318                     keys = list(self.processes.keys())
319                 if not keys:
320                     remain_wait = self.poll_interval
321                     continue
322
323                 begin_poll = time.time()
324                 if self.work_api == "containers":
325                     table = self.poll_api.container_requests()
326                 elif self.work_api == "jobs":
327                     table = self.poll_api.jobs()
328
329                 try:
330                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
331                 except Exception as e:
332                     logger.warn("Error checking states on API server: %s", e)
333                     remain_wait = self.poll_interval
334                     continue
335
336                 for p in proc_states["items"]:
337                     self.on_message({
338                         "object_uuid": p["uuid"],
339                         "event_type": "update",
340                         "properties": {
341                             "new_attributes": p
342                         }
343                     })
344                 finish_poll = time.time()
345                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
346         except:
347             logger.exception("Fatal error in state polling thread.")
348             with self.workflow_eval_lock:
349                 self.processes.clear()
350                 self.workflow_eval_lock.notifyAll()
351         finally:
352             self.stop_polling.set()
353
354     def add_intermediate_output(self, uuid):
355         if uuid:
356             self.intermediate_output_collections.append(uuid)
357
358     def trash_intermediate_output(self):
359         logger.info("Cleaning up intermediate output collections")
360         for i in self.intermediate_output_collections:
361             try:
362                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
363             except:
364                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
365             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
366                 break
367
368     def check_features(self, obj):
369         if isinstance(obj, dict):
370             if obj.get("writable") and self.work_api != "containers":
371                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
372             if obj.get("class") == "DockerRequirement":
373                 if obj.get("dockerOutputDirectory"):
374                     if self.work_api != "containers":
375                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
376                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
377                     if not obj.get("dockerOutputDirectory").startswith('/'):
378                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
379                             "Option 'dockerOutputDirectory' must be an absolute path.")
380             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
381                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
382             for v in obj.itervalues():
383                 self.check_features(v)
384         elif isinstance(obj, list):
385             for i,v in enumerate(obj):
386                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
387                     self.check_features(v)
388
389     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
390         outputObj = copy.deepcopy(outputObj)
391
392         files = []
393         def capture(fileobj):
394             files.append(fileobj)
395
396         adjustDirObjs(outputObj, capture)
397         adjustFileObjs(outputObj, capture)
398
399         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
400
401         final = arvados.collection.Collection(api_client=self.api,
402                                               keep_client=self.keep_client,
403                                               num_retries=self.num_retries)
404
405         for k,v in generatemapper.items():
406             if k.startswith("_:"):
407                 if v.type == "Directory":
408                     continue
409                 if v.type == "CreateFile":
410                     with final.open(v.target, "wb") as f:
411                         f.write(v.resolved.encode("utf-8"))
412                     continue
413
414             if not k.startswith("keep:"):
415                 raise Exception("Output source is not in keep or a literal")
416             sp = k.split("/")
417             srccollection = sp[0][5:]
418             try:
419                 reader = self.collection_cache.get(srccollection)
420                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
421                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
422             except arvados.errors.ArgumentError as e:
423                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
424                 raise
425             except IOError as e:
426                 logger.warn("While preparing output collection: %s", e)
427
428         def rewrite(fileobj):
429             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
430             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
431                 if k in fileobj:
432                     del fileobj[k]
433
434         adjustDirObjs(outputObj, rewrite)
435         adjustFileObjs(outputObj, rewrite)
436
437         with final.open("cwl.output.json", "w") as f:
438             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
439
440         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
441
442         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
443                     final.api_response()["name"],
444                     final.manifest_locator())
445
446         final_uuid = final.manifest_locator()
447         tags = tagsString.split(',')
448         for tag in tags:
449              self.api.links().create(body={
450                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
451                 }).execute(num_retries=self.num_retries)
452
453         def finalcollection(fileobj):
454             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
455
456         adjustDirObjs(outputObj, finalcollection)
457         adjustFileObjs(outputObj, finalcollection)
458
459         return (outputObj, final)
460
461     def set_crunch_output(self):
462         if self.work_api == "containers":
463             current = get_current_container(self.api, self.num_retries, logger)
464             if current is None:
465                 return
466             try:
467                 self.api.containers().update(uuid=current['uuid'],
468                                              body={
469                                                  'output': self.final_output_collection.portable_data_hash(),
470                                              }).execute(num_retries=self.num_retries)
471                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
472                                               body={
473                                                   'is_trashed': True
474                                               }).execute(num_retries=self.num_retries)
475             except Exception as e:
476                 logger.info("Setting container output: %s", e)
477         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
478             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
479                                    body={
480                                        'output': self.final_output_collection.portable_data_hash(),
481                                        'success': self.final_status == "success",
482                                        'progress':1.0
483                                    }).execute(num_retries=self.num_retries)
484
485     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
486         self.debug = runtimeContext.debug
487
488         tool.visit(self.check_features)
489
490         self.project_uuid = runtimeContext.project_uuid
491         self.pipeline = None
492         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
493         self.secret_store = runtimeContext.secret_store
494
495         self.trash_intermediate = runtimeContext.trash_intermediate
496         if self.trash_intermediate and self.work_api != "containers":
497             raise Exception("--trash-intermediate is only supported with --api=containers.")
498
499         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
500         if self.intermediate_output_ttl and self.work_api != "containers":
501             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
502         if self.intermediate_output_ttl < 0:
503             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
504
505         if runtimeContext.submit_request_uuid and self.work_api != "containers":
506             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
507
508         if not runtimeContext.name:
509             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
510
511         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
512         # Also uploads docker images.
513         merged_map = upload_workflow_deps(self, tool)
514
515         # Reload tool object which may have been updated by
516         # upload_workflow_deps
517         # Don't validate this time because it will just print redundant errors.
518         loadingContext = self.loadingContext.copy()
519         loadingContext.loader = tool.doc_loader
520         loadingContext.avsc_names = tool.doc_schema
521         loadingContext.metadata = tool.metadata
522         loadingContext.do_validate = False
523
524         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
525                                   loadingContext)
526
527         # Upload local file references in the job order.
528         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
529                                      tool, job_order)
530
531         existing_uuid = runtimeContext.update_workflow
532         if existing_uuid or runtimeContext.create_workflow:
533             # Create a pipeline template or workflow record and exit.
534             if self.work_api == "jobs":
535                 tmpl = RunnerTemplate(self, tool, job_order,
536                                       runtimeContext.enable_reuse,
537                                       uuid=existing_uuid,
538                                       submit_runner_ram=runtimeContext.submit_runner_ram,
539                                       name=runtimeContext.name,
540                                       merged_map=merged_map)
541                 tmpl.save()
542                 # cwltool.main will write our return value to stdout.
543                 return (tmpl.uuid, "success")
544             elif self.work_api == "containers":
545                 return (upload_workflow(self, tool, job_order,
546                                         self.project_uuid,
547                                         uuid=existing_uuid,
548                                         submit_runner_ram=runtimeContext.submit_runner_ram,
549                                         name=runtimeContext.name,
550                                         merged_map=merged_map),
551                         "success")
552
553         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
554         self.eval_timeout = runtimeContext.eval_timeout
555
556         runtimeContext = runtimeContext.copy()
557         runtimeContext.use_container = True
558         runtimeContext.tmpdir_prefix = "tmp"
559         runtimeContext.work_api = self.work_api
560
561         if self.work_api == "containers":
562             if self.ignore_docker_for_reuse:
563                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
564             runtimeContext.outdir = "/var/spool/cwl"
565             runtimeContext.docker_outdir = "/var/spool/cwl"
566             runtimeContext.tmpdir = "/tmp"
567             runtimeContext.docker_tmpdir = "/tmp"
568         elif self.work_api == "jobs":
569             if runtimeContext.priority != DEFAULT_PRIORITY:
570                 raise Exception("--priority not implemented for jobs API.")
571             runtimeContext.outdir = "$(task.outdir)"
572             runtimeContext.docker_outdir = "$(task.outdir)"
573             runtimeContext.tmpdir = "$(task.tmpdir)"
574
575         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
576             raise Exception("--priority must be in the range 1..1000.")
577
578         runnerjob = None
579         if runtimeContext.submit:
580             # Submit a runner job to run the workflow for us.
581             if self.work_api == "containers":
582                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
583                     runtimeContext.runnerjob = tool.tool["id"]
584                     runnerjob = tool.job(job_order,
585                                          self.output_callback,
586                                          runtimeContext).next()
587                 else:
588                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
589                                                 self.output_name,
590                                                 self.output_tags,
591                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
592                                                 name=runtimeContext.name,
593                                                 on_error=runtimeContext.on_error,
594                                                 submit_runner_image=runtimeContext.submit_runner_image,
595                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
596                                                 merged_map=merged_map,
597                                                 priority=runtimeContext.priority,
598                                                 secret_store=self.secret_store)
599             elif self.work_api == "jobs":
600                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
601                                       self.output_name,
602                                       self.output_tags,
603                                       submit_runner_ram=runtimeContext.submit_runner_ram,
604                                       name=runtimeContext.name,
605                                       on_error=runtimeContext.on_error,
606                                       submit_runner_image=runtimeContext.submit_runner_image,
607                                       merged_map=merged_map)
608         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
609             # Create pipeline for local run
610             self.pipeline = self.api.pipeline_instances().create(
611                 body={
612                     "owner_uuid": self.project_uuid,
613                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
614                     "components": {},
615                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
616             logger.info("Pipeline instance %s", self.pipeline["uuid"])
617
618         if runnerjob and not runtimeContext.wait:
619             submitargs = runtimeContext.copy()
620             submitargs.submit = False
621             runnerjob.run(submitargs)
622             return (runnerjob.uuid, "success")
623
624         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
625         self.polling_thread = threading.Thread(target=self.poll_states)
626         self.polling_thread.start()
627
628         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
629
630         if runnerjob:
631             jobiter = iter((runnerjob,))
632         else:
633             if runtimeContext.cwl_runner_job is not None:
634                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
635             jobiter = tool.job(job_order,
636                                self.output_callback,
637                                runtimeContext)
638
639         try:
640             self.workflow_eval_lock.acquire()
641             # Holds the lock while this code runs and releases it when
642             # it is safe to do so in self.workflow_eval_lock.wait(),
643             # at which point on_message can update job state and
644             # process output callbacks.
645
646             loopperf = Perf(metrics, "jobiter")
647             loopperf.__enter__()
648             for runnable in jobiter:
649                 loopperf.__exit__()
650
651                 if self.stop_polling.is_set():
652                     break
653
654                 if self.task_queue.error is not None:
655                     raise self.task_queue.error
656
657                 if runnable:
658                     with Perf(metrics, "run"):
659                         self.start_run(runnable, runtimeContext)
660                 else:
661                     if (self.task_queue.in_flight + len(self.processes)) > 0:
662                         self.workflow_eval_lock.wait(3)
663                     else:
664                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
665                         break
666                 loopperf.__enter__()
667             loopperf.__exit__()
668
669             while (self.task_queue.in_flight + len(self.processes)) > 0:
670                 if self.task_queue.error is not None:
671                     raise self.task_queue.error
672                 self.workflow_eval_lock.wait(3)
673
674         except UnsupportedRequirement:
675             raise
676         except:
677             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
678                 logger.error("Interrupted, workflow will be cancelled")
679             else:
680                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
681             if self.pipeline:
682                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
683                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
684             if runnerjob and runnerjob.uuid and self.work_api == "containers":
685                 self.api.container_requests().update(uuid=runnerjob.uuid,
686                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
687         finally:
688             self.workflow_eval_lock.release()
689             self.task_queue.drain()
690             self.stop_polling.set()
691             self.polling_thread.join()
692             self.task_queue.join()
693
694         if self.final_status == "UnsupportedRequirement":
695             raise UnsupportedRequirement("Check log for details.")
696
697         if self.final_output is None:
698             raise WorkflowException("Workflow did not return a result.")
699
700         if runtimeContext.submit and isinstance(runnerjob, Runner):
701             logger.info("Final output collection %s", runnerjob.final_output)
702         else:
703             if self.output_name is None:
704                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
705             if self.output_tags is None:
706                 self.output_tags = ""
707
708             storage_classes = runtimeContext.storage_classes.strip().split(",")
709             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
710             self.set_crunch_output()
711
712         if runtimeContext.compute_checksum:
713             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
714             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
715
716         if self.trash_intermediate and self.final_status == "success":
717             self.trash_intermediate_output()
718
719         return (self.final_output, self.final_status)