# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False

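# A minimal illustration of the handler above (not executed; the message text
# is made up): while this handler is installed, a call like
#
#     logger.error("Container exited with code 1\nSee the step log for details")
#
# is reported as an "error" runtime status whose summary is
# "arvados.cwl-runner: Container exited with code 1", with the remaining
# line ("See the step log for details") passed along as the detail text.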

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

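        # The collection cache defaults to 256 MiB.  When no explicit
        # --collection-cache-size is given, should_estimate_cache_size stays
        # True and arv_executor() re-estimates the cap from the input
        # collections before the workflow runs.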
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

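        # Probe the API server's discovery document for a usable work API.
        # Only the "containers" API is supported; if a --api value was given
        # and doesn't match a usable resource, an exception is raised below.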
        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

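    # Rough sketch of the folding behaviour implemented in
    # runtime_status_update() below (values are illustrative only): after two
    # warnings have been reported, the container's runtime_status looks like
    #
    #     {"warning": "arvados.cwl-runner: first warning",
    #      "warningDetail": "arvados.cwl-runner: first warning\n"
    #                       "arvados.cwl-runner: second warning\n"}
    #
    # i.e. the first message of each kind is kept as the summary and every
    # later message (plus any detail text) is appended to <kind>Detail.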
    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings, or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # Keep the first message of each kind as the summary; fold
            # subsequent messages into the detail field.
            if kind in ('error', 'warning', 'activity'):
                updatemessage = runtime_status.get(kind, "")
                if not updatemessage:
                    updatemessage = message

                # Subsequent messages are tacked on in detail
                updatedetail = runtime_status.get(kind+'Detail', "")
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"
                if detail:
                    updatedetail += detail + "\n"
                runtime_status.update({
                    kind: updatemessage,
                    kind+'Detail': updatedetail,
                })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

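    # Called by poll_states() with container request records wrapped as
    # "update" events: marks a process as Running, or hands terminal states
    # (Complete/Failed/Cancelled/Final) to process_done().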
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

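                # Page through the tracked UUIDs so each list request stays
                # within the server's maxItemsPerResponse limit.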
                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

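    # Walk the tool document and reject constructs that Arvados cannot
    # support: a relative dockerOutputDirectory, or InplaceUpdateRequirement
    # applied to keep collections.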
    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

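    # Assemble the final output: copy every keep-backed file and directory
    # referenced by outputObj into a fresh collection (literal CreateFile
    # entries are written directly), rewrite the location fields to point at
    # it, save cwl.output.json alongside, then save and tag the collection.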
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

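    # When running inside a container, record the final output's portable
    # data hash on our own container record; the collection saved by
    # make_output_collection() is then trashed (the container's recorded
    # output apparently supersedes the runner's own copy).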
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

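    # Merge any `cwl:requirements` from the input object into the tool's
    # requirements list; rejected when the document's original cwlVersion is
    # v1.0, where this feature doesn't exist.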
    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

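    # Main entry point.  Depending on the options this either registers a
    # workflow record, submits a runner container and (optionally) waits for
    # it, or drives the job iterator in-process while poll_states() tracks
    # container request states, and finally assembles the output collection.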
    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        updated_tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     updated_tool, job_order)

        # the last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))


        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        if submitting:
            loadingContext.do_update = False
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        tool = load_tool(tool.tool, loadingContext)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map,
                                        submit_runner_image=runtimeContext.submit_runner_image)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

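        # Estimate a collection cache size from the job inputs: the "+<size>"
        # component of each referenced portable data hash is the collection's
        # manifest size, and the cap is set to roughly 192 bytes of cache per
        # byte of manifest, with a 256 MiB floor.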
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
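        # When submitting, wrap the tool in a RunnerContainer: a container
        # request that runs arvados-cwl-runner inside the cluster to
        # orchestrate the rest of the workflow.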
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)