14510: Set collection metadata cache size as 150% of sum of inputs
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import time
15
16 from cwltool.errors import WorkflowException
17 import cwltool.workflow
18 from schema_salad.sourceline import SourceLine
19 import schema_salad.validate as validate
20
21 import arvados
22 import arvados.config
23 from arvados.keep import KeepClient
24 from arvados.errors import ApiError
25
26 import arvados_cwl.util
27 from .arvcontainer import RunnerContainer
28 from .arvjob import RunnerJob, RunnerTemplate
29 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
30 from .arvtool import ArvadosCommandTool, validate_cluster_target
31 from .arvworkflow import ArvadosWorkflow, upload_workflow
32 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
33 from .perf import Perf
34 from .pathmapper import NoFollowPathMapper
35 from .task_queue import TaskQueue
36 from .context import ArvLoadingContext, ArvRuntimeContext
37 from ._version import __version__
38
39 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
40 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
41 from cwltool.command_line_tool import compute_checksums
42
# Logger used throughout this module for runner progress, warnings and errors.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger named as a child of the runner logger; not used in the
# visible part of this file -- presumably for performance/timing output
# (TODO confirm against its users).
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Priority value that runtimeContext.priority is compared against when the
# jobs API is selected: any other value is rejected there because --priority
# is not implemented for jobs.
DEFAULT_PRIORITY = 500
47
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards WARNING-and-above records to a callback
    so they can be reported as runtime statuses on runner containers.

    The callback is invoked as func(kind, summary) or, for multi-line
    messages, func(kind, summary, detail), where kind is 'error' or
    'warning'.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        # Map the record level onto a status kind; anything below WARNING
        # is not reported at all.
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        # The first line of the message becomes the status, any remaining
        # lines become the detail.
        headline, newline, remainder = record.getMessage().partition('\n')
        summary = "%s: %s" % (record.name, headline)
        if newline:
            self.runtime_status_update(kind, summary, remainder)
        else:
            self.runtime_status_update(kind, summary)
