15181: Remove jobs API support from arvados-cwl-runner
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
35 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
36 from .arvworkflow import ArvadosWorkflow, upload_workflow
37 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
38 from .perf import Perf
39 from .pathmapper import NoFollowPathMapper
40 from .task_queue import TaskQueue
41 from .context import ArvLoadingContext, ArvRuntimeContext
42 from ._version import __version__
43
44 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
45 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
46 from cwltool.command_line_tool import compute_checksums
47 from cwltool.load_tool import load_tool
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51
52 DEFAULT_PRIORITY = 500
53
54 class RuntimeStatusLoggingHandler(logging.Handler):
55     """
56     Intercepts logging calls and report them as runtime statuses on runner
57     containers.
58     """
59     def __init__(self, runtime_status_update_func):
60         super(RuntimeStatusLoggingHandler, self).__init__()
61         self.runtime_status_update = runtime_status_update_func
62         self.updatingRuntimeStatus = False
63
64     def emit(self, record):
65         kind = None
66         if record.levelno >= logging.ERROR:
67             kind = 'error'
68         elif record.levelno >= logging.WARNING:
69             kind = 'warning'
70         if kind is not None and self.updatingRuntimeStatus is not True:
71             self.updatingRuntimeStatus = True
72             try:
73                 log_msg = record.getMessage()
74                 if '\n' in log_msg:
75                     # If the logged message is multi-line, use its first line as status
76                     # and the rest as detail.
77                     status, detail = log_msg.split('\n', 1)
78                     self.runtime_status_update(
79                         kind,
80                         "%s: %s" % (record.name, status),
81                         detail
82                     )
83                 else:
84                     self.runtime_status_update(
85                         kind,
86                         "%s: %s" % (record.name, record.getMessage())
87                     )
88             finally:
89                 self.updatingRuntimeStatus = False
90
91
92 class ArvCwlExecutor(object):
93     """Execute a CWL tool or workflow, submit work (using containers API),
94     wait for them to complete, and report output.
95
96     """
97
98     def __init__(self, api_client,
99                  arvargs=None,
100                  keep_client=None,
101                  num_retries=4,
102                  thread_count=4):
103
104         if arvargs is None:
105             arvargs = argparse.Namespace()
106             arvargs.work_api = None
107             arvargs.output_name = None
108             arvargs.output_tags = None
109             arvargs.thread_count = 1
110             arvargs.collection_cache_size = None
111
112         self.api = api_client
113         self.processes = {}
114         self.workflow_eval_lock = threading.Condition(threading.RLock())
115         self.final_output = None
116         self.final_status = None
117         self.num_retries = num_retries
118         self.uuid = None
119         self.stop_polling = threading.Event()
120         self.poll_api = None
121         self.pipeline = None
122         self.final_output_collection = None
123         self.output_name = arvargs.output_name
124         self.output_tags = arvargs.output_tags
125         self.project_uuid = None
126         self.intermediate_output_ttl = 0
127         self.intermediate_output_collections = []
128         self.trash_intermediate = False
129         self.thread_count = arvargs.thread_count
130         self.poll_interval = 12
131         self.loadingContext = None
132         self.should_estimate_cache_size = True
133         self.fs_access = None
134         self.secret_store = None
135
136         if keep_client is not None:
137             self.keep_client = keep_client
138         else:
139             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
140
141         if arvargs.collection_cache_size:
142             collection_cache_size = arvargs.collection_cache_size*1024*1024
143             self.should_estimate_cache_size = False
144         else:
145             collection_cache_size = 256*1024*1024
146
147         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
148                                                 cap=collection_cache_size)
149
150         self.fetcher_constructor = partial(CollectionFetcher,
151                                            api_client=self.api,
152                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
153                                            num_retries=self.num_retries)
154
155         self.work_api = None
156         expected_api = ["containers"]
157         for api in expected_api:
158             try:
159                 methods = self.api._rootDesc.get('resources')[api]['methods']
160                 if ('httpMethod' in methods['create'] and
161                     (arvargs.work_api == api or arvargs.work_api is None)):
162                     self.work_api = api
163                     break
164             except KeyError:
165                 pass
166
167         if not self.work_api:
168             if arvargs.work_api is None:
169                 raise Exception("No supported APIs")
170             else:
171                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
172
173         if self.work_api == "jobs":
174             logger.error("""
175 *******************************
176 The 'jobs' API is no longer supported.
177 *******************************""")
178             exit(1)
179
180         self.loadingContext = ArvLoadingContext(vars(arvargs))
181         self.loadingContext.fetcher_constructor = self.fetcher_constructor
182         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
183         self.loadingContext.construct_tool_object = self.arv_make_tool
184
185         # Add a custom logging handler to the root logger for runtime status reporting
186         # if running inside a container
187         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
188             root_logger = logging.getLogger('')
189
190             # Remove existing RuntimeStatusLoggingHandlers if they exist
191             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
192             root_logger.handlers = handlers
193
194             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
195             root_logger.addHandler(handler)
196
197         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
198         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
199                                                      collection_cache=self.collection_cache)
200
201         validate_cluster_target(self, self.runtimeContext)
202
203
204     def arv_make_tool(self, toolpath_object, loadingContext):
205         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
206             return ArvadosCommandTool(self, toolpath_object, loadingContext)
207         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
208             return ArvadosWorkflow(self, toolpath_object, loadingContext)
209         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
210             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
211         else:
212             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
213
214     def output_callback(self, out, processStatus):
215         with self.workflow_eval_lock:
216             if processStatus == "success":
217                 logger.info("Overall process status is %s", processStatus)
218                 state = "Complete"
219             else:
220                 logger.error("Overall process status is %s", processStatus)
221                 state = "Failed"
222             if self.pipeline:
223                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
224                                                         body={"state": state}).execute(num_retries=self.num_retries)
225             self.final_status = processStatus
226             self.final_output = out
227             self.workflow_eval_lock.notifyAll()
228
229
230     def start_run(self, runnable, runtimeContext):
231         self.task_queue.add(partial(runnable.run, runtimeContext),
232                             self.workflow_eval_lock, self.stop_polling)
233
234     def process_submitted(self, container):
235         with self.workflow_eval_lock:
236             self.processes[container.uuid] = container
237
238     def process_done(self, uuid, record):
239         with self.workflow_eval_lock:
240             j = self.processes[uuid]
241             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
242             self.task_queue.add(partial(j.done, record),
243                                 self.workflow_eval_lock, self.stop_polling)
244             del self.processes[uuid]
245
246     def runtime_status_update(self, kind, message, detail=None):
247         """
248         Updates the runtime_status field on the runner container.
249         Called when there's a need to report errors, warnings or just
250         activity statuses, for example in the RuntimeStatusLoggingHandler.
251         """
252         with self.workflow_eval_lock:
253             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
254             if current is None:
255                 return
256             runtime_status = current.get('runtime_status', {})
257             # In case of status being an error, only report the first one.
258             if kind == 'error':
259                 if not runtime_status.get('error'):
260                     runtime_status.update({
261                         'error': message
262                     })
263                     if detail is not None:
264                         runtime_status.update({
265                             'errorDetail': detail
266                         })
267                 # Further errors are only mentioned as a count.
268                 else:
269                     # Get anything before an optional 'and N more' string.
270                     try:
271                         error_msg = re.match(
272                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
273                         more_failures = re.match(
274                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
275                     except TypeError:
276                         # Ignore tests stubbing errors
277                         return
278                     if more_failures:
279                         failure_qty = int(more_failures.groups()[0])
280                         runtime_status.update({
281                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
282                         })
283                     else:
284                         runtime_status.update({
285                             'error': "%s (and 1 more)" % error_msg
286                         })
287             elif kind in ['warning', 'activity']:
288                 # Record the last warning/activity status without regard of
289                 # previous occurences.
290                 runtime_status.update({
291                     kind: message
292                 })
293                 if detail is not None:
294                     runtime_status.update({
295                         kind+"Detail": detail
296                     })
297             else:
298                 # Ignore any other status kind
299                 return
300             try:
301                 self.api.containers().update(uuid=current['uuid'],
302                                             body={
303                                                 'runtime_status': runtime_status,
304                                             }).execute(num_retries=self.num_retries)
305             except Exception as e:
306                 logger.info("Couldn't update runtime_status: %s", e)
307
308     def wrapped_callback(self, cb, obj, st):
309         with self.workflow_eval_lock:
310             cb(obj, st)
311             self.workflow_eval_lock.notifyAll()
312
313     def get_wrapped_callback(self, cb):
314         return partial(self.wrapped_callback, cb)
315
316     def on_message(self, event):
317         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
318             uuid = event["object_uuid"]
319             if event["properties"]["new_attributes"]["state"] == "Running":
320                 with self.workflow_eval_lock:
321                     j = self.processes[uuid]
322                     if j.running is False:
323                         j.running = True
324                         j.update_pipeline_component(event["properties"]["new_attributes"])
325                         logger.info("%s %s is Running", self.label(j), uuid)
326             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
327                 self.process_done(uuid, event["properties"]["new_attributes"])
328
329     def label(self, obj):
330         return "[%s %s]" % (self.work_api[0:-1], obj.name)
331
332     def poll_states(self):
333         """Poll status of containers listed in the processes dict.
334
335         Runs in a separate thread.
336         """
337
338         try:
339             remain_wait = self.poll_interval
340             while True:
341                 if remain_wait > 0:
342                     self.stop_polling.wait(remain_wait)
343                 if self.stop_polling.is_set():
344                     break
345                 with self.workflow_eval_lock:
346                     keys = list(self.processes)
347                 if not keys:
348                     remain_wait = self.poll_interval
349                     continue
350
351                 begin_poll = time.time()
352                 if self.work_api == "containers":
353                     table = self.poll_api.container_requests()
354
355                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
356
357                 while keys:
358                     page = keys[:pageSize]
359                     try:
360                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
361                     except Exception:
362                         logger.exception("Error checking states on API server: %s")
363                         remain_wait = self.poll_interval
364                         continue
365
366                     for p in proc_states["items"]:
367                         self.on_message({
368                             "object_uuid": p["uuid"],
369                             "event_type": "update",
370                             "properties": {
371                                 "new_attributes": p
372                             }
373                         })
374                     keys = keys[pageSize:]
375
376                 finish_poll = time.time()
377                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
378         except:
379             logger.exception("Fatal error in state polling thread.")
380             with self.workflow_eval_lock:
381                 self.processes.clear()
382                 self.workflow_eval_lock.notifyAll()
383         finally:
384             self.stop_polling.set()
385
386     def add_intermediate_output(self, uuid):
387         if uuid:
388             self.intermediate_output_collections.append(uuid)
389
390     def trash_intermediate_output(self):
391         logger.info("Cleaning up intermediate output collections")
392         for i in self.intermediate_output_collections:
393             try:
394                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
395             except Exception:
396                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
397             except (KeyboardInterrupt, SystemExit):
398                 break
399
400     def check_features(self, obj, parentfield=""):
401         if isinstance(obj, dict):
402             if obj.get("writable") and self.work_api != "containers":
403                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
404             if obj.get("class") == "DockerRequirement":
405                 if obj.get("dockerOutputDirectory"):
406                     if self.work_api != "containers":
407                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
408                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
409                     if not obj.get("dockerOutputDirectory").startswith('/'):
410                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
411                             "Option 'dockerOutputDirectory' must be an absolute path.")
412             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
413                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
414             if obj.get("class") == "InplaceUpdateRequirement":
415                 if obj["inplaceUpdate"] and parentfield == "requirements":
416                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
417             for k,v in viewitems(obj):
418                 self.check_features(v, parentfield=k)
419         elif isinstance(obj, list):
420             for i,v in enumerate(obj):
421                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
422                     self.check_features(v, parentfield=parentfield)
423
424     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
425         outputObj = copy.deepcopy(outputObj)
426
427         files = []
428         def capture(fileobj):
429             files.append(fileobj)
430
431         adjustDirObjs(outputObj, capture)
432         adjustFileObjs(outputObj, capture)
433
434         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
435
436         final = arvados.collection.Collection(api_client=self.api,
437                                               keep_client=self.keep_client,
438                                               num_retries=self.num_retries)
439
440         for k,v in generatemapper.items():
441             if v.type == "Directory" and v.resolved.startswith("_:"):
442                     continue
443             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
444                 with final.open(v.target, "wb") as f:
445                     f.write(v.resolved.encode("utf-8"))
446                     continue
447
448             if not v.resolved.startswith("keep:"):
449                 raise Exception("Output source is not in keep or a literal")
450             sp = v.resolved.split("/")
451             srccollection = sp[0][5:]
452             try:
453                 reader = self.collection_cache.get(srccollection)
454                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
455                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
456             except arvados.errors.ArgumentError as e:
457                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
458                 raise
459             except IOError as e:
460                 logger.error("While preparing output collection: %s", e)
461                 raise
462
463         def rewrite(fileobj):
464             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
465             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
466                 if k in fileobj:
467                     del fileobj[k]
468
469         adjustDirObjs(outputObj, rewrite)
470         adjustFileObjs(outputObj, rewrite)
471
472         with final.open("cwl.output.json", "w") as f:
473             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
474             f.write(res)
475
476         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
477
478         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
479                     final.api_response()["name"],
480                     final.manifest_locator())
481
482         final_uuid = final.manifest_locator()
483         tags = tagsString.split(',')
484         for tag in tags:
485              self.api.links().create(body={
486                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
487                 }).execute(num_retries=self.num_retries)
488
489         def finalcollection(fileobj):
490             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
491
492         adjustDirObjs(outputObj, finalcollection)
493         adjustFileObjs(outputObj, finalcollection)
494
495         return (outputObj, final)
496
497     def set_crunch_output(self):
498         if self.work_api == "containers":
499             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
500             if current is None:
501                 return
502             try:
503                 self.api.containers().update(uuid=current['uuid'],
504                                              body={
505                                                  'output': self.final_output_collection.portable_data_hash(),
506                                              }).execute(num_retries=self.num_retries)
507                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
508                                               body={
509                                                   'is_trashed': True
510                                               }).execute(num_retries=self.num_retries)
511             except Exception:
512                 logger.exception("Setting container output")
513                 return
514
515     def apply_reqs(self, job_order_object, tool):
516         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
517             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
518                 raise WorkflowException(
519                     "`cwl:requirements` in the input object is not part of CWL "
520                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
521                     "can set the cwlVersion to v1.1 or greater and re-run with "
522                     "--enable-dev.")
523             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
524             for req in job_reqs:
525                 tool.requirements.append(req)
526
527     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
528         self.debug = runtimeContext.debug
529
530         tool.visit(self.check_features)
531
532         self.project_uuid = runtimeContext.project_uuid
533         self.pipeline = None
534         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
535         self.secret_store = runtimeContext.secret_store
536
537         self.trash_intermediate = runtimeContext.trash_intermediate
538         if self.trash_intermediate and self.work_api != "containers":
539             raise Exception("--trash-intermediate is only supported with --api=containers.")
540
541         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
542         if self.intermediate_output_ttl and self.work_api != "containers":
543             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
544         if self.intermediate_output_ttl < 0:
545             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
546
547         if runtimeContext.submit_request_uuid and self.work_api != "containers":
548             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
549
550         if not runtimeContext.name:
551             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
552
553         # Upload local file references in the job order.
554         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
555                                      tool, job_order)
556
557         submitting = (runtimeContext.update_workflow or
558                       runtimeContext.create_workflow or
559                       (runtimeContext.submit and not
560                        (tool.tool["class"] == "CommandLineTool" and
561                         runtimeContext.wait and
562                         not runtimeContext.always_submit_runner)))
563
564         loadingContext = self.loadingContext.copy()
565         loadingContext.do_validate = False
566         loadingContext.do_update = False
567         if submitting:
568             # Document may have been auto-updated. Reload the original
569             # document with updating disabled because we want to
570             # submit the original document, not the auto-updated one.
571             tool = load_tool(tool.tool["id"], loadingContext)
572
573         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
574         # Also uploads docker images.
575         merged_map = upload_workflow_deps(self, tool)
576
577         # Recreate process object (ArvadosWorkflow or
578         # ArvadosCommandTool) because tool document may have been
579         # updated by upload_workflow_deps in ways that modify
580         # inheritance of hints or requirements.
581         loadingContext.loader = tool.doc_loader
582         loadingContext.avsc_names = tool.doc_schema
583         loadingContext.metadata = tool.metadata
584         tool = load_tool(tool.tool, loadingContext)
585
586         existing_uuid = runtimeContext.update_workflow
587         if existing_uuid or runtimeContext.create_workflow:
588             # Create a pipeline template or workflow record and exit.
589             if self.work_api == "containers":
590                 return (upload_workflow(self, tool, job_order,
591                                         self.project_uuid,
592                                         uuid=existing_uuid,
593                                         submit_runner_ram=runtimeContext.submit_runner_ram,
594                                         name=runtimeContext.name,
595                                         merged_map=merged_map),
596                         "success")
597
598         self.apply_reqs(job_order, tool)
599
600         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
601         self.eval_timeout = runtimeContext.eval_timeout
602
603         runtimeContext = runtimeContext.copy()
604         runtimeContext.use_container = True
605         runtimeContext.tmpdir_prefix = "tmp"
606         runtimeContext.work_api = self.work_api
607
608         if self.work_api == "containers":
609             if self.ignore_docker_for_reuse:
610                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
611             runtimeContext.outdir = "/var/spool/cwl"
612             runtimeContext.docker_outdir = "/var/spool/cwl"
613             runtimeContext.tmpdir = "/tmp"
614             runtimeContext.docker_tmpdir = "/tmp"
615
616         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
617             raise Exception("--priority must be in the range 1..1000.")
618
619         if self.should_estimate_cache_size:
620             visited = set()
621             estimated_size = [0]
622             def estimate_collection_cache(obj):
623                 if obj.get("location", "").startswith("keep:"):
624                     m = pdh_size.match(obj["location"][5:])
625                     if m and m.group(1) not in visited:
626                         visited.add(m.group(1))
627                         estimated_size[0] += int(m.group(2))
628             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
629             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
630             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
631
632         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
633
634         runnerjob = None
635         if runtimeContext.submit:
636             # Submit a runner job to run the workflow for us.
637             if self.work_api == "containers":
638                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
639                     runtimeContext.runnerjob = tool.tool["id"]
640                 else:
641                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
642                                                 self.output_name,
643                                                 self.output_tags,
644                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
645                                                 name=runtimeContext.name,
646                                                 on_error=runtimeContext.on_error,
647                                                 submit_runner_image=runtimeContext.submit_runner_image,
648                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
649                                                 merged_map=merged_map,
650                                                 priority=runtimeContext.priority,
651                                                 secret_store=self.secret_store,
652                                                 collection_cache_size=runtimeContext.collection_cache_size,
653                                                 collection_cache_is_default=self.should_estimate_cache_size)
654
655         if runtimeContext.cwl_runner_job is not None:
656             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
657
658         jobiter = tool.job(job_order,
659                            self.output_callback,
660                            runtimeContext)
661
662         if runtimeContext.submit and not runtimeContext.wait:
663             runnerjob = next(jobiter)
664             runnerjob.run(runtimeContext)
665             return (runnerjob.uuid, "success")
666
667         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
668         if current_container:
669             logger.info("Running inside container %s", current_container.get("uuid"))
670
671         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
672         self.polling_thread = threading.Thread(target=self.poll_states)
673         self.polling_thread.start()
674
675         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
676
677         try:
678             self.workflow_eval_lock.acquire()
679
680             # Holds the lock while this code runs and releases it when
681             # it is safe to do so in self.workflow_eval_lock.wait(),
682             # at which point on_message can update job state and
683             # process output callbacks.
684
685             loopperf = Perf(metrics, "jobiter")
686             loopperf.__enter__()
687             for runnable in jobiter:
688                 loopperf.__exit__()
689
690                 if self.stop_polling.is_set():
691                     break
692
693                 if self.task_queue.error is not None:
694                     raise self.task_queue.error
695
696                 if runnable:
697                     with Perf(metrics, "run"):
698                         self.start_run(runnable, runtimeContext)
699                 else:
700                     if (self.task_queue.in_flight + len(self.processes)) > 0:
701                         self.workflow_eval_lock.wait(3)
702                     else:
703                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
704                         break
705
706                 if self.stop_polling.is_set():
707                     break
708
709                 loopperf.__enter__()
710             loopperf.__exit__()
711
712             while (self.task_queue.in_flight + len(self.processes)) > 0:
713                 if self.task_queue.error is not None:
714                     raise self.task_queue.error
715                 self.workflow_eval_lock.wait(3)
716
717         except UnsupportedRequirement:
718             raise
719         except:
720             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
721                 logger.error("Interrupted, workflow will be cancelled")
722             elif isinstance(sys.exc_info()[1], WorkflowException):
723                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
724             else:
725                 logger.exception("Workflow execution failed")
726
727             if self.pipeline:
728                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
729                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
730
731             if self.work_api == "containers" and not current_container:
732                 # Not running in a crunch container, so cancel any outstanding processes.
733                 for p in self.processes:
734                     try:
735                         self.api.container_requests().update(uuid=p,
736                                                              body={"priority": "0"}
737                         ).execute(num_retries=self.num_retries)
738                     except Exception:
739                         pass
740         finally:
741             self.workflow_eval_lock.release()
742             self.task_queue.drain()
743             self.stop_polling.set()
744             self.polling_thread.join()
745             self.task_queue.join()
746
747         if self.final_status == "UnsupportedRequirement":
748             raise UnsupportedRequirement("Check log for details.")
749
750         if self.final_output is None:
751             raise WorkflowException("Workflow did not return a result.")
752
753         if runtimeContext.submit and isinstance(tool, Runner):
754             logger.info("Final output collection %s", tool.final_output)
755         else:
756             if self.output_name is None:
757                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
758             if self.output_tags is None:
759                 self.output_tags = ""
760
761             storage_classes = runtimeContext.storage_classes.strip().split(",")
762             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
763             self.set_crunch_output()
764
765         if runtimeContext.compute_checksum:
766             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
767             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
768
769         if self.trash_intermediate and self.final_status == "success":
770             self.trash_intermediate_output()
771
772         return (self.final_output, self.final_status)