# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

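# Default priority assigned to submitted work; arv_executor() below rejects
# --priority values outside the range 1..1000.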
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
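    # Rough usage sketch (for orientation only; the real entry point is
    # arvados_cwl.main(), which constructs this executor and passes its
    # arv_executor method to cwltool as the workflow executor):
    #
    #   api = arvados.api('v1')
    #   executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #   (output, status) = executor.arv_executor(tool, job_order, runtime_context)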

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove any existing RuntimeStatusLoggingHandlers so we don't attach duplicates
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
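        # runtime_status is a dict on the container record with keys like
        # 'error'/'errorDetail' and 'warning'/'warningDetail', e.g. (values
        # illustrative):
        #   {'error': 'arvados.cwl-runner: step failed (and 2 more)',
        #    'errorDetail': 'full multi-line log message'}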
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # When the status is an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    # e.g. "step1 failed (and 2 more)" -> error_msg "step1 failed",
                    # failure count 2, which is then bumped to "(and 3 more)".
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status without regard to
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """
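        # Each pass lists the watched container requests (or jobs) in pages of
        # up to maxItemsPerResponse, feeds every record through on_message() as
        # a synthetic "update" event, and then sleeps for whatever remains of
        # poll_interval.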

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in viewvalues(obj):
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

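    # Gather the workflow's File/Directory outputs into a single new Keep
    # collection: literal ("_:") CreateFile entries are written directly,
    # "keep:" references are copied from their source collections, output
    # locations are rewritten to point at the new collection, cwl.output.json
    # is added, and a tag link is created for each requested tag.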
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                    continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

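    # When this runner is itself a crunch container or job task, record the
    # final output collection on that container/job task record so Arvados
    # reports it as the runner's output.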
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                return
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

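    # Main entry point invoked by cwltool: uploads workflow dependencies and
    # the job order, optionally just registers a workflow or pipeline template,
    # optionally submits a runner container/job, and otherwise iterates over
    # runnable steps while poll_states() feeds completion events back through
    # on_message().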
    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map,
                                      loadingContext=loadingContext)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

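        # Size the collection cache from the inputs: each "keep:" reference
        # carries a portable data hash of the form <md5>+<manifest size in
        # bytes>; distinct collections' sizes are summed and scaled by a
        # heuristic factor of 192, with a 256 MiB floor.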
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                 self.output_name,
                                 self.output_tags,
                                 submit_runner_ram=runtimeContext.submit_runner_ram,
                                 name=runtimeContext.name,
                                 on_error=runtimeContext.on_error,
                                 submit_runner_image=runtimeContext.submit_runner_image,
                                 merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runtimeContext.submit and isinstance(tool, Runner):
                runnerjob = tool
                if runnerjob.uuid and self.work_api == "containers":
                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                         body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)