# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

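# Typical use (a rough sketch, not the supported entry point -- normally
# arvados_cwl.main() parses the arguments and drives this class):
#
#     api = arvados.api('v1')
#     executor = ArvCwlExecutor(api, arvargs=parsed_args)
#     (output, status) = executor.arv_executor(tool, job_order, runtime_context)
#
# where parsed_args, tool, job_order and runtime_context are assumed to come
# from arvados_cwl's argument parser and cwltool's document loading machinery.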
class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

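        # Pick the work API ("jobs" or "containers") by checking which
        # resources the API server actually advertises in its discovery
        # document, honoring an explicit --api choice when one was given.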
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        else:
            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # When the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore error values stubbed out by tests.
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the most recent warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

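    # wrapped_callback ensures that cwltool's output callbacks run while
    # holding workflow_eval_lock and then wake up the main evaluation loop,
    # which may be blocked in workflow_eval_lock.wait().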
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

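                # Query the watched UUIDs in pages no larger than the API
                # server's maxItemsPerResponse limit, and feed each result
                # back through on_message as a synthetic "update" event.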
                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warn("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

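        # Assemble the output collection: literal files created by the
        # workflow ("_:" locations of type CreateFile) are written directly,
        # while "keep:" references are copied in from their source
        # collections via the collection cache.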
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

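        # Rewrite the output object so that file locations point at their
        # targets inside the new collection, dropping fields (listing,
        # contents, etc.) that are no longer valid after the move.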
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

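    # When running as an Arvados container (or a crunch job task), record the
    # final output collection on our own container/job record so the
    # dispatcher and Workbench can surface it.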
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress':1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

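        # With --create-workflow or --update-workflow, register the uploaded
        # workflow (as a pipeline template on the jobs API, or a workflow
        # record on the containers API) and return without running it.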
        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

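        # With --submit, wrap the workflow in a runner process that executes
        # it on the Arvados cluster. On the containers API a bare
        # CommandLineTool run with --wait is submitted directly; everything
        # else is wrapped in a RunnerContainer (containers) or RunnerJob (jobs).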
        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         runtimeContext).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

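        # The TaskQueue runs submissions and completion callbacks on a pool
        # of worker threads (self.thread_count), so the evaluation loop below
        # is not blocked by individual API calls.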
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()
            if runnerjob:
                jobiter = iter((runnerjob,))
            else:
                if runtimeContext.cwl_runner_job is not None:
                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   runtimeContext)

            # The loop below holds the lock while it runs and releases it
            # only inside self.workflow_eval_lock.wait(), at which point
            # on_message can update job state and process output callbacks.

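            # Main evaluation loop: pull the next runnable step from the job
            # iterator and hand it to the task queue; when nothing is
            # currently runnable, wait on the condition variable until a
            # pending process finishes (or fail if nothing is in flight).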
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)