Merge branch '18994-cwl-basename' refs #18994
[arvados.git] / sdk/cwl/arvados_cwl/executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line
                    # as the status and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False

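# A minimal usage sketch (illustrative, not executed here): attaching the
# handler to the root logger mirrors every WARNING/ERROR into the runner
# container's runtime_status, which is what ArvCwlExecutor.__init__ below
# does when it detects it is running inside a container.
#
#     handler = RuntimeStatusLoggingHandler(executor.runtime_status_update)
#     logging.getLogger('').addHandler(handler)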

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

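        # collection_cache_size from the command line is expressed in MiB and
        # converted to bytes above; when unset, the cap defaults to 256 MiB
        # and may later be re-estimated from the job order (see arv_executor).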
        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)

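    # A minimal construction sketch (hypothetical values, not executed here):
    #
    #     api = arvados.api('v1')
    #     executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #
    # where parsed_args is the argparse.Namespace produced by the
    # arvados-cwl-runner command line; passing arvargs=None falls back to the
    # bare defaults set at the top of __init__.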

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the latest warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

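    # Illustration of the error aggregation above (messages hypothetical):
    # the first error sets runtime_status['error'] = "step1 failed"; a second
    # error rewrites it to "step1 failed (and 1 more)"; a third yields
    # "step1 failed (and 2 more)", and so on. Only the first error's detail
    # is kept in errorDetail.
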
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

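    # label() trims the trailing "s" from work_api; e.g. with
    # work_api == "containers" and obj.name == "sort" (hypothetical), it
    # returns "[container sort]".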
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

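    # Timing note for the loop above: remain_wait subtracts the time spent
    # polling from poll_interval, so with poll_interval = 12 and a poll that
    # took 3 seconds, the thread sleeps 9 seconds before the next round; if a
    # poll takes longer than the interval, remain_wait goes non-positive and
    # the next round starts immediately.
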
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

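    # Example of a construct check_features rejects (hypothetical CWL
    # fragment, shown as the parsed dict it would arrive as):
    #
    #     {"class": "DockerRequirement",
    #      "dockerPull": "ubuntu:20.04",
    #      "dockerOutputDirectory": "relative/outdir"}  # not absolute -> error
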
    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

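    # Sketch of the location rewriting above (values hypothetical): a file
    # captured at "keep:1f4b0bc7583c2a7f9102c395f4ffc5e3+45/foo.txt" is first
    # copied into `final`; rewrite() replaces its "location" with the
    # in-collection path "foo.txt"; finalcollection() then prefixes the new
    # collection, yielding "keep:<final PDH>/foo.txt".
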
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

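    # Note on the is_trashed update above (an inference, not stated in this
    # file): once the runner container's `output` field points at the
    # portable data hash, the separately saved copy of the output collection
    # appears redundant, so it is marked trashed rather than left behind.
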
    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

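    # Example input object apply_reqs would act on (hypothetical), as it
    # appears after schema-salad expands `cwl:requirements` to its full URI:
    #
    #     {"inp": {"class": "File", "location": "keep:.../input.txt"},
    #      "https://w3id.org/cwl/cwl#requirements": [
    #          {"class": "ResourceRequirement", "ramMin": 4096}]}
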
    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        updated_tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     updated_tool, job_order)

        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        if submitting:
            loadingContext.do_update = False
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        tool = load_tool(tool.tool, loadingContext)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
                                       merged_map=merged_map,
                                       submit_runner_image=runtimeContext.submit_runner_image)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(
                            uuid=p,
                            body={"priority": "0"}).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)
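
# A high-level usage sketch (hypothetical; the real entry point is
# arvados_cwl.main, which builds these objects from the command line):
#
#     executor = ArvCwlExecutor(arvados.api('v1'), arvargs=parsed_args)
#     tool = load_tool(parsed_args.workflow, executor.loadingContext)
#     (output, status) = executor.arv_executor(
#         tool, job_order, executor.runtimeContext, logger=logger)
#     # status is e.g. "success"; output is the CWL output object with
#     # "location" fields rewritten to keep: references.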