Merge branch '14345-proppatch'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
41 from cwltool.command_line_tool import compute_checksums
42
43 logger = logging.getLogger('arvados.cwl-runner')
44 metrics = logging.getLogger('arvados.cwl-runner.metrics')
45
46 DEFAULT_PRIORITY = 500
47
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and report them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback taking (kind, message[, detail]) that records the status.
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        """Report WARNING records as 'warning' and ERROR+ as 'error' statuses.

        Records below WARNING are ignored.
        """
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is None:
            return
        # Format the message once (the original called getMessage() twice).
        log_msg = record.getMessage()
        if '\n' in log_msg:
            # If the logged message is multi-line, use its first line as status
            # and the rest as detail.
            status, detail = log_msg.split('\n', 1)
            self.runtime_status_update(
                kind,
                "%s: %s" % (record.name, status),
                detail
            )
        else:
            self.runtime_status_update(
                kind,
                "%s: %s" % (record.name, log_msg)
            )
79
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):
        """Set up API/Keep clients, pick the work API and build CWL contexts.

        :param api_client: Arvados API client used for all server calls.
        :param arvargs: parsed command line options (argparse.Namespace);
            when None a minimal namespace is synthesized.
        :param keep_client: optional KeepClient; one is created from
            api_client when not supplied.
        :param num_retries: retry count for API/Keep requests.
        :param thread_count: not read here; the effective thread count is
            taken from arvargs.thread_count -- TODO confirm intended.

        Raises Exception when the server supports neither the requested
        nor any known work API ("jobs" or "containers").
        """

        if arvargs is None:
            # Synthesize the minimum set of options read below so the
            # executor can be constructed without a full argparse run.
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1

        self.api = api_client
        self.processes = {}  # uuid -> in-flight process object
        # Condition variable guarding shared scheduler state (processes,
        # final output/status); waiters are notified on state changes.
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12  # seconds between state-poll cycles
        self.loadingContext = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        # Fetcher the CWL loader uses to resolve keep: references.
        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        # Choose the work API: honor an explicit --api setting, otherwise
        # take the first API the server's discovery document advertises.
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                # Resource missing from the discovery doc; try the next one.
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warn("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)
182
183
184     def arv_make_tool(self, toolpath_object, loadingContext):
185         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
186             return ArvadosCommandTool(self, toolpath_object, loadingContext)
187         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
188             return ArvadosWorkflow(self, toolpath_object, loadingContext)
189         else:
190             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
191
192     def output_callback(self, out, processStatus):
193         with self.workflow_eval_lock:
194             if processStatus == "success":
195                 logger.info("Overall process status is %s", processStatus)
196                 state = "Complete"
197             else:
198                 logger.error("Overall process status is %s", processStatus)
199                 state = "Failed"
200             if self.pipeline:
201                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
202                                                         body={"state": state}).execute(num_retries=self.num_retries)
203             self.final_status = processStatus
204             self.final_output = out
205             self.workflow_eval_lock.notifyAll()
206
207
208     def start_run(self, runnable, runtimeContext):
209         self.task_queue.add(partial(runnable.run, runtimeContext))
210
211     def process_submitted(self, container):
212         with self.workflow_eval_lock:
213             self.processes[container.uuid] = container
214
215     def process_done(self, uuid, record):
216         with self.workflow_eval_lock:
217             j = self.processes[uuid]
218             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
219             self.task_queue.add(partial(j.done, record))
220             del self.processes[uuid]
221
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.

        :param kind: one of 'error', 'warning', 'activity'; anything else
            is ignored.
        :param message: short one-line status text.
        :param detail: optional longer text stored under '<kind>Detail'.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                # Not running inside a container: nowhere to report status.
                return
            runtime_status = current.get('runtime_status', {})
            # In case of status being an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        # (re.match raises TypeError on a non-string value).
                        return
                    if more_failures:
                        # Bump the existing "(and N more)" counter by one.
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        # Second error seen: start the counter at 1.
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard of
                # previous occurences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: failure to report status must not abort the run.
                logger.info("Couldn't update runtime_status: %s", e)