79
80 class ArvCwlExecutor(object):
81     """Execute a CWL tool or workflow, submit work (using either jobs or
82     containers API), wait for them to complete, and report output.
83
84     """
85
86     def __init__(self, api_client,
87                  arvargs=None,
88                  keep_client=None,
89                  num_retries=4,
90                  thread_count=4):
91
92         if arvargs is None:
93             arvargs = argparse.Namespace()
94             arvargs.work_api = None
95             arvargs.output_name = None
96             arvargs.output_tags = None
97             arvargs.thread_count = 1
98             arvargs.collection_cache_size = None
99
100         self.api = api_client
101         self.processes = {}
102         self.workflow_eval_lock = threading.Condition(threading.RLock())
103         self.final_output = None
104         self.final_status = None
105         self.num_retries = num_retries
106         self.uuid = None
107         self.stop_polling = threading.Event()
108         self.poll_api = None
109         self.pipeline = None
110         self.final_output_collection = None
111         self.output_name = arvargs.output_name
112         self.output_tags = arvargs.output_tags
113         self.project_uuid = None
114         self.intermediate_output_ttl = 0
115         self.intermediate_output_collections = []
116         self.trash_intermediate = False
117         self.thread_count = arvargs.thread_count
118         self.poll_interval = 12
119         self.loadingContext = None
120         self.should_estimate_cache_size = True
121
122         if keep_client is not None:
123             self.keep_client = keep_client
124         else:
125             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
126
127         if arvargs.collection_cache_size:
128             collection_cache_size = arvargs.collection_cache_size*1024*1024
129             self.should_estimate_cache_size = False
130         else:
131             collection_cache_size = 256*1024*1024
132
133         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
134                                                 cap=collection_cache_size)
135
136         self.fetcher_constructor = partial(CollectionFetcher,
137                                            api_client=self.api,
138                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
139                                            num_retries=self.num_retries)
140
141         self.work_api = None
142         expected_api = ["jobs", "containers"]
143         for api in expected_api:
144             try:
145                 methods = self.api._rootDesc.get('resources')[api]['methods']
146                 if ('httpMethod' in methods['create'] and
147                     (arvargs.work_api == api or arvargs.work_api is None)):
148                     self.work_api = api
149                     break
150             except KeyError:
151                 pass
152
153         if not self.work_api:
154             if arvargs.work_api is None:
155                 raise Exception("No supported APIs")
156             else:
157                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
158
159         if self.work_api == "jobs":
160             logger.warn("""
161 *******************************
162 Using the deprecated 'jobs' API.
163
164 To get rid of this warning:
165
166 Users: read about migrating at
167 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
168 and use the option --api=containers
169
170 Admins: configure the cluster to disable the 'jobs' API as described at:
171 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
172 *******************************""")
173
174         self.loadingContext = ArvLoadingContext(vars(arvargs))
175         self.loadingContext.fetcher_constructor = self.fetcher_constructor
176         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
177         self.loadingContext.construct_tool_object = self.arv_make_tool
178
179         # Add a custom logging handler to the root logger for runtime status reporting
180         # if running inside a container
181         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
182             root_logger = logging.getLogger('')
183             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
184             root_logger.addHandler(handler)
185
186         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
187         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
188                                                      collection_cache=self.collection_cache)
189
190         validate_cluster_target(self, self.runtimeContext)
191
192
193     def arv_make_tool(self, toolpath_object, loadingContext):
194         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
195             return ArvadosCommandTool(self, toolpath_object, loadingContext)
196         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
197             return ArvadosWorkflow(self, toolpath_object, loadingContext)
198         else:
199             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
200
201     def output_callback(self, out, processStatus):
202         with self.workflow_eval_lock:
203             if processStatus == "success":
204                 logger.info("Overall process status is %s", processStatus)
205                 state = "Complete"
206             else:
207                 logger.error("Overall process status is %s", processStatus)
208                 state = "Failed"
209             if self.pipeline:
210                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
211                                                         body={"state": state}).execute(num_retries=self.num_retries)
212             self.final_status = processStatus
213             self.final_output = out
214             self.workflow_eval_lock.notifyAll()
215
216
217     def start_run(self, runnable, runtimeContext):
218         self.task_queue.add(partial(runnable.run, runtimeContext),
219                             self.workflow_eval_lock, self.stop_polling)
220
221     def process_submitted(self, container):
222         with self.workflow_eval_lock:
223             self.processes[container.uuid] = container
224
225     def process_done(self, uuid, record):
226         with self.workflow_eval_lock:
227             j = self.processes[uuid]
228             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
229             self.task_queue.add(partial(j.done, record),
230                                 self.workflow_eval_lock, self.stop_polling)
231             del self.processes[uuid]
232
233     def runtime_status_update(self, kind, message, detail=None):
234         """
235         Updates the runtime_status field on the runner container.
236         Called when there's a need to report errors, warnings or just
237         activity statuses, for example in the RuntimeStatusLoggingHandler.
238         """
239         with self.workflow_eval_lock:
240             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
241             if current is None:
242                 return
243             runtime_status = current.get('runtime_status', {})
244             # In case of status being an error, only report the first one.
245             if kind == 'error':
246                 if not runtime_status.get('error'):
247                     runtime_status.update({
248                         'error': message
249                     })
250                     if detail is not None:
251                         runtime_status.update({
252                             'errorDetail': detail
253                         })
254                 # Further errors are only mentioned as a count.
255                 else:
256                     # Get anything before an optional 'and N more' string.
257                     try:
258                         error_msg = re.match(
259                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
260                         more_failures = re.match(
261                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
262                     except TypeError:
263                         # Ignore tests stubbing errors
264                         return
265                     if more_failures:
266                         failure_qty = int(more_failures.groups()[0])
267                         runtime_status.update({
268                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
269                         })
270                     else:
271                         runtime_status.update({
272                             'error': "%s (and 1 more)" % error_msg
273                         })
274             elif kind in ['warning', 'activity']:
275                 # Record the last warning/activity status without regard of
276                 # previous occurences.
277                 runtime_status.update({
278                     kind: message
279                 })
280                 if detail is not None:
281                     runtime_status.update({
282                         kind+"Detail": detail
283                     })
284             else:
285                 # Ignore any other status kind
286                 return
287             try:
288                 self.api.containers().update(uuid=current['uuid'],
289                                             body={
290                                                 'runtime_status': runtime_status,
291                                             }).execute(num_retries=self.num_retries)
292             except Exception as e:
293                 logger.info("Couldn't update runtime_status: %s", e)
294
295     def wrapped_callback(self, cb, obj, st):
296         with self.workflow_eval_lock:
297             cb(obj, st)
298             self.workflow_eval_lock.notifyAll()
299
300     def get_wrapped_callback(self, cb):
301         return partial(self.wrapped_callback, cb)
302
303     def on_message(self, event):
304         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
305             uuid = event["object_uuid"]
306             if event["properties"]["new_attributes"]["state"] == "Running":
307                 with self.workflow_eval_lock:
308                     j = self.processes[uuid]
309                     if j.running is False:
310                         j.running = True
311                         j.update_pipeline_component(event["properties"]["new_attributes"])
312                         logger.info("%s %s is Running", self.label(j), uuid)
313             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
314                 self.process_done(uuid, event["properties"]["new_attributes"])
315
316     def label(self, obj):
317         return "[%s %s]" % (self.work_api[0:-1], obj.name)
318
319     def poll_states(self):
320         """Poll status of jobs or containers listed in the processes dict.
321
322         Runs in a separate thread.
323         """
324
325         try:
326             remain_wait = self.poll_interval
327             while True:
328                 if remain_wait > 0:
329                     self.stop_polling.wait(remain_wait)
330                 if self.stop_polling.is_set():
331                     break
332                 with self.workflow_eval_lock:
333                     keys = list(self.processes.keys())
334                 if not keys:
335                     remain_wait = self.poll_interval
336                     continue
337
338                 begin_poll = time.time()
339                 if self.work_api == "containers":
340                     table = self.poll_api.container_requests()
341                 elif self.work_api == "jobs":
342                     table = self.poll_api.jobs()
343
344                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
345
346                 while keys:
347                     page = keys[:pageSize]
348                     keys = keys[pageSize:]
349                     try:
350                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
351                     except Exception as e:
352                         logger.warn("Error checking states on API server: %s", e)
353                         remain_wait = self.poll_interval
354                         continue
355
356                     for p in proc_states["items"]:
357                         self.on_message({
358                             "object_uuid": p["uuid"],
359                             "event_type": "update",
360                             "properties": {
361                                 "new_attributes": p
362                             }
363                         })
364                 finish_poll = time.time()
365                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
366         except:
367             logger.exception("Fatal error in state polling thread.")
368             with self.workflow_eval_lock:
369                 self.processes.clear()
370                 self.workflow_eval_lock.notifyAll()
371         finally:
372             self.stop_polling.set()
373
374     def add_intermediate_output(self, uuid):
375         if uuid:
376             self.intermediate_output_collections.append(uuid)
377
378     def trash_intermediate_output(self):
379         logger.info("Cleaning up intermediate output collections")
380         for i in self.intermediate_output_collections:
381             try:
382                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
383             except:
384                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
385             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
386                 break
387
388     def check_features(self, obj):
389         if isinstance(obj, dict):
390             if obj.get("writable") and self.work_api != "containers":
391                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
392             if obj.get("class") == "DockerRequirement":
393                 if obj.get("dockerOutputDirectory"):
394                     if self.work_api != "containers":
395                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
396                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
397                     if not obj.get("dockerOutputDirectory").startswith('/'):
398                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
399                             "Option 'dockerOutputDirectory' must be an absolute path.")
400             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
401                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
402             for v in obj.itervalues():
403                 self.check_features(v)
404         elif isinstance(obj, list):
405             for i,v in enumerate(obj):
406                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
407                     self.check_features(v)
408
409     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
410         outputObj = copy.deepcopy(outputObj)
411
412         files = []
413         def capture(fileobj):
414             files.append(fileobj)
415
416         adjustDirObjs(outputObj, capture)
417         adjustFileObjs(outputObj, capture)
418
419         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
420
421         final = arvados.collection.Collection(api_client=self.api,
422                                               keep_client=self.keep_client,
423                                               num_retries=self.num_retries)
424
425         for k,v in generatemapper.items():
426             if k.startswith("_:"):
427                 if v.type == "Directory":
428                     continue
429                 if v.type == "CreateFile":
430                     with final.open(v.target, "wb") as f:
431                         f.write(v.resolved.encode("utf-8"))
432                     continue
433
434             if not k.startswith("keep:"):
435                 raise Exception("Output source is not in keep or a literal")
436             sp = k.split("/")
437             srccollection = sp[0][5:]
438             try:
439                 reader = self.collection_cache.get(srccollection)
440                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
441                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
442             except arvados.errors.ArgumentError as e:
443                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
444                 raise
445             except IOError as e:
446                 logger.warn("While preparing output collection: %s", e)
447
448         def rewrite(fileobj):
449             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
450             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
451                 if k in fileobj:
452                     del fileobj[k]
453
454         adjustDirObjs(outputObj, rewrite)
455         adjustFileObjs(outputObj, rewrite)
456
457         with final.open("cwl.output.json", "w") as f:
458             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
459
460         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
461
462         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
463                     final.api_response()["name"],
464                     final.manifest_locator())
465
466         final_uuid = final.manifest_locator()
467         tags = tagsString.split(',')
468         for tag in tags:
469              self.api.links().create(body={
470                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
471                 }).execute(num_retries=self.num_retries)
472
473         def finalcollection(fileobj):
474             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
475
476         adjustDirObjs(outputObj, finalcollection)
477         adjustFileObjs(outputObj, finalcollection)
478
479         return (outputObj, final)
480
481     def set_crunch_output(self):
482         if self.work_api == "containers":
483             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
484             if current is None:
485                 return
486             try:
487                 self.api.containers().update(uuid=current['uuid'],
488                                              body={
489                                                  'output': self.final_output_collection.portable_data_hash(),
490                                              }).execute(num_retries=self.num_retries)
491                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
492                                               body={
493                                                   'is_trashed': True
494                                               }).execute(num_retries=self.num_retries)
495             except Exception as e:
496                 logger.info("Setting container output: %s", e)
497         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
498             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
499                                    body={
500                                        'output': self.final_output_collection.portable_data_hash(),
501                                        'success': self.final_status == "success",
502                                        'progress':1.0
503                                    }).execute(num_retries=self.num_retries)
504
505     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
506         self.debug = runtimeContext.debug
507
508         tool.visit(self.check_features)
509
510         self.project_uuid = runtimeContext.project_uuid
511         self.pipeline = None
512         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
513         self.secret_store = runtimeContext.secret_store
514
515         self.trash_intermediate = runtimeContext.trash_intermediate
516         if self.trash_intermediate and self.work_api != "containers":
517             raise Exception("--trash-intermediate is only supported with --api=containers.")
518
519         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
520         if self.intermediate_output_ttl and self.work_api != "containers":
521             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
522         if self.intermediate_output_ttl < 0:
523             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
524
525         if runtimeContext.submit_request_uuid and self.work_api != "containers":
526             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
527
528         if not runtimeContext.name:
529             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
530
531         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
532         # Also uploads docker images.
533         merged_map = upload_workflow_deps(self, tool)
534
535         # Reload tool object which may have been updated by
536         # upload_workflow_deps
537         # Don't validate this time because it will just print redundant errors.
538         loadingContext = self.loadingContext.copy()
539         loadingContext.loader = tool.doc_loader
540         loadingContext.avsc_names = tool.doc_schema
541         loadingContext.metadata = tool.metadata
542         loadingContext.do_validate = False
543
544         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
545                                   loadingContext)
546
547         # Upload local file references in the job order.
548         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
549                                      tool, job_order)
550
551         existing_uuid = runtimeContext.update_workflow
552         if existing_uuid or runtimeContext.create_workflow:
553             # Create a pipeline template or workflow record and exit.
554             if self.work_api == "jobs":
555                 tmpl = RunnerTemplate(self, tool, job_order,
556                                       runtimeContext.enable_reuse,
557                                       uuid=existing_uuid,
558                                       submit_runner_ram=runtimeContext.submit_runner_ram,
559                                       name=runtimeContext.name,
560                                       merged_map=merged_map)
561                 tmpl.save()
562                 # cwltool.main will write our return value to stdout.
563                 return (tmpl.uuid, "success")
564             elif self.work_api == "containers":
565                 return (upload_workflow(self, tool, job_order,
566                                         self.project_uuid,
567                                         uuid=existing_uuid,
568                                         submit_runner_ram=runtimeContext.submit_runner_ram,
569                                         name=runtimeContext.name,
570                                         merged_map=merged_map),
571                         "success")
572
573         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
574         self.eval_timeout = runtimeContext.eval_timeout
575
576         runtimeContext = runtimeContext.copy()
577         runtimeContext.use_container = True
578         runtimeContext.tmpdir_prefix = "tmp"
579         runtimeContext.work_api = self.work_api
580
581         if self.work_api == "containers":
582             if self.ignore_docker_for_reuse:
583                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
584             runtimeContext.outdir = "/var/spool/cwl"
585             runtimeContext.docker_outdir = "/var/spool/cwl"
586             runtimeContext.tmpdir = "/tmp"
587             runtimeContext.docker_tmpdir = "/tmp"
588         elif self.work_api == "jobs":
589             if runtimeContext.priority != DEFAULT_PRIORITY:
590                 raise Exception("--priority not implemented for jobs API.")
591             runtimeContext.outdir = "$(task.outdir)"
592             runtimeContext.docker_outdir = "$(task.outdir)"
593             runtimeContext.tmpdir = "$(task.tmpdir)"
594
595         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
596             raise Exception("--priority must be in the range 1..1000.")
597
598         if self.should_estimate_cache_size:
599             visited = set()
600             estimated_size = [0]
601             def estimate_collection_cache(obj):
602                 if obj.get("location", "").startswith("keep:"):
603                     m = pdh_size.match(obj["location"][5:])
604                     if m and m.group(1) not in visited:
605                         visited.add(m.group(1))
606                         estimated_size[0] += int(m.group(2))
607             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
608             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) / (1024*1024))+1, 256)
609             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
610
611         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
612
613         runnerjob = None
614         if runtimeContext.submit:
615             # Submit a runner job to run the workflow for us.
616             if self.work_api == "containers":
617                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
618                     runtimeContext.runnerjob = tool.tool["id"]
619                     runnerjob = tool.job(job_order,
620                                          self.output_callback,
621                                          runtimeContext).next()
622                 else:
623                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
624                                                 self.output_name,
625                                                 self.output_tags,
626                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
627                                                 name=runtimeContext.name,
628                                                 on_error=runtimeContext.on_error,
629                                                 submit_runner_image=runtimeContext.submit_runner_image,
630                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
631                                                 merged_map=merged_map,
632                                                 priority=runtimeContext.priority,
633                                                 secret_store=self.secret_store,
634                                                 collection_cache_size=runtimeContext.collection_cache_size)
635             elif self.work_api == "jobs":
636                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
637                                       self.output_name,
638                                       self.output_tags,
639                                       submit_runner_ram=runtimeContext.submit_runner_ram,
640                                       name=runtimeContext.name,
641                                       on_error=runtimeContext.on_error,
642                                       submit_runner_image=runtimeContext.submit_runner_image,
643                                       merged_map=merged_map)
644         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
645             # Create pipeline for local run
646             self.pipeline = self.api.pipeline_instances().create(
647                 body={
648                     "owner_uuid": self.project_uuid,
649                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
650                     "components": {},
651                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
652             logger.info("Pipeline instance %s", self.pipeline["uuid"])
653
654         if runnerjob and not runtimeContext.wait:
655             submitargs = runtimeContext.copy()
656             submitargs.submit = False
657             runnerjob.run(submitargs)
658             return (runnerjob.uuid, "success")
659
660         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
661         if current_container:
662             logger.info("Running inside container %s", current_container.get("uuid"))
663
664         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
665         self.polling_thread = threading.Thread(target=self.poll_states)
666         self.polling_thread.start()
667
668         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
669
670         try:
671             self.workflow_eval_lock.acquire()
672             if runnerjob:
673                 jobiter = iter((runnerjob,))
674             else:
675                 if runtimeContext.cwl_runner_job is not None:
676                     self.uuid = runtimeContext.cwl_runner_job.get('uuid')
677                 jobiter = tool.job(job_order,
678                                    self.output_callback,
679                                    runtimeContext)
680
681             # Holds the lock while this code runs and releases it when
682             # it is safe to do so in self.workflow_eval_lock.wait(),
683             # at which point on_message can update job state and
684             # process output callbacks.
685
686             loopperf = Perf(metrics, "jobiter")
687             loopperf.__enter__()
688             for runnable in jobiter:
689                 loopperf.__exit__()
690
691                 if self.stop_polling.is_set():
692                     break
693
694                 if self.task_queue.error is not None:
695                     raise self.task_queue.error
696
697                 if runnable:
698                     with Perf(metrics, "run"):
699                         self.start_run(runnable, runtimeContext)
700                 else:
701                     if (self.task_queue.in_flight + len(self.processes)) > 0:
702                         self.workflow_eval_lock.wait(3)
703                     else:
704                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
705                         break
706
707                 if self.stop_polling.is_set():
708                     break
709
710                 loopperf.__enter__()
711             loopperf.__exit__()
712
713             while (self.task_queue.in_flight + len(self.processes)) > 0:
714                 if self.task_queue.error is not None:
715                     raise self.task_queue.error
716                 self.workflow_eval_lock.wait(3)
717
718         except UnsupportedRequirement:
719             raise
720         except:
721             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
722                 logger.error("Interrupted, workflow will be cancelled")
723             else:
724                 logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
725             if self.pipeline:
726                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
727                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
728             if runnerjob and runnerjob.uuid and self.work_api == "containers":
729                 self.api.container_requests().update(uuid=runnerjob.uuid,
730                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
731         finally:
732             self.workflow_eval_lock.release()
733             self.task_queue.drain()
734             self.stop_polling.set()
735             self.polling_thread.join()
736             self.task_queue.join()
737
738         if self.final_status == "UnsupportedRequirement":
739             raise UnsupportedRequirement("Check log for details.")
740
741         if self.final_output is None:
742             raise WorkflowException("Workflow did not return a result.")
743
744         if runtimeContext.submit and isinstance(runnerjob, Runner):
745             logger.info("Final output collection %s", runnerjob.final_output)
746         else:
747             if self.output_name is None:
748                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
749             if self.output_tags is None:
750                 self.output_tags = ""
751
752             storage_classes = runtimeContext.storage_classes.strip().split(",")
753             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
754             self.set_crunch_output()
755
756         if runtimeContext.compute_checksum:
757             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
758             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
759
760         if self.trash_intermediate and self.final_status == "success":
761             self.trash_intermediate_output()
762
763         return (self.final_output, self.final_status)