18238: Add required container_engine parameter to builder
[arvados.git] sdk/cwl/arvados_cwl/executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

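# Default priority for submitted container requests; arv_executor rejects
# priorities outside the range 1..1000.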
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow: submit work using the containers API,
    wait for it to complete, and report the output.

    """
97
98     def __init__(self, api_client,
99                  arvargs=None,
100                  keep_client=None,
101                  num_retries=4,
102                  thread_count=4):
103
104         if arvargs is None:
105             arvargs = argparse.Namespace()
106             arvargs.work_api = None
107             arvargs.output_name = None
108             arvargs.output_tags = None
109             arvargs.thread_count = 1
110             arvargs.collection_cache_size = None
111
112         self.api = api_client
113         self.processes = {}
114         self.workflow_eval_lock = threading.Condition(threading.RLock())
115         self.final_output = None
116         self.final_status = None
117         self.num_retries = num_retries
118         self.uuid = None
119         self.stop_polling = threading.Event()
120         self.poll_api = None
121         self.pipeline = None
122         self.final_output_collection = None
123         self.output_name = arvargs.output_name
124         self.output_tags = arvargs.output_tags
125         self.project_uuid = None
126         self.intermediate_output_ttl = 0
127         self.intermediate_output_collections = []
128         self.trash_intermediate = False
129         self.thread_count = arvargs.thread_count
130         self.poll_interval = 12
131         self.loadingContext = None
132         self.should_estimate_cache_size = True
133         self.fs_access = None
134         self.secret_store = None
135
136         if keep_client is not None:
137             self.keep_client = keep_client
138         else:
139             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
140
141         if arvargs.collection_cache_size:
142             collection_cache_size = arvargs.collection_cache_size*1024*1024
143             self.should_estimate_cache_size = False
144         else:
145             collection_cache_size = 256*1024*1024
146
147         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
148                                                 cap=collection_cache_size)
149
150         self.fetcher_constructor = partial(CollectionFetcher,
151                                            api_client=self.api,
152                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
153                                            num_retries=self.num_retries)
154
155         self.work_api = None
156         expected_api = ["containers"]
157         for api in expected_api:
158             try:
159                 methods = self.api._rootDesc.get('resources')[api]['methods']
160                 if ('httpMethod' in methods['create'] and
161                     (arvargs.work_api == api or arvargs.work_api is None)):
162                     self.work_api = api
163                     break
164             except KeyError:
165                 pass
166
167         if not self.work_api:
168             if arvargs.work_api is None:
169                 raise Exception("No supported APIs")
170             else:
171                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
172
173         if self.work_api == "jobs":
174             logger.error("""
175 *******************************
176 The 'jobs' API is no longer supported.
177 *******************************""")
178             exit(1)
179
180         self.loadingContext = ArvLoadingContext(vars(arvargs))
181         self.loadingContext.fetcher_constructor = self.fetcher_constructor
182         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
183         self.loadingContext.construct_tool_object = self.arv_make_tool
184
185         # Add a custom logging handler to the root logger for runtime status reporting
186         # if running inside a container
187         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
188             root_logger = logging.getLogger('')
189
            # Remove any existing RuntimeStatusLoggingHandler.
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
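        """Construct the Arvados-specific tool object (command line tool,
        workflow, or expression tool) for a parsed CWL document."""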
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
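        """Schedule runnable.run() to be called from the task queue."""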
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
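        """Handle a process that reached a final state: queue its done()
        callback and stop tracking it in self.processes."""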
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
                else:
                    # Get anything before an optional 'and N more' string.
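                    # For example, an existing error of "step1 failed (and 2 more)"
                    # yields error_msg="step1 failed" and failure_qty=2, which is
                    # rewritten below as "step1 failed (and 3 more)".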
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the last warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

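                # Query the API server in pages of at most pageSize UUIDs so
                # each list() response fits within a single result page.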
                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

    def check_features(self, obj, parentfield=""):
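        """Recursively check a tool document for requirements that Arvados
        does not support, such as a relative dockerOutputDirectory or
        InplaceUpdateRequirement on Keep collections."""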
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
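        """Copy the files and directories referenced by outputObj into a new
        collection, save it under the given name, storage classes and tags,
        and return the rewritten output object together with the collection."""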
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                    continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
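        """Record the workflow result on the current container record: set its
        output to the final output collection and mark that collection as
        trashed."""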
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
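        """Apply any `cwl:requirements` from the input object to the tool,
        rejecting them for documents originally written in CWL v1.0."""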
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
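        """Execute the given tool or workflow: upload its dependencies,
        register a workflow record or submit a runner container when
        requested, otherwise run the job iterator to completion and return
        (final_output, final_status)."""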
        self.debug = runtimeContext.debug

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        updated_tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        default_storage_classes = ",".join([k for k,v in self.api.config()["StorageClasses"].items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     updated_tool, job_order)

        # the last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.do_update = False
        if submitting:
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        tool = load_tool(tool.tool, loadingContext)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map,
                                        submit_runner_image=runtimeContext.submit_runner_image),
                        "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
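            # Each "keep:" reference carries a portable data hash of the form
            # <md5>+<manifest size in bytes>; the sizes are summed once per
            # collection, scaled by 192 and converted to MiB with a 256 MiB
            # floor. For example, 2 MiB of manifest text yields
            # max(2*192 + 1, 256) = 385 MiB.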
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)