283
284     def wrapped_callback(self, cb, obj, st):
285         with self.workflow_eval_lock:
286             cb(obj, st)
287             self.workflow_eval_lock.notifyAll()
288
289     def get_wrapped_callback(self, cb):
290         return partial(self.wrapped_callback, cb)
291
292     def on_message(self, event):
293         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
294             uuid = event["object_uuid"]
295             if event["properties"]["new_attributes"]["state"] == "Running":
296                 with self.workflow_eval_lock:
297                     j = self.processes[uuid]
298                     if j.running is False:
299                         j.running = True
300                         j.update_pipeline_component(event["properties"]["new_attributes"])
301                         logger.info("%s %s is Running", self.label(j), uuid)
302             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
303                 self.process_done(uuid, event["properties"]["new_attributes"])
304
305     def label(self, obj):
306         return "[%s %s]" % (self.work_api[0:-1], obj.name)
307
    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.

        Pages through the tracked uuids every poll_interval seconds and
        feeds each returned record to on_message() as a synthetic "update"
        event.  Exits when stop_polling is set; on an unexpected error it
        clears the process table (so the main loop can finish) and sets
        stop_polling itself.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                # Snapshot the keys under the lock; the list query below
                # runs without holding it.
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                # Query in pages so a single request never exceeds the
                # server's list limit.
                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        # Transient API failure: log and retry next cycle.
                        logger.warn("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        # Synthesize the same event shape the websocket
                        # client would deliver.
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                # Subtract time spent polling from the next sleep.
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            # Unblock the main loop: with no tracked processes left it can
            # stop waiting and wind down.
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()
362
363     def add_intermediate_output(self, uuid):
364         if uuid:
365             self.intermediate_output_collections.append(uuid)
366
367     def trash_intermediate_output(self):
368         logger.info("Cleaning up intermediate output collections")
369         for i in self.intermediate_output_collections:
370             try:
371                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
372             except:
373                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
374             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
375                 break
376
    def check_features(self, obj):
        """Recursively verify that the CWL features used by obj are
        supported by the selected work API.

        Raises UnsupportedRequirement (or ValidationException) annotated
        with the source location of the offending field.
        """
        if isinstance(obj, dict):
            # Writable InitialWorkDir entries require the containers API.
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    # dockerOutputDirectory: containers API only, and must
                    # be an absolute path.
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            # Python 2 dict iteration; recurse into nested values.
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                # SourceLine context attaches the list position to any
                # error raised while checking the element.
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)
397
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        """Assemble the workflow's final output into a new collection.

        :param name: name for the new collection.
        :param storage_classes: storage classes the collection is saved with.
        :param tagsString: comma-separated tag names; one tag link is
            created per entry.
        :param outputObj: CWL output object; its File/Directory locations
            are rewritten (on a deep copy) to point into the new collection.

        Returns (rewritten outputObj, saved Collection).
        """
        outputObj = copy.deepcopy(outputObj)

        # Collect every File and Directory entry in the output object.
        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        # Map each captured location to a target path inside the new
        # collection (no symlink following, flat namespace).
        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                # Literal (non-keep) entries: directories need no data copy;
                # file literals are written from their in-memory contents.
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            # k looks like "keep:<locator>/path/within/collection".
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                # e.g. missing source path: warn but keep building output.
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            # Point the location at its mapped target and drop fields that
            # are derived from the old location.
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        # Store the CWL output object alongside the output data itself.
        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        # Attach one tag link per comma-separated entry in tagsString.
        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
             self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            # Final locations are keep references into the saved collection.
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
469
    def set_crunch_output(self):
        """Record the final output on the runner's own container/job record."""
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                # Not running inside a container: nothing to record.
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # NOTE(review): the just-saved output collection is marked
                # trashed here -- presumably because the container's output
                # record supersedes this duplicate collection; confirm this
                # is intentional.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                # Best effort: failure to set output is logged, not fatal.
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            # Jobs API: report output/success/progress on our own job task.
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress':1.0
                                   }).execute(num_retries=self.num_retries)
