# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and not self.updatingRuntimeStatus:
            # Guard against re-entry: the API calls made while updating the
            # runtime status may themselves emit log records.
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as
                    # the status and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False

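# Illustrative wiring of the handler above (the callback name is made up):
# attaching it to the root logger turns WARNING/ERROR records into
# runtime_status updates through whatever callback is supplied.
#
#     def report(kind, message, detail=None):
#         print(kind, message, detail)
#
#     logging.getLogger('').addHandler(RuntimeStatusLoggingHandler(report))
#     logging.getLogger('arvados.cwl-runner').warning("low disk\non node X")
#     # -> report('warning', 'arvados.cwl-runner: low disk', 'on node X')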

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool
        self.loadingContext.do_update = False

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container.
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove any existing RuntimeStatusLoggingHandlers.
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)

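    # Minimal construction sketch (illustrative; assumes a reachable Arvados
    # cluster and standard client credentials). With arvargs omitted, the
    # constructor fills in the defaults shown in __init__ above.
    #
    #     import arvados
    #     executor = ArvCwlExecutor(arvados.api('v1'))
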
    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

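    # How repeated errors collapse (illustrative): the first error message is
    # stored verbatim, and each subsequent error only bumps the
    # '(and N more)' suffix.
    #
    #     runtime_status_update('error', 'step1: failed')
    #     # -> runtime_status['error'] == 'step1: failed'
    #     runtime_status_update('error', 'step2: failed')
    #     # -> runtime_status['error'] == 'step1: failed (and 1 more)'
    #     runtime_status_update('error', 'step3: failed')
    #     # -> runtime_status['error'] == 'step1: failed (and 2 more)'
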
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        # Singularize the API name, e.g. "[container stepname]" or "[job stepname]".
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

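    # Paging sketch for the loop above (values are illustrative): with
    # pageSize = 2 and keys = ["u1", "u2", "u3"], the polling thread issues
    #
    #     table.list(filters=[["uuid", "in", ["u1", "u2"]]])
    #     table.list(filters=[["uuid", "in", ["u3"]]])
    #
    # so no single request asks for more than maxItemsPerResponse records.
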
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in viewvalues(obj):
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

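    # Example of input check_features rejects (illustrative document): a
    # relative dockerOutputDirectory fails validation on any API, and the
    # option is unsupported altogether with --api=jobs.
    #
    #     executor.check_features({
    #         "class": "DockerRequirement",
    #         "dockerPull": "debian:9",
    #         "dockerOutputDirectory": "outdir",  # not absolute -> ValidationException
    #     })
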
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k, v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

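    # How keep: references are split above (illustrative values): for
    #
    #     v.resolved == "keep:99999999999999999999999999999998+99/foo/bar.txt"
    #
    # sp[0][5:] yields the portable data hash
    # "99999999999999999999999999999998+99" and srcpath is "foo/bar.txt".
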
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                # The container record above keeps the output by portable data
                # hash; the separately named output collection is then trashed.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                return
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back a mapping of
        # files to keep references.  Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload the tool object, which may have been updated by
        # upload_workflow_deps.  Don't validate this time because it will
        # just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map,
                                      loadingContext=loadingContext)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
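
        # Worked example of the estimate above (illustrative numbers): the
        # size component of a portable data hash is the manifest length in
        # bytes, so a job order referencing collections whose manifests
        # total 2,000,000 bytes yields
        #
        #     max(((2000000*192) // (1024*1024)) + 1, 256) == max(367, 256) == 367
        #
        # i.e. a cache cap of 367 MiB.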

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                 self.output_name,
                                 self.output_tags,
                                 submit_runner_ram=runtimeContext.submit_runner_ram,
                                 name=runtimeContext.name,
                                 on_error=runtimeContext.on_error,
                                 submit_runner_image=runtimeContext.submit_runner_image,
                                 merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if not isinstance(tool, Runner):
            loadingContext.do_update = True
            tool = load_tool(tool.doc_loader.idx[tool.tool["id"]],
                             loadingContext)

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runtimeContext.submit and isinstance(tool, Runner):
                runnerjob = tool
                if runnerjob.uuid and self.work_api == "containers":
                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                         body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)