15028: Update cwltool/schema-salad deps, fix tests
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .arvjob import RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from .task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
47 from cwltool.command_line_tool import compute_checksums
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51
52 DEFAULT_PRIORITY = 500
53
54 class RuntimeStatusLoggingHandler(logging.Handler):
55     """
56     Intercepts logging calls and report them as runtime statuses on runner
57     containers.
58     """
59     def __init__(self, runtime_status_update_func):
60         super(RuntimeStatusLoggingHandler, self).__init__()
61         self.runtime_status_update = runtime_status_update_func
62         self.updatingRuntimeStatus = False
63
64     def emit(self, record):
65         kind = None
66         if record.levelno >= logging.ERROR:
67             kind = 'error'
68         elif record.levelno >= logging.WARNING:
69             kind = 'warning'
70         if kind is not None and self.updatingRuntimeStatus is not True:
71             self.updatingRuntimeStatus = True
72             try:
73                 log_msg = record.getMessage()
74                 if '\n' in log_msg:
75                     # If the logged message is multi-line, use its first line as status
76                     # and the rest as detail.
77                     status, detail = log_msg.split('\n', 1)
78                     self.runtime_status_update(
79                         kind,
80                         "%s: %s" % (record.name, status),
81                         detail
82                     )
83                 else:
84                     self.runtime_status_update(
85                         kind,
86                         "%s: %s" % (record.name, record.getMessage())
87                     )
88             finally:
89                 self.updatingRuntimeStatus = False
90
91
92 class ArvCwlExecutor(object):
93     """Execute a CWL tool or workflow, submit work (using either jobs or
94     containers API), wait for them to complete, and report output.
95
96     """
97
98     def __init__(self, api_client,
99                  arvargs=None,
100                  keep_client=None,
101                  num_retries=4,
102                  thread_count=4):
103
104         if arvargs is None:
105             arvargs = argparse.Namespace()
106             arvargs.work_api = None
107             arvargs.output_name = None
108             arvargs.output_tags = None
109             arvargs.thread_count = 1
110             arvargs.collection_cache_size = None
111
112         self.api = api_client
113         self.processes = {}
114         self.workflow_eval_lock = threading.Condition(threading.RLock())
115         self.final_output = None
116         self.final_status = None
117         self.num_retries = num_retries
118         self.uuid = None
119         self.stop_polling = threading.Event()
120         self.poll_api = None
121         self.pipeline = None
122         self.final_output_collection = None
123         self.output_name = arvargs.output_name
124         self.output_tags = arvargs.output_tags
125         self.project_uuid = None
126         self.intermediate_output_ttl = 0
127         self.intermediate_output_collections = []
128         self.trash_intermediate = False
129         self.thread_count = arvargs.thread_count
130         self.poll_interval = 12
131         self.loadingContext = None
132         self.should_estimate_cache_size = True
133
134         if keep_client is not None:
135             self.keep_client = keep_client
136         else:
137             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
138
139         if arvargs.collection_cache_size:
140             collection_cache_size = arvargs.collection_cache_size*1024*1024
141             self.should_estimate_cache_size = False
142         else:
143             collection_cache_size = 256*1024*1024
144
145         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
146                                                 cap=collection_cache_size)
147
148         self.fetcher_constructor = partial(CollectionFetcher,
149                                            api_client=self.api,
150                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
151                                            num_retries=self.num_retries)
152
153         self.work_api = None
154         expected_api = ["jobs", "containers"]
155         for api in expected_api:
156             try:
157                 methods = self.api._rootDesc.get('resources')[api]['methods']
158                 if ('httpMethod' in methods['create'] and
159                     (arvargs.work_api == api or arvargs.work_api is None)):
160                     self.work_api = api
161                     break
162             except KeyError:
163                 pass
164
165         if not self.work_api:
166             if arvargs.work_api is None:
167                 raise Exception("No supported APIs")
168             else:
169                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
170
171         if self.work_api == "jobs":
172             logger.warning("""
173 *******************************
174 Using the deprecated 'jobs' API.
175
176 To get rid of this warning:
177
178 Users: read about migrating at
179 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
180 and use the option --api=containers
181
182 Admins: configure the cluster to disable the 'jobs' API as described at:
183 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
184 *******************************""")
185
186         self.loadingContext = ArvLoadingContext(vars(arvargs))
187         self.loadingContext.fetcher_constructor = self.fetcher_constructor
188         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
189         self.loadingContext.construct_tool_object = self.arv_make_tool
190         self.loadingContext.do_update = False
191
192         # Add a custom logging handler to the root logger for runtime status reporting
193         # if running inside a container
194         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
195             root_logger = logging.getLogger('')
196
197             # Remove existing RuntimeStatusLoggingHandlers if they exist
198             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
199             root_logger.handlers = handlers
200
201             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
202             root_logger.addHandler(handler)
203
204         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
205         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
206                                                      collection_cache=self.collection_cache)
207
208         validate_cluster_target(self, self.runtimeContext)
209
210
211     def arv_make_tool(self, toolpath_object, loadingContext):
212         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
213             return ArvadosCommandTool(self, toolpath_object, loadingContext)
214         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
215             return ArvadosWorkflow(self, toolpath_object, loadingContext)
216         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
217             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
218         else:
219             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
220
221     def output_callback(self, out, processStatus):
222         with self.workflow_eval_lock:
223             if processStatus == "success":
224                 logger.info("Overall process status is %s", processStatus)
225                 state = "Complete"
226             else:
227                 logger.error("Overall process status is %s", processStatus)
228                 state = "Failed"
229             if self.pipeline:
230                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
231                                                         body={"state": state}).execute(num_retries=self.num_retries)
232             self.final_status = processStatus
233             self.final_output = out
234             self.workflow_eval_lock.notifyAll()
235
236
237     def start_run(self, runnable, runtimeContext):
238         self.task_queue.add(partial(runnable.run, runtimeContext),
239                             self.workflow_eval_lock, self.stop_polling)
240
241     def process_submitted(self, container):
242         with self.workflow_eval_lock:
243             self.processes[container.uuid] = container
244
245     def process_done(self, uuid, record):
246         with self.workflow_eval_lock:
247             j = self.processes[uuid]
248             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
249             self.task_queue.add(partial(j.done, record),
250                                 self.workflow_eval_lock, self.stop_polling)
251             del self.processes[uuid]
252
253     def runtime_status_update(self, kind, message, detail=None):
254         """
255         Updates the runtime_status field on the runner container.
256         Called when there's a need to report errors, warnings or just
257         activity statuses, for example in the RuntimeStatusLoggingHandler.
258         """
259         with self.workflow_eval_lock:
260             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
261             if current is None:
262                 return
263             runtime_status = current.get('runtime_status', {})
264             # In case of status being an error, only report the first one.
265             if kind == 'error':
266                 if not runtime_status.get('error'):
267                     runtime_status.update({
268                         'error': message
269                     })
270                     if detail is not None:
271                         runtime_status.update({
272                             'errorDetail': detail
273                         })
274                 # Further errors are only mentioned as a count.
275                 else:
276                     # Get anything before an optional 'and N more' string.
277                     try:
278                         error_msg = re.match(
279                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
280                         more_failures = re.match(
281                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
282                     except TypeError:
283                         # Ignore tests stubbing errors
284                         return
285                     if more_failures:
286                         failure_qty = int(more_failures.groups()[0])
287                         runtime_status.update({
288                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
289                         })
290                     else:
291                         runtime_status.update({
292                             'error': "%s (and 1 more)" % error_msg
293                         })
294             elif kind in ['warning', 'activity']:
295                 # Record the last warning/activity status without regard of
296                 # previous occurences.
297                 runtime_status.update({
298                     kind: message
299                 })
300                 if detail is not None:
301                     runtime_status.update({
302                         kind+"Detail": detail
303                     })
304             else:
305                 # Ignore any other status kind
306                 return
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                             body={
310                                                 'runtime_status': runtime_status,
311                                             }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Couldn't update runtime_status: %s", e)
314
315     def wrapped_callback(self, cb, obj, st):
316         with self.workflow_eval_lock:
317             cb(obj, st)
318             self.workflow_eval_lock.notifyAll()
319
320     def get_wrapped_callback(self, cb):
321         return partial(self.wrapped_callback, cb)
322
323     def on_message(self, event):
324         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
325             uuid = event["object_uuid"]
326             if event["properties"]["new_attributes"]["state"] == "Running":
327                 with self.workflow_eval_lock:
328                     j = self.processes[uuid]
329                     if j.running is False:
330                         j.running = True
331                         j.update_pipeline_component(event["properties"]["new_attributes"])
332                         logger.info("%s %s is Running", self.label(j), uuid)
333             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
334                 self.process_done(uuid, event["properties"]["new_attributes"])
335
336     def label(self, obj):
337         return "[%s %s]" % (self.work_api[0:-1], obj.name)
338
339     def poll_states(self):
340         """Poll status of jobs or containers listed in the processes dict.
341
342         Runs in a separate thread.
343         """
344
345         try:
346             remain_wait = self.poll_interval
347             while True:
348                 if remain_wait > 0:
349                     self.stop_polling.wait(remain_wait)
350                 if self.stop_polling.is_set():
351                     break
352                 with self.workflow_eval_lock:
353                     keys = list(self.processes)
354                 if not keys:
355                     remain_wait = self.poll_interval
356                     continue
357
358                 begin_poll = time.time()
359                 if self.work_api == "containers":
360                     table = self.poll_api.container_requests()
361                 elif self.work_api == "jobs":
362                     table = self.poll_api.jobs()
363
364                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
365
366                 while keys:
367                     page = keys[:pageSize]
368                     keys = keys[pageSize:]
369                     try:
370                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
371                     except Exception:
372                         logger.exception("Error checking states on API server: %s")
373                         remain_wait = self.poll_interval
374                         continue
375
376                     for p in proc_states["items"]:
377                         self.on_message({
378                             "object_uuid": p["uuid"],
379                             "event_type": "update",
380                             "properties": {
381                                 "new_attributes": p
382                             }
383                         })
384                 finish_poll = time.time()
385                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
386         except:
387             logger.exception("Fatal error in state polling thread.")
388             with self.workflow_eval_lock:
389                 self.processes.clear()
390                 self.workflow_eval_lock.notifyAll()
391         finally:
392             self.stop_polling.set()
393
394     def add_intermediate_output(self, uuid):
395         if uuid:
396             self.intermediate_output_collections.append(uuid)
397
398     def trash_intermediate_output(self):
399         logger.info("Cleaning up intermediate output collections")
400         for i in self.intermediate_output_collections:
401             try:
402                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
403             except Exception:
404                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
405             except (KeyboardInterrupt, SystemExit):
406                 break
407
408     def check_features(self, obj):
409         if isinstance(obj, dict):
410             if obj.get("writable") and self.work_api != "containers":
411                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
412             if obj.get("class") == "DockerRequirement":
413                 if obj.get("dockerOutputDirectory"):
414                     if self.work_api != "containers":
415                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
416                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
417                     if not obj.get("dockerOutputDirectory").startswith('/'):
418                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
419                             "Option 'dockerOutputDirectory' must be an absolute path.")
420             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
421                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
422             for v in viewvalues(obj):
423                 self.check_features(v)
424         elif isinstance(obj, list):
425             for i,v in enumerate(obj):
426                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
427                     self.check_features(v)
428
429     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
430         outputObj = copy.deepcopy(outputObj)
431
432         files = []
433         def capture(fileobj):
434             files.append(fileobj)
435
436         adjustDirObjs(outputObj, capture)
437         adjustFileObjs(outputObj, capture)
438
439         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
440
441         final = arvados.collection.Collection(api_client=self.api,
442                                               keep_client=self.keep_client,
443                                               num_retries=self.num_retries)
444
445         for k,v in generatemapper.items():
446             if v.type == "Directory" and v.resolved.startswith("_:"):
447                     continue
448             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
449                 with final.open(v.target, "wb") as f:
450                     f.write(v.resolved.encode("utf-8"))
451                     continue
452
453             if not v.resolved.startswith("keep:"):
454                 raise Exception("Output source is not in keep or a literal")
455             sp = v.resolved.split("/")
456             srccollection = sp[0][5:]
457             try:
458                 reader = self.collection_cache.get(srccollection)
459                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
460                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
461             except arvados.errors.ArgumentError as e:
462                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
463                 raise
464             except IOError as e:
465                 logger.error("While preparing output collection: %s", e)
466                 raise
467
468         def rewrite(fileobj):
469             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
470             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
471                 if k in fileobj:
472                     del fileobj[k]
473
474         adjustDirObjs(outputObj, rewrite)
475         adjustFileObjs(outputObj, rewrite)
476
477         with final.open("cwl.output.json", "w") as f:
478             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
479             f.write(res)
480
481         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
482
483         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
484                     final.api_response()["name"],
485                     final.manifest_locator())
486
487         final_uuid = final.manifest_locator()
488         tags = tagsString.split(',')
489         for tag in tags:
490              self.api.links().create(body={
491                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
492                 }).execute(num_retries=self.num_retries)
493
494         def finalcollection(fileobj):
495             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
496
497         adjustDirObjs(outputObj, finalcollection)
498         adjustFileObjs(outputObj, finalcollection)
499
500         return (outputObj, final)
501
502     def set_crunch_output(self):
503         if self.work_api == "containers":
504             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
505             if current is None:
506                 return
507             try:
508                 self.api.containers().update(uuid=current['uuid'],
509                                              body={
510                                                  'output': self.final_output_collection.portable_data_hash(),
511                                              }).execute(num_retries=self.num_retries)
512                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
513                                               body={
514                                                   'is_trashed': True
515                                               }).execute(num_retries=self.num_retries)
516             except Exception:
517                 logger.exception("Setting container output")
518                 return
519         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
520             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
521                                    body={
522                                        'output': self.final_output_collection.portable_data_hash(),
523                                        'success': self.final_status == "success",
524                                        'progress':1.0
525                                    }).execute(num_retries=self.num_retries)
526
527     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
528         self.debug = runtimeContext.debug
529
530         tool.visit(self.check_features)
531
532         self.project_uuid = runtimeContext.project_uuid
533         self.pipeline = None
534         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
535         self.secret_store = runtimeContext.secret_store
536
537         self.trash_intermediate = runtimeContext.trash_intermediate
538         if self.trash_intermediate and self.work_api != "containers":
539             raise Exception("--trash-intermediate is only supported with --api=containers.")
540
541         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
542         if self.intermediate_output_ttl and self.work_api != "containers":
543             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
544         if self.intermediate_output_ttl < 0:
545             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
546
547         if runtimeContext.submit_request_uuid and self.work_api != "containers":
548             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
549
550         if not runtimeContext.name:
551             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
552
553         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
554         # Also uploads docker images.
555         merged_map = upload_workflow_deps(self, tool)
556
557         # Reload tool object which may have been updated by
558         # upload_workflow_deps
559         # Don't validate this time because it will just print redundant errors.
560         loadingContext = self.loadingContext.copy()
561         loadingContext.loader = tool.doc_loader
562         loadingContext.avsc_names = tool.doc_schema
563         loadingContext.metadata = tool.metadata
564         loadingContext.do_validate = False
565
566         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
567                                   loadingContext)
568
569         # Upload local file references in the job order.
570         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
571                                      tool, job_order)
572
573         existing_uuid = runtimeContext.update_workflow
574         if existing_uuid or runtimeContext.create_workflow:
575             # Create a pipeline template or workflow record and exit.
576             if self.work_api == "jobs":
577                 tmpl = RunnerTemplate(self, tool, job_order,
578                                       runtimeContext.enable_reuse,
579                                       uuid=existing_uuid,
580                                       submit_runner_ram=runtimeContext.submit_runner_ram,
581                                       name=runtimeContext.name,
582                                       merged_map=merged_map,
583                                       loadingContext=loadingContext)
584                 tmpl.save()
585                 # cwltool.main will write our return value to stdout.
586                 return (tmpl.uuid, "success")
587             elif self.work_api == "containers":
588                 return (upload_workflow(self, tool, job_order,
589                                         self.project_uuid,
590                                         uuid=existing_uuid,
591                                         submit_runner_ram=runtimeContext.submit_runner_ram,
592                                         name=runtimeContext.name,
593                                         merged_map=merged_map),
594                         "success")
595
596         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
597         self.eval_timeout = runtimeContext.eval_timeout
598
599         runtimeContext = runtimeContext.copy()
600         runtimeContext.use_container = True
601         runtimeContext.tmpdir_prefix = "tmp"
602         runtimeContext.work_api = self.work_api
603
604         if self.work_api == "containers":
605             if self.ignore_docker_for_reuse:
606                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
607             runtimeContext.outdir = "/var/spool/cwl"
608             runtimeContext.docker_outdir = "/var/spool/cwl"
609             runtimeContext.tmpdir = "/tmp"
610             runtimeContext.docker_tmpdir = "/tmp"
611         elif self.work_api == "jobs":
612             if runtimeContext.priority != DEFAULT_PRIORITY:
613                 raise Exception("--priority not implemented for jobs API.")
614             runtimeContext.outdir = "$(task.outdir)"
615             runtimeContext.docker_outdir = "$(task.outdir)"
616             runtimeContext.tmpdir = "$(task.tmpdir)"
617
618         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
619             raise Exception("--priority must be in the range 1..1000.")
620
621         if self.should_estimate_cache_size:
622             visited = set()
623             estimated_size = [0]
624             def estimate_collection_cache(obj):
625                 if obj.get("location", "").startswith("keep:"):
626                     m = pdh_size.match(obj["location"][5:])
627                     if m and m.group(1) not in visited:
628                         visited.add(m.group(1))
629                         estimated_size[0] += int(m.group(2))
630             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
631             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
632             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
633
634         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
635
636         runnerjob = None
637         if runtimeContext.submit:
638             # Submit a runner job to run the workflow for us.
639             if self.work_api == "containers":
640                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
641                     runtimeContext.runnerjob = tool.tool["id"]
642                 else:
643                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
644                                                 self.output_name,
645                                                 self.output_tags,
646                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
647                                                 name=runtimeContext.name,
648                                                 on_error=runtimeContext.on_error,
649                                                 submit_runner_image=runtimeContext.submit_runner_image,
650                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
651                                                 merged_map=merged_map,
652                                                 priority=runtimeContext.priority,
653                                                 secret_store=self.secret_store,
654                                                 collection_cache_size=runtimeContext.collection_cache_size,
655                                                 collection_cache_is_default=self.should_estimate_cache_size)
656             elif self.work_api == "jobs":
657                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
658                                       self.output_name,
659                                       self.output_tags,
660                                       submit_runner_ram=runtimeContext.submit_runner_ram,
661                                       name=runtimeContext.name,
662                                       on_error=runtimeContext.on_error,
663                                       submit_runner_image=runtimeContext.submit_runner_image,
664                                       merged_map=merged_map)
665         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
666             # Create pipeline for local run
667             self.pipeline = self.api.pipeline_instances().create(
668                 body={
669                     "owner_uuid": self.project_uuid,
670                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
671                     "components": {},
672                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
673             logger.info("Pipeline instance %s", self.pipeline["uuid"])
674
675         if runtimeContext.cwl_runner_job is not None:
676             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
677
678         jobiter = tool.job(job_order,
679                            self.output_callback,
680                            runtimeContext)
681
682         if runtimeContext.submit and not runtimeContext.wait:
683             runnerjob = next(jobiter)
684             runnerjob.run(runtimeContext)
685             return (runnerjob.uuid, "success")
686
687         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
688         if current_container:
689             logger.info("Running inside container %s", current_container.get("uuid"))
690
691         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
692         self.polling_thread = threading.Thread(target=self.poll_states)
693         self.polling_thread.start()
694
695         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
696
697         try:
698             self.workflow_eval_lock.acquire()
699
700             # Holds the lock while this code runs and releases it when
701             # it is safe to do so in self.workflow_eval_lock.wait(),
702             # at which point on_message can update job state and
703             # process output callbacks.
704
705             loopperf = Perf(metrics, "jobiter")
706             loopperf.__enter__()
707             for runnable in jobiter:
708                 loopperf.__exit__()
709
710                 if self.stop_polling.is_set():
711                     break
712
713                 if self.task_queue.error is not None:
714                     raise self.task_queue.error
715
716                 if runnable:
717                     with Perf(metrics, "run"):
718                         self.start_run(runnable, runtimeContext)
719                 else:
720                     if (self.task_queue.in_flight + len(self.processes)) > 0:
721                         self.workflow_eval_lock.wait(3)
722                     else:
723                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
724                         break
725
726                 if self.stop_polling.is_set():
727                     break
728
729                 loopperf.__enter__()
730             loopperf.__exit__()
731
732             while (self.task_queue.in_flight + len(self.processes)) > 0:
733                 if self.task_queue.error is not None:
734                     raise self.task_queue.error
735                 self.workflow_eval_lock.wait(3)
736
737         except UnsupportedRequirement:
738             raise
739         except:
740             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
741                 logger.error("Interrupted, workflow will be cancelled")
742             elif isinstance(sys.exc_info()[1], WorkflowException):
743                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
744             else:
745                 logger.exception("Workflow execution failed")
746
747             if self.pipeline:
748                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
749                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
750             if runtimeContext.submit and isinstance(tool, Runner):
751                 runnerjob = tool
752                 if runnerjob.uuid and self.work_api == "containers":
753                     self.api.container_requests().update(uuid=runnerjob.uuid,
754                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
755         finally:
756             self.workflow_eval_lock.release()
757             self.task_queue.drain()
758             self.stop_polling.set()
759             self.polling_thread.join()
760             self.task_queue.join()
761
762         if self.final_status == "UnsupportedRequirement":
763             raise UnsupportedRequirement("Check log for details.")
764
765         if self.final_output is None:
766             raise WorkflowException("Workflow did not return a result.")
767
768         if runtimeContext.submit and isinstance(tool, Runner):
769             logger.info("Final output collection %s", tool.final_output)
770         else:
771             if self.output_name is None:
772                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
773             if self.output_tags is None:
774                 self.output_tags = ""
775
776             storage_classes = runtimeContext.storage_classes.strip().split(",")
777             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
778             self.set_crunch_output()
779
780         if runtimeContext.compute_checksum:
781             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
782             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
783
784         if self.trash_intermediate and self.final_status == "success":
785             self.trash_intermediate_output()
786
787         return (self.final_output, self.final_status)