493
494     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
495         self.debug = runtimeContext.debug
496
497         tool.visit(self.check_features)
498
499         self.project_uuid = runtimeContext.project_uuid
500         self.pipeline = None
501         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
502         self.secret_store = runtimeContext.secret_store
503
504         self.trash_intermediate = runtimeContext.trash_intermediate
505         if self.trash_intermediate and self.work_api != "containers":
506             raise Exception("--trash-intermediate is only supported with --api=containers.")
507
508         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
509         if self.intermediate_output_ttl and self.work_api != "containers":
510             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
511         if self.intermediate_output_ttl < 0:
512             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
513
514         if runtimeContext.submit_request_uuid and self.work_api != "containers":
515             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
516
517         if not runtimeContext.name:
518             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
519
520         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
521         # Also uploads docker images.
522         merged_map = upload_workflow_deps(self, tool)
523
524         # Reload tool object which may have been updated by
525         # upload_workflow_deps
526         # Don't validate this time because it will just print redundant errors.
527         loadingContext = self.loadingContext.copy()
528         loadingContext.loader = tool.doc_loader
529         loadingContext.avsc_names = tool.doc_schema
530         loadingContext.metadata = tool.metadata
531         loadingContext.do_validate = False
532
533         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
534                                   loadingContext)
535
536         # Upload local file references in the job order.
537         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
538                                      tool, job_order)
539
540         existing_uuid = runtimeContext.update_workflow
541         if existing_uuid or runtimeContext.create_workflow:
542             # Create a pipeline template or workflow record and exit.
543             if self.work_api == "jobs":
544                 tmpl = RunnerTemplate(self, tool, job_order,
545                                       runtimeContext.enable_reuse,
546                                       uuid=existing_uuid,
547                                       submit_runner_ram=runtimeContext.submit_runner_ram,
548                                       name=runtimeContext.name,
549                                       merged_map=merged_map)
550                 tmpl.save()
551                 # cwltool.main will write our return value to stdout.
552                 return (tmpl.uuid, "success")
553             elif self.work_api == "containers":
554                 return (upload_workflow(self, tool, job_order,
555                                         self.project_uuid,
556                                         uuid=existing_uuid,
557                                         submit_runner_ram=runtimeContext.submit_runner_ram,
558                                         name=runtimeContext.name,
559                                         merged_map=merged_map),
560                         "success")
561
562         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
563         self.eval_timeout = runtimeContext.eval_timeout
564
565         runtimeContext = runtimeContext.copy()
566         runtimeContext.use_container = True
567         runtimeContext.tmpdir_prefix = "tmp"
568         runtimeContext.work_api = self.work_api
569
570         if self.work_api == "containers":
571             if self.ignore_docker_for_reuse:
572                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
573             runtimeContext.outdir = "/var/spool/cwl"
574             runtimeContext.docker_outdir = "/var/spool/cwl"
575             runtimeContext.tmpdir = "/tmp"
576             runtimeContext.docker_tmpdir = "/tmp"
577         elif self.work_api == "jobs":
578             if runtimeContext.priority != DEFAULT_PRIORITY:
579                 raise Exception("--priority not implemented for jobs API.")
580             runtimeContext.outdir = "$(task.outdir)"
581             runtimeContext.docker_outdir = "$(task.outdir)"
582             runtimeContext.tmpdir = "$(task.tmpdir)"
583
584         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
585             raise Exception("--priority must be in the range 1..1000.")
586
587         runnerjob = None
588         if runtimeContext.submit:
589             # Submit a runner job to run the workflow for us.
590             if self.work_api == "containers":
591                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
592                     runtimeContext.runnerjob = tool.tool["id"]
593                     runnerjob = tool.job(job_order,
594                                          self.output_callback,
595                                          runtimeContext).next()
596                 else:
597                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
598                                                 self.output_name,
599                                                 self.output_tags,
600                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
601                                                 name=runtimeContext.name,
602                                                 on_error=runtimeContext.on_error,
603                                                 submit_runner_image=runtimeContext.submit_runner_image,
604                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
605                                                 merged_map=merged_map,
606                                                 priority=runtimeContext.priority,
607                                                 secret_store=self.secret_store)
608             elif self.work_api == "jobs":
609                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
610                                       self.output_name,
611                                       self.output_tags,
612                                       submit_runner_ram=runtimeContext.submit_runner_ram,
613                                       name=runtimeContext.name,
614                                       on_error=runtimeContext.on_error,
615                                       submit_runner_image=runtimeContext.submit_runner_image,
616                                       merged_map=merged_map)
617         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
618             # Create pipeline for local run
619             self.pipeline = self.api.pipeline_instances().create(
620                 body={
621                     "owner_uuid": self.project_uuid,
622                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
623                     "components": {},
624                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
625             logger.info("Pipeline instance %s", self.pipeline["uuid"])
626
627         if runnerjob and not runtimeContext.wait:
628             submitargs = runtimeContext.copy()
629             submitargs.submit = False
630             runnerjob.run(submitargs)
631             return (runnerjob.uuid, "success")
632
633         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
634         if current_container:
635             logger.info("Running inside container %s", current_container.get("uuid"))
636
637         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
638         self.polling_thread = threading.Thread(target=self.poll_states)
639         self.polling_thread.start()
640
641         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
642
643         try:
644             self.workflow_eval_lock.acquire()
645             if runnerjob:
646                 jobiter = iter((runnerjob,))
647             else:
648                 if runtimeContext.cwl_runner_job is not None:
649                     self.uuid = runtimeContext.cwl_runner_job.get('uuid')
650                 jobiter = tool.job(job_order,
651                                    self.output_callback,
652                                    runtimeContext)
653
654             # Holds the lock while this code runs and releases it when
655             # it is safe to do so in self.workflow_eval_lock.wait(),
656             # at which point on_message can update job state and
657             # process output callbacks.
658
659             loopperf = Perf(metrics, "jobiter")
660             loopperf.__enter__()
661             for runnable in jobiter:
662                 loopperf.__exit__()
663
664                 if self.stop_polling.is_set():
665                     break
666
667                 if self.task_queue.error is not None:
668                     raise self.task_queue.error
669
670                 if runnable:
671                     with Perf(metrics, "run"):
672                         self.start_run(runnable, runtimeContext)
673                 else:
674                     if (self.task_queue.in_flight + len(self.processes)) > 0:
675                         self.workflow_eval_lock.wait(3)
676                     else:
677                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
678                         break
679                 loopperf.__enter__()
680             loopperf.__exit__()
681
682             while (self.task_queue.in_flight + len(self.processes)) > 0:
683                 if self.task_queue.error is not None:
684                     raise self.task_queue.error
685                 self.workflow_eval_lock.wait(3)
686
687         except UnsupportedRequirement:
688             raise
689         except:
690             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
691                 logger.error("Interrupted, workflow will be cancelled")
692             else:
693                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
694             if self.pipeline:
695                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
696                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
697             if runnerjob and runnerjob.uuid and self.work_api == "containers":
698                 self.api.container_requests().update(uuid=runnerjob.uuid,
699                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
700         finally:
701             self.workflow_eval_lock.release()
702             self.task_queue.drain()
703             self.stop_polling.set()
704             self.polling_thread.join()
705             self.task_queue.join()
706
707         if self.final_status == "UnsupportedRequirement":
708             raise UnsupportedRequirement("Check log for details.")
709
710         if self.final_output is None:
711             raise WorkflowException("Workflow did not return a result.")
712
713         if runtimeContext.submit and isinstance(runnerjob, Runner):
714             logger.info("Final output collection %s", runnerjob.final_output)
715         else:
716             if self.output_name is None:
717                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
718             if self.output_tags is None:
719                 self.output_tags = ""
720
721             storage_classes = runtimeContext.storage_classes.strip().split(",")
722             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
723             self.set_crunch_output()
724
725         if runtimeContext.compute_checksum:
726             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
727             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
728
729         if self.trash_intermediate and self.final_status == "success":
730             self.trash_intermediate_output()
731
732         return (self.final_output, self.final_status)