# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from past.utils import old_div

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )
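
# A minimal sketch of attaching RuntimeStatusLoggingHandler by hand
# (hypothetical standalone usage; in this module the handler is attached
# automatically in ArvCwlExecutor.__init__ when running inside a container):
#
#     def report(kind, message, detail=None):
#         print(kind, message, detail)
#
#     logging.getLogger('').addHandler(RuntimeStatusLoggingHandler(report))
#     logging.getLogger('arvados.cwl-runner').error("Step failed\ntraceback here")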

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

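    # Rough usage sketch (illustrative only; in practice arvados_cwl.main
    # constructs the executor and cwltool drives it, and the names below are
    # assumptions, not part of this module):
    #
    #     api = arvados.api('v1')
    #     executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #     (out, status) = executor.arv_executor(tool, job_order, runtime_context)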
    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
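        # The condition is built on an RLock, presumably so code that already
        # holds the lock (callbacks invoked while it is held) can re-acquire
        # it without deadlocking.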
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')
            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there is a need to report errors, warnings, or activity
        statuses, for example from the RuntimeStatusLoggingHandler.
        """
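        # Illustrative example (not from the source) of how repeated errors
        # are folded into a count by the logic below:
        #   1st error: "step1: failed"
        #   2nd error: "step1: failed (and 1 more)"
        #   3rd error: "step1: failed (and 2 more)"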
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Tests may stub the error value with a non-string,
                        # which makes re.match raise TypeError; ignore it.
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
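        # Events arrive here in the shape produced by poll_states() below:
        # {"object_uuid": ..., "event_type": "update",
        #  "properties": {"new_attributes": <job/container record>}}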
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

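                # Query in pages so the "uuid in [...]" filter stays within
                # the API server's per-response item limit.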
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                # Stop cleaning up if the run is being interrupted.
                break
            except Exception as e:
                logger.warning("Failed to delete intermediate output: %s", e, exc_info=(e if self.debug else False))

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.values():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in list(generatemapper.items()):
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

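            # Anything else must be a Keep reference of the form
            # "keep:<portable data hash>/<path within collection>", e.g.
            # "keep:<pdh>/dir/file.txt" (placeholder shown, not a real hash).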
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
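                # Trash the separately saved collection, presumably because
                # the container's output attribute set above is authoritative
                # and keeping both would leave a duplicate (rationale inferred
                # from context, not stated in the source).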
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Error while setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        # The logger parameter shadows the module-level logger; fall back to
        # it when no logger is passed in.
        if logger is None:
            logger = logging.getLogger('arvados.cwl-runner')

        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps; returns a mapping of
        # local files to Keep references. Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map,
                                      loadingContext=loadingContext)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

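        # When no explicit cache size was given, estimate one: sum the
        # manifest-size hints embedded in the portable data hashes of all
        # input Files/Directories, allow 192 bytes of cache per manifest byte
        # (the multiplier looks like an empirical allowance for the parsed
        # in-memory representation; the source does not explain it), and
        # never go below 256 MiB.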
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max((old_div((estimated_size[0]*192), (1024*1024)))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                 self.output_name,
                                 self.output_tags,
                                 submit_runner_ram=runtimeContext.submit_runner_ram,
                                 name=runtimeContext.name,
                                 on_error=runtimeContext.on_error,
                                 submit_runner_image=runtimeContext.submit_runner_image,
                                 merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

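        # When submitting without waiting, the first item from jobiter is the
        # runner itself (tool was replaced above); start it and return its
        # UUID immediately.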
        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runtimeContext.submit and isinstance(tool, Runner):
                runnerjob = tool
                if runnerjob.uuid and self.work_api == "containers":
                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                         body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)