# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

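"""Run CWL workflows on Arvados.

Submits work using either the jobs API or the containers API, monitors
progress, and collects the final output.
"""
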
import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

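# Default priority for container requests submitted by the runner; the
# containers API accepts priorities in the range 1..1000 (checked in
# arv_executor below).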
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as
                # the status and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

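    A minimal usage sketch (assuming an initialized API client and an
    argparse.Namespace of parsed arv-cwl-runner options; variable names
    here are illustrative):

        api = arvados.api('v1')
        executor = ArvCwlExecutor(api, arvargs=args)
        (output, status) = executor.arv_executor(tool, job_order, runtime_context)
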
    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
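        # Condition variable guarding shared scheduler state (self.processes,
        # final output/status); the polling thread and completion callbacks
        # notify it to wake the scheduling loop in arv_executor().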
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

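        # Select the work API by probing the discovery document for a usable
        # 'jobs' or 'containers' resource, honoring an explicit --api choice.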
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container.
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
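        """Construct the Arvados-specific tool object for a CWL document
        node, dispatching on its "class" field."""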
        cls = toolpath_object.get("class")
        if cls == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif cls == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif cls == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % cls)

    def output_callback(self, out, processStatus):
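        """Record the final output and overall status of the workflow and
        wake up any threads waiting on workflow_eval_lock."""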
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()

    def start_run(self, runnable, runtimeContext):
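        """Queue a runnable job or container to be started on the task queue."""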
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
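                # e.g. "step 9 failed" -> "step 9 failed (and 1 more)"
                #                      -> "step 9 failed (and 2 more)"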
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
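        """Handle a state-change event for a job or container being tracked
        in the processes dict."""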
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

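                # Fetch states in pages no larger than the server's
                # advertised maxItemsPerResponse.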
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except Exception:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception as e:
                logger.warning("Failed to delete intermediate output: %s", e, exc_info=(e if self.debug else False))

    def check_features(self, obj):
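        """Recursively check the tool document for features that the
        selected work API does not support, raising an error when one is
        found."""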
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.values():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
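        """Build the final output collection: copy each output file from its
        source collection in Keep into a new collection, rewrite the output
        object to point at it, and save it with the given name, tags, and
        storage classes."""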
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
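            # Locations starting with "_:" are literals: directories need no
            # copying, and CreateFile literals are written directly into the
            # new collection.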
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
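        """Record the final output collection on the runner's own container
        or job task record."""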
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires the containers API, but the '{}' API is in use".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps and get back a mapping
        # of files to keep references.  Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.  Don't validate this time because it would
        # just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

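        # Estimate a collection cache size from the job order: sum the
        # manifest sizes embedded in each input's portable data hash (the
        # number after the '+'), scale by an empirical factor, and apply a
        # floor of 256 MiB.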
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
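                # A single CommandLineTool that we will wait on can be
                # submitted directly as a container request; everything else
                # is wrapped in a RunnerContainer that runs the workflow on
                # our behalf.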
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                    runnerjob = next(tool.job(job_order,
                                              self.output_callback,
                                              runtimeContext))
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=runtimeContext.submit_runner_ram,
                                                name=runtimeContext.name,
                                                on_error=runtimeContext.on_error,
                                                submit_runner_image=runtimeContext.submit_runner_image,
                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                merged_map=merged_map,
                                                priority=runtimeContext.priority,
                                                secret_store=self.secret_store,
                                                collection_cache_size=runtimeContext.collection_cache_size,
                                                collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      on_error=runtimeContext.on_error,
                                      submit_runner_image=runtimeContext.submit_runner_image,
                                      merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create a pipeline instance to represent this local run.
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not runtimeContext.wait:
            submitargs = runtimeContext.copy()
            submitargs.submit = False
            runnerjob.run(submitargs)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()
            if runnerjob:
                jobiter = iter((runnerjob,))
            else:
                if runtimeContext.cwl_runner_job is not None:
                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   runtimeContext)

            # Hold the lock while this code runs; it is released when it is
            # safe to do so in self.workflow_eval_lock.wait(), at which point
            # on_message can update job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)