14538: Merge branch 'master' into 14538-async-write
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
41 from cwltool.command_line_tool import compute_checksums
42
43 logger = logging.getLogger('arvados.cwl-runner')
44 metrics = logging.getLogger('arvados.cwl-runner.metrics')
45
46 DEFAULT_PRIORITY = 500
47
48 class RuntimeStatusLoggingHandler(logging.Handler):
49     """
50     Intercepts logging calls and report them as runtime statuses on runner
51     containers.
52     """
53     def __init__(self, runtime_status_update_func):
54         super(RuntimeStatusLoggingHandler, self).__init__()
55         self.runtime_status_update = runtime_status_update_func
56
57     def emit(self, record):
58         kind = None
59         if record.levelno >= logging.ERROR:
60             kind = 'error'
61         elif record.levelno >= logging.WARNING:
62             kind = 'warning'
63         if kind is not None:
64             log_msg = record.getMessage()
65             if '\n' in log_msg:
66                 # If the logged message is multi-line, use its first line as status
67                 # and the rest as detail.
68                 status, detail = log_msg.split('\n', 1)
69                 self.runtime_status_update(
70                     kind,
71                     "%s: %s" % (record.name, status),
72                     detail
73                 )
74             else:
75                 self.runtime_status_update(
76                     kind,
77                     "%s: %s" % (record.name, record.getMessage())
78                 )
79
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
86     def __init__(self, api_client,
87                  arvargs=None,
88                  keep_client=None,
89                  num_retries=4,
90                  thread_count=4):
91
92         if arvargs is None:
93             arvargs = argparse.Namespace()
94             arvargs.work_api = None
95             arvargs.output_name = None
96             arvargs.output_tags = None
97             arvargs.thread_count = 1
98             arvargs.collection_cache_size = None
99
100         self.api = api_client
101         self.processes = {}
102         self.workflow_eval_lock = threading.Condition(threading.RLock())
103         self.final_output = None
104         self.final_status = None
105         self.num_retries = num_retries
106         self.uuid = None
107         self.stop_polling = threading.Event()
108         self.poll_api = None
109         self.pipeline = None
110         self.final_output_collection = None
111         self.output_name = arvargs.output_name
112         self.output_tags = arvargs.output_tags
113         self.project_uuid = None
114         self.intermediate_output_ttl = 0
115         self.intermediate_output_collections = []
116         self.trash_intermediate = False
117         self.thread_count = arvargs.thread_count
118         self.poll_interval = 12
119         self.loadingContext = None
120         self.should_estimate_cache_size = True
121
122         if keep_client is not None:
123             self.keep_client = keep_client
124         else:
125             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
126
127         if arvargs.collection_cache_size:
128             collection_cache_size = arvargs.collection_cache_size*1024*1024
129             self.should_estimate_cache_size = False
130         else:
131             collection_cache_size = 256*1024*1024
132
133         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
134                                                 cap=collection_cache_size)
135
136         self.fetcher_constructor = partial(CollectionFetcher,
137                                            api_client=self.api,
138                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
139                                            num_retries=self.num_retries)
140
141         self.work_api = None
142         expected_api = ["jobs", "containers"]
143         for api in expected_api:
144             try:
145                 methods = self.api._rootDesc.get('resources')[api]['methods']
146                 if ('httpMethod' in methods['create'] and
147                     (arvargs.work_api == api or arvargs.work_api is None)):
148                     self.work_api = api
149                     break
150             except KeyError:
151                 pass
152
153         if not self.work_api:
154             if arvargs.work_api is None:
155                 raise Exception("No supported APIs")
156             else:
157                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
158
159         if self.work_api == "jobs":
160             logger.warn("""
161 *******************************
162 Using the deprecated 'jobs' API.
163
164 To get rid of this warning:
165
166 Users: read about migrating at
167 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
168 and use the option --api=containers
169
170 Admins: configure the cluster to disable the 'jobs' API as described at:
171 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
172 *******************************""")
173
174         self.loadingContext = ArvLoadingContext(vars(arvargs))
175         self.loadingContext.fetcher_constructor = self.fetcher_constructor
176         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
177         self.loadingContext.construct_tool_object = self.arv_make_tool
178
179         # Add a custom logging handler to the root logger for runtime status reporting
180         # if running inside a container
181         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
182             root_logger = logging.getLogger('')
183             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
184             root_logger.addHandler(handler)
185
186         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
187         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
188                                                      collection_cache=self.collection_cache)
189
190         validate_cluster_target(self, self.runtimeContext)
191
192
193     def arv_make_tool(self, toolpath_object, loadingContext):
194         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
195             return ArvadosCommandTool(self, toolpath_object, loadingContext)
196         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
197             return ArvadosWorkflow(self, toolpath_object, loadingContext)
198         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
199             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
200         else:
201             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
202
203     def output_callback(self, out, processStatus):
204         with self.workflow_eval_lock:
205             if processStatus == "success":
206                 logger.info("Overall process status is %s", processStatus)
207                 state = "Complete"
208             else:
209                 logger.error("Overall process status is %s", processStatus)
210                 state = "Failed"
211             if self.pipeline:
212                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
213                                                         body={"state": state}).execute(num_retries=self.num_retries)
214             self.final_status = processStatus
215             self.final_output = out
216             self.workflow_eval_lock.notifyAll()
217
218
219     def start_run(self, runnable, runtimeContext):
220         self.task_queue.add(partial(runnable.run, runtimeContext),
221                             self.workflow_eval_lock, self.stop_polling)
222
223     def process_submitted(self, container):
224         with self.workflow_eval_lock:
225             self.processes[container.uuid] = container
226
227     def process_done(self, uuid, record):
228         with self.workflow_eval_lock:
229             j = self.processes[uuid]
230             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
231             self.task_queue.add(partial(j.done, record),
232                                 self.workflow_eval_lock, self.stop_polling)
233             del self.processes[uuid]
234
235     def runtime_status_update(self, kind, message, detail=None):
236         """
237         Updates the runtime_status field on the runner container.
238         Called when there's a need to report errors, warnings or just
239         activity statuses, for example in the RuntimeStatusLoggingHandler.
240         """
241         with self.workflow_eval_lock:
242             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
243             if current is None:
244                 return
245             runtime_status = current.get('runtime_status', {})
246             # In case of status being an error, only report the first one.
247             if kind == 'error':
248                 if not runtime_status.get('error'):
249                     runtime_status.update({
250                         'error': message
251                     })
252                     if detail is not None:
253                         runtime_status.update({
254                             'errorDetail': detail
255                         })
256                 # Further errors are only mentioned as a count.
257                 else:
258                     # Get anything before an optional 'and N more' string.
259                     try:
260                         error_msg = re.match(
261                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
262                         more_failures = re.match(
263                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
264                     except TypeError:
265                         # Ignore tests stubbing errors
266                         return
267                     if more_failures:
268                         failure_qty = int(more_failures.groups()[0])
269                         runtime_status.update({
270                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
271                         })
272                     else:
273                         runtime_status.update({
274                             'error': "%s (and 1 more)" % error_msg
275                         })
276             elif kind in ['warning', 'activity']:
277                 # Record the last warning/activity status without regard of
278                 # previous occurences.
279                 runtime_status.update({
280                     kind: message
281                 })
282                 if detail is not None:
283                     runtime_status.update({
284                         kind+"Detail": detail
285                     })
286             else:
287                 # Ignore any other status kind
288                 return
289             try:
290                 self.api.containers().update(uuid=current['uuid'],
291                                             body={
292                                                 'runtime_status': runtime_status,
293                                             }).execute(num_retries=self.num_retries)
294             except Exception as e:
295                 logger.info("Couldn't update runtime_status: %s", e)
296
297     def wrapped_callback(self, cb, obj, st):
298         with self.workflow_eval_lock:
299             cb(obj, st)
300             self.workflow_eval_lock.notifyAll()
301
302     def get_wrapped_callback(self, cb):
303         return partial(self.wrapped_callback, cb)
304
305     def on_message(self, event):
306         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
307             uuid = event["object_uuid"]
308             if event["properties"]["new_attributes"]["state"] == "Running":
309                 with self.workflow_eval_lock:
310                     j = self.processes[uuid]
311                     if j.running is False:
312                         j.running = True
313                         j.update_pipeline_component(event["properties"]["new_attributes"])
314                         logger.info("%s %s is Running", self.label(j), uuid)
315             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
316                 self.process_done(uuid, event["properties"]["new_attributes"])
317
318     def label(self, obj):
319         return "[%s %s]" % (self.work_api[0:-1], obj.name)
320
321     def poll_states(self):
322         """Poll status of jobs or containers listed in the processes dict.
323
324         Runs in a separate thread.
325         """
326
327         try:
328             remain_wait = self.poll_interval
329             while True:
330                 if remain_wait > 0:
331                     self.stop_polling.wait(remain_wait)
332                 if self.stop_polling.is_set():
333                     break
334                 with self.workflow_eval_lock:
335                     keys = list(self.processes.keys())
336                 if not keys:
337                     remain_wait = self.poll_interval
338                     continue
339
340                 begin_poll = time.time()
341                 if self.work_api == "containers":
342                     table = self.poll_api.container_requests()
343                 elif self.work_api == "jobs":
344                     table = self.poll_api.jobs()
345
346                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
347
348                 while keys:
349                     page = keys[:pageSize]
350                     keys = keys[pageSize:]
351                     try:
352                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
353                     except Exception as e:
354                         logger.warn("Error checking states on API server: %s", e)
355                         remain_wait = self.poll_interval
356                         continue
357
358                     for p in proc_states["items"]:
359                         self.on_message({
360                             "object_uuid": p["uuid"],
361                             "event_type": "update",
362                             "properties": {
363                                 "new_attributes": p
364                             }
365                         })
366                 finish_poll = time.time()
367                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
368         except:
369             logger.exception("Fatal error in state polling thread.")
370             with self.workflow_eval_lock:
371                 self.processes.clear()
372                 self.workflow_eval_lock.notifyAll()
373         finally:
374             self.stop_polling.set()
375
376     def add_intermediate_output(self, uuid):
377         if uuid:
378             self.intermediate_output_collections.append(uuid)
379
380     def trash_intermediate_output(self):
381         logger.info("Cleaning up intermediate output collections")
382         for i in self.intermediate_output_collections:
383             try:
384                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
385             except:
386                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
387             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
388                 break
389
390     def check_features(self, obj):
391         if isinstance(obj, dict):
392             if obj.get("writable") and self.work_api != "containers":
393                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
394             if obj.get("class") == "DockerRequirement":
395                 if obj.get("dockerOutputDirectory"):
396                     if self.work_api != "containers":
397                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
398                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
399                     if not obj.get("dockerOutputDirectory").startswith('/'):
400                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
401                             "Option 'dockerOutputDirectory' must be an absolute path.")
402             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
403                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
404             for v in obj.itervalues():
405                 self.check_features(v)
406         elif isinstance(obj, list):
407             for i,v in enumerate(obj):
408                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
409                     self.check_features(v)
410
411     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
412         outputObj = copy.deepcopy(outputObj)
413
414         files = []
415         def capture(fileobj):
416             files.append(fileobj)
417
418         adjustDirObjs(outputObj, capture)
419         adjustFileObjs(outputObj, capture)
420
421         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
422
423         final = arvados.collection.Collection(api_client=self.api,
424                                               keep_client=self.keep_client,
425                                               num_retries=self.num_retries)
426
427         for k,v in generatemapper.items():
428             if k.startswith("_:"):
429                 if v.type == "Directory":
430                     continue
431                 if v.type == "CreateFile":
432                     with final.open(v.target, "wb") as f:
433                         f.write(v.resolved.encode("utf-8"))
434                     continue
435
436             if not k.startswith("keep:"):
437                 raise Exception("Output source is not in keep or a literal")
438             sp = k.split("/")
439             srccollection = sp[0][5:]
440             try:
441                 reader = self.collection_cache.get(srccollection)
442                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
443                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
444             except arvados.errors.ArgumentError as e:
445                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
446                 raise
447             except IOError as e:
448                 logger.warn("While preparing output collection: %s", e)
449
450         def rewrite(fileobj):
451             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
452             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
453                 if k in fileobj:
454                     del fileobj[k]
455
456         adjustDirObjs(outputObj, rewrite)
457         adjustFileObjs(outputObj, rewrite)
458
459         with final.open("cwl.output.json", "w") as f:
460             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
461
462         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
463
464         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
465                     final.api_response()["name"],
466                     final.manifest_locator())
467
468         final_uuid = final.manifest_locator()
469         tags = tagsString.split(',')
470         for tag in tags:
471              self.api.links().create(body={
472                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
473                 }).execute(num_retries=self.num_retries)
474
475         def finalcollection(fileobj):
476             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
477
478         adjustDirObjs(outputObj, finalcollection)
479         adjustFileObjs(outputObj, finalcollection)
480
481         return (outputObj, final)
482
483     def set_crunch_output(self):
484         if self.work_api == "containers":
485             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
486             if current is None:
487                 return
488             try:
489                 self.api.containers().update(uuid=current['uuid'],
490                                              body={
491                                                  'output': self.final_output_collection.portable_data_hash(),
492                                              }).execute(num_retries=self.num_retries)
493                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
494                                               body={
495                                                   'is_trashed': True
496                                               }).execute(num_retries=self.num_retries)
497             except Exception as e:
498                 logger.info("Setting container output: %s", e)
499         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
500             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
501                                    body={
502                                        'output': self.final_output_collection.portable_data_hash(),
503                                        'success': self.final_status == "success",
504                                        'progress':1.0
505                                    }).execute(num_retries=self.num_retries)
506
507     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
508         self.debug = runtimeContext.debug
509
510         tool.visit(self.check_features)
511
512         self.project_uuid = runtimeContext.project_uuid
513         self.pipeline = None
514         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
515         self.secret_store = runtimeContext.secret_store
516
517         self.trash_intermediate = runtimeContext.trash_intermediate
518         if self.trash_intermediate and self.work_api != "containers":
519             raise Exception("--trash-intermediate is only supported with --api=containers.")
520
521         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
522         if self.intermediate_output_ttl and self.work_api != "containers":
523             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
524         if self.intermediate_output_ttl < 0:
525             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
526
527         if runtimeContext.submit_request_uuid and self.work_api != "containers":
528             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
529
530         if not runtimeContext.name:
531             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
532
533         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
534         # Also uploads docker images.
535         merged_map = upload_workflow_deps(self, tool)
536
537         # Reload tool object which may have been updated by
538         # upload_workflow_deps
539         # Don't validate this time because it will just print redundant errors.
540         loadingContext = self.loadingContext.copy()
541         loadingContext.loader = tool.doc_loader
542         loadingContext.avsc_names = tool.doc_schema
543         loadingContext.metadata = tool.metadata
544         loadingContext.do_validate = False
545
546         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
547                                   loadingContext)
548
549         # Upload local file references in the job order.
550         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
551                                      tool, job_order)
552
553         existing_uuid = runtimeContext.update_workflow
554         if existing_uuid or runtimeContext.create_workflow:
555             # Create a pipeline template or workflow record and exit.
556             if self.work_api == "jobs":
557                 tmpl = RunnerTemplate(self, tool, job_order,
558                                       runtimeContext.enable_reuse,
559                                       uuid=existing_uuid,
560                                       submit_runner_ram=runtimeContext.submit_runner_ram,
561                                       name=runtimeContext.name,
562                                       merged_map=merged_map,
563                                       loadingContext=loadingContext)
564                 tmpl.save()
565                 # cwltool.main will write our return value to stdout.
566                 return (tmpl.uuid, "success")
567             elif self.work_api == "containers":
568                 return (upload_workflow(self, tool, job_order,
569                                         self.project_uuid,
570                                         uuid=existing_uuid,
571                                         submit_runner_ram=runtimeContext.submit_runner_ram,
572                                         name=runtimeContext.name,
573                                         merged_map=merged_map),
574                         "success")
575
576         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
577         self.eval_timeout = runtimeContext.eval_timeout
578
579         runtimeContext = runtimeContext.copy()
580         runtimeContext.use_container = True
581         runtimeContext.tmpdir_prefix = "tmp"
582         runtimeContext.work_api = self.work_api
583
584         if self.work_api == "containers":
585             if self.ignore_docker_for_reuse:
586                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
587             runtimeContext.outdir = "/var/spool/cwl"
588             runtimeContext.docker_outdir = "/var/spool/cwl"
589             runtimeContext.tmpdir = "/tmp"
590             runtimeContext.docker_tmpdir = "/tmp"
591         elif self.work_api == "jobs":
592             if runtimeContext.priority != DEFAULT_PRIORITY:
593                 raise Exception("--priority not implemented for jobs API.")
594             runtimeContext.outdir = "$(task.outdir)"
595             runtimeContext.docker_outdir = "$(task.outdir)"
596             runtimeContext.tmpdir = "$(task.tmpdir)"
597
598         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
599             raise Exception("--priority must be in the range 1..1000.")
600
601         if self.should_estimate_cache_size:
602             visited = set()
603             estimated_size = [0]
604             def estimate_collection_cache(obj):
605                 if obj.get("location", "").startswith("keep:"):
606                     m = pdh_size.match(obj["location"][5:])
607                     if m and m.group(1) not in visited:
608                         visited.add(m.group(1))
609                         estimated_size[0] += int(m.group(2))
610             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
611             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
612             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
613
614         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
615
616         runnerjob = None
617         if runtimeContext.submit:
618             # Submit a runner job to run the workflow for us.
619             if self.work_api == "containers":
620                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
621                     runtimeContext.runnerjob = tool.tool["id"]
622                 else:
623                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
624                                                 self.output_name,
625                                                 self.output_tags,
626                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
627                                                 name=runtimeContext.name,
628                                                 on_error=runtimeContext.on_error,
629                                                 submit_runner_image=runtimeContext.submit_runner_image,
630                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
631                                                 merged_map=merged_map,
632                                                 priority=runtimeContext.priority,
633                                                 secret_store=self.secret_store,
634                                                 collection_cache_size=runtimeContext.collection_cache_size,
635                                                 collection_cache_is_default=self.should_estimate_cache_size)
636             elif self.work_api == "jobs":
637                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
638                                       self.output_name,
639                                       self.output_tags,
640                                       submit_runner_ram=runtimeContext.submit_runner_ram,
641                                       name=runtimeContext.name,
642                                       on_error=runtimeContext.on_error,
643                                       submit_runner_image=runtimeContext.submit_runner_image,
644                                       merged_map=merged_map)
645         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
646             # Create pipeline for local run
647             self.pipeline = self.api.pipeline_instances().create(
648                 body={
649                     "owner_uuid": self.project_uuid,
650                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
651                     "components": {},
652                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
653             logger.info("Pipeline instance %s", self.pipeline["uuid"])
654
655         if runtimeContext.cwl_runner_job is not None:
656             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
657
658         jobiter = tool.job(job_order,
659                            self.output_callback,
660                            runtimeContext)
661
662         if runtimeContext.submit and not runtimeContext.wait:
663             runnerjob = jobiter.next()
664             runnerjob.run(runtimeContext)
665             return (runnerjob.uuid, "success")
666
667         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
668         if current_container:
669             logger.info("Running inside container %s", current_container.get("uuid"))
670
671         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
672         self.polling_thread = threading.Thread(target=self.poll_states)
673         self.polling_thread.start()
674
675         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
676
677         try:
678             self.workflow_eval_lock.acquire()
679
680             # Holds the lock while this code runs and releases it when
681             # it is safe to do so in self.workflow_eval_lock.wait(),
682             # at which point on_message can update job state and
683             # process output callbacks.
684
685             loopperf = Perf(metrics, "jobiter")
686             loopperf.__enter__()
687             for runnable in jobiter:
688                 loopperf.__exit__()
689
690                 if self.stop_polling.is_set():
691                     break
692
693                 if self.task_queue.error is not None:
694                     raise self.task_queue.error
695
696                 if runnable:
697                     with Perf(metrics, "run"):
698                         self.start_run(runnable, runtimeContext)
699                 else:
700                     if (self.task_queue.in_flight + len(self.processes)) > 0:
701                         self.workflow_eval_lock.wait(3)
702                     else:
703                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
704                         break
705
706                 if self.stop_polling.is_set():
707                     break
708
709                 loopperf.__enter__()
710             loopperf.__exit__()
711
712             while (self.task_queue.in_flight + len(self.processes)) > 0:
713                 if self.task_queue.error is not None:
714                     raise self.task_queue.error
715                 self.workflow_eval_lock.wait(3)
716
717         except UnsupportedRequirement:
718             raise
719         except:
720             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
721                 logger.error("Interrupted, workflow will be cancelled")
722             else:
723                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
724             if self.pipeline:
725                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
726                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
727             if runtimeContext.submit and isinstance(tool, Runner):
728                 runnerjob = tool
729                 if runnerjob.uuid and self.work_api == "containers":
730                     self.api.container_requests().update(uuid=runnerjob.uuid,
731                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
732         finally:
733             self.workflow_eval_lock.release()
734             self.task_queue.drain()
735             self.stop_polling.set()
736             self.polling_thread.join()
737             self.task_queue.join()
738
739         if self.final_status == "UnsupportedRequirement":
740             raise UnsupportedRequirement("Check log for details.")
741
742         if self.final_output is None:
743             raise WorkflowException("Workflow did not return a result.")
744
745         if runtimeContext.submit and isinstance(tool, Runner):
746             logger.info("Final output collection %s", tool.final_output)
747         else:
748             if self.output_name is None:
749                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
750             if self.output_tags is None:
751                 self.output_tags = ""
752
753             storage_classes = runtimeContext.storage_classes.strip().split(",")
754             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
755             self.set_crunch_output()
756
757         if runtimeContext.compute_checksum:
758             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
759             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
760
761         if self.trash_intermediate and self.final_status == "success":
762             self.trash_intermediate_output()
763
764         return (self.final_output, self.final_status)