Fix 2.4.2 upgrade notes formatting refs #19330
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21 import urllib
22
23 from cwltool.errors import WorkflowException
24 import cwltool.workflow
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 import arvados_cwl.util
34 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from cwltool.task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
47 from cwltool.command_line_tool import compute_checksums
48 from cwltool.load_tool import load_tool
49
50 logger = logging.getLogger('arvados.cwl-runner')
51 metrics = logging.getLogger('arvados.cwl-runner.metrics')
52
53 DEFAULT_PRIORITY = 500
54
55 class RuntimeStatusLoggingHandler(logging.Handler):
56     """
57     Intercepts logging calls and report them as runtime statuses on runner
58     containers.
59     """
60     def __init__(self, runtime_status_update_func):
61         super(RuntimeStatusLoggingHandler, self).__init__()
62         self.runtime_status_update = runtime_status_update_func
63         self.updatingRuntimeStatus = False
64
65     def emit(self, record):
66         kind = None
67         if record.levelno >= logging.ERROR:
68             kind = 'error'
69         elif record.levelno >= logging.WARNING:
70             kind = 'warning'
71         if kind is not None and self.updatingRuntimeStatus is not True:
72             self.updatingRuntimeStatus = True
73             try:
74                 log_msg = record.getMessage()
75                 if '\n' in log_msg:
76                     # If the logged message is multi-line, use its first line as status
77                     # and the rest as detail.
78                     status, detail = log_msg.split('\n', 1)
79                     self.runtime_status_update(
80                         kind,
81                         "%s: %s" % (record.name, status),
82                         detail
83                     )
84                 else:
85                     self.runtime_status_update(
86                         kind,
87                         "%s: %s" % (record.name, record.getMessage())
88                     )
89             finally:
90                 self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using containers API),
95     wait for them to complete, and report output.
96
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4,
104                  stdout=sys.stdout):
105
106         if arvargs is None:
107             arvargs = argparse.Namespace()
108             arvargs.work_api = None
109             arvargs.output_name = None
110             arvargs.output_tags = None
111             arvargs.thread_count = 1
112             arvargs.collection_cache_size = None
113
114         self.api = api_client
115         self.processes = {}
116         self.workflow_eval_lock = threading.Condition(threading.RLock())
117         self.final_output = None
118         self.final_status = None
119         self.num_retries = num_retries
120         self.uuid = None
121         self.stop_polling = threading.Event()
122         self.poll_api = None
123         self.pipeline = None
124         self.final_output_collection = None
125         self.output_name = arvargs.output_name
126         self.output_tags = arvargs.output_tags
127         self.project_uuid = None
128         self.intermediate_output_ttl = 0
129         self.intermediate_output_collections = []
130         self.trash_intermediate = False
131         self.thread_count = arvargs.thread_count
132         self.poll_interval = 12
133         self.loadingContext = None
134         self.should_estimate_cache_size = True
135         self.fs_access = None
136         self.secret_store = None
137         self.stdout = stdout
138
139         if keep_client is not None:
140             self.keep_client = keep_client
141         else:
142             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
143
144         if arvargs.collection_cache_size:
145             collection_cache_size = arvargs.collection_cache_size*1024*1024
146             self.should_estimate_cache_size = False
147         else:
148             collection_cache_size = 256*1024*1024
149
150         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
151                                                 cap=collection_cache_size)
152
153         self.fetcher_constructor = partial(CollectionFetcher,
154                                            api_client=self.api,
155                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
156                                            num_retries=self.num_retries)
157
158         self.work_api = None
159         expected_api = ["containers"]
160         for api in expected_api:
161             try:
162                 methods = self.api._rootDesc.get('resources')[api]['methods']
163                 if ('httpMethod' in methods['create'] and
164                     (arvargs.work_api == api or arvargs.work_api is None)):
165                     self.work_api = api
166                     break
167             except KeyError:
168                 pass
169
170         if not self.work_api:
171             if arvargs.work_api is None:
172                 raise Exception("No supported APIs")
173             else:
174                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
175
176         if self.work_api == "jobs":
177             logger.error("""
178 *******************************
179 The 'jobs' API is no longer supported.
180 *******************************""")
181             exit(1)
182
183         self.loadingContext = ArvLoadingContext(vars(arvargs))
184         self.loadingContext.fetcher_constructor = self.fetcher_constructor
185         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
186         self.loadingContext.construct_tool_object = self.arv_make_tool
187
188         # Add a custom logging handler to the root logger for runtime status reporting
189         # if running inside a container
190         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
191             root_logger = logging.getLogger('')
192
193             # Remove existing RuntimeStatusLoggingHandlers if they exist
194             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
195             root_logger.handlers = handlers
196
197             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
198             root_logger.addHandler(handler)
199
200         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
201         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
202                                                      collection_cache=self.collection_cache)
203
204         validate_cluster_target(self, self.toplevel_runtimeContext)
205
206
207     def arv_make_tool(self, toolpath_object, loadingContext):
208         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
209             return ArvadosCommandTool(self, toolpath_object, loadingContext)
210         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
211             return ArvadosWorkflow(self, toolpath_object, loadingContext)
212         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
213             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
214         else:
215             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
216
217     def output_callback(self, out, processStatus):
218         with self.workflow_eval_lock:
219             if processStatus == "success":
220                 logger.info("Overall process status is %s", processStatus)
221                 state = "Complete"
222             else:
223                 logger.error("Overall process status is %s", processStatus)
224                 state = "Failed"
225             if self.pipeline:
226                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
227                                                         body={"state": state}).execute(num_retries=self.num_retries)
228             self.final_status = processStatus
229             self.final_output = out
230             self.workflow_eval_lock.notifyAll()
231
232
233     def start_run(self, runnable, runtimeContext):
234         self.task_queue.add(partial(runnable.run, runtimeContext),
235                             self.workflow_eval_lock, self.stop_polling)
236
237     def process_submitted(self, container):
238         with self.workflow_eval_lock:
239             self.processes[container.uuid] = container
240
241     def process_done(self, uuid, record):
242         with self.workflow_eval_lock:
243             j = self.processes[uuid]
244             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
245             self.task_queue.add(partial(j.done, record),
246                                 self.workflow_eval_lock, self.stop_polling)
247             del self.processes[uuid]
248
249     def runtime_status_update(self, kind, message, detail=None):
250         """
251         Updates the runtime_status field on the runner container.
252         Called when there's a need to report errors, warnings or just
253         activity statuses, for example in the RuntimeStatusLoggingHandler.
254         """
255         with self.workflow_eval_lock:
256             current = None
257             try:
258                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
259             except Exception as e:
260                 logger.info("Couldn't get current container: %s", e)
261             if current is None:
262                 return
263             runtime_status = current.get('runtime_status', {})
264             if kind in ('error', 'warning'):
265                 updatemessage = runtime_status.get(kind, "")
266                 if not updatemessage:
267                     updatemessage = message
268
269                 # Subsequent messages tacked on in detail
270                 updatedetail = runtime_status.get(kind+'Detail', "")
271                 maxlines = 40
272                 if updatedetail.count("\n") < maxlines:
273                     if updatedetail:
274                         updatedetail += "\n"
275                     updatedetail += message + "\n"
276
277                     if detail:
278                         updatedetail += detail + "\n"
279
280                     if updatedetail.count("\n") >= maxlines:
281                         updatedetail += "\nSome messages may have been omitted.  Check the full log."
282
283                 runtime_status.update({
284                     kind: updatemessage,
285                     kind+'Detail': updatedetail,
286                 })
287             else:
288                 # Ignore any other status kind
289                 return
290             try:
291                 self.api.containers().update(uuid=current['uuid'],
292                                             body={
293                                                 'runtime_status': runtime_status,
294                                             }).execute(num_retries=self.num_retries)
295             except Exception as e:
296                 logger.info("Couldn't update runtime_status: %s", e)
297
298     def wrapped_callback(self, cb, obj, st):
299         with self.workflow_eval_lock:
300             cb(obj, st)
301             self.workflow_eval_lock.notifyAll()
302
303     def get_wrapped_callback(self, cb):
304         return partial(self.wrapped_callback, cb)
305
306     def on_message(self, event):
307         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
308             uuid = event["object_uuid"]
309             if event["properties"]["new_attributes"]["state"] == "Running":
310                 with self.workflow_eval_lock:
311                     j = self.processes[uuid]
312                     if j.running is False:
313                         j.running = True
314                         j.update_pipeline_component(event["properties"]["new_attributes"])
315                         logger.info("%s %s is Running", self.label(j), uuid)
316             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
317                 self.process_done(uuid, event["properties"]["new_attributes"])
318
319     def label(self, obj):
320         return "[%s %s]" % (self.work_api[0:-1], obj.name)
321
322     def poll_states(self):
323         """Poll status of containers listed in the processes dict.
324
325         Runs in a separate thread.
326         """
327
328         try:
329             remain_wait = self.poll_interval
330             while True:
331                 if remain_wait > 0:
332                     self.stop_polling.wait(remain_wait)
333                 if self.stop_polling.is_set():
334                     break
335                 with self.workflow_eval_lock:
336                     keys = list(self.processes)
337                 if not keys:
338                     remain_wait = self.poll_interval
339                     continue
340
341                 begin_poll = time.time()
342                 if self.work_api == "containers":
343                     table = self.poll_api.container_requests()
344
345                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
346
347                 while keys:
348                     page = keys[:pageSize]
349                     try:
350                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
351                     except Exception:
352                         logger.exception("Error checking states on API server: %s")
353                         remain_wait = self.poll_interval
354                         continue
355
356                     for p in proc_states["items"]:
357                         self.on_message({
358                             "object_uuid": p["uuid"],
359                             "event_type": "update",
360                             "properties": {
361                                 "new_attributes": p
362                             }
363                         })
364                     keys = keys[pageSize:]
365
366                 finish_poll = time.time()
367                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
368         except:
369             logger.exception("Fatal error in state polling thread.")
370             with self.workflow_eval_lock:
371                 self.processes.clear()
372                 self.workflow_eval_lock.notifyAll()
373         finally:
374             self.stop_polling.set()
375
376     def add_intermediate_output(self, uuid):
377         if uuid:
378             self.intermediate_output_collections.append(uuid)
379
380     def trash_intermediate_output(self):
381         logger.info("Cleaning up intermediate output collections")
382         for i in self.intermediate_output_collections:
383             try:
384                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
385             except Exception:
386                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
387             except (KeyboardInterrupt, SystemExit):
388                 break
389
390     def check_features(self, obj, parentfield=""):
391         if isinstance(obj, dict):
392             if obj.get("class") == "DockerRequirement":
393                 if obj.get("dockerOutputDirectory"):
394                     if not obj.get("dockerOutputDirectory").startswith('/'):
395                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
396                             "Option 'dockerOutputDirectory' must be an absolute path.")
397             if obj.get("class") == "InplaceUpdateRequirement":
398                 if obj["inplaceUpdate"] and parentfield == "requirements":
399                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
400             for k,v in viewitems(obj):
401                 self.check_features(v, parentfield=k)
402         elif isinstance(obj, list):
403             for i,v in enumerate(obj):
404                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
405                     self.check_features(v, parentfield=parentfield)
406
407     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
408         outputObj = copy.deepcopy(outputObj)
409
410         files = []
411         def capture(fileobj):
412             files.append(fileobj)
413
414         adjustDirObjs(outputObj, capture)
415         adjustFileObjs(outputObj, capture)
416
417         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
418
419         final = arvados.collection.Collection(api_client=self.api,
420                                               keep_client=self.keep_client,
421                                               num_retries=self.num_retries)
422
423         for k,v in generatemapper.items():
424             if v.type == "Directory" and v.resolved.startswith("_:"):
425                     continue
426             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
427                 with final.open(v.target, "wb") as f:
428                     f.write(v.resolved.encode("utf-8"))
429                     continue
430
431             if not v.resolved.startswith("keep:"):
432                 raise Exception("Output source is not in keep or a literal")
433             sp = v.resolved.split("/")
434             srccollection = sp[0][5:]
435             try:
436                 reader = self.collection_cache.get(srccollection)
437                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
438                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
439             except arvados.errors.ArgumentError as e:
440                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
441                 raise
442             except IOError as e:
443                 logger.error("While preparing output collection: %s", e)
444                 raise
445
446         def rewrite(fileobj):
447             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
448             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
449                 if k in fileobj:
450                     del fileobj[k]
451
452         adjustDirObjs(outputObj, rewrite)
453         adjustFileObjs(outputObj, rewrite)
454
455         with final.open("cwl.output.json", "w") as f:
456             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
457             f.write(res)
458
459
460         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
461                        ensure_unique_name=True, properties=output_properties)
462
463         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
464                     final.api_response()["name"],
465                     final.manifest_locator())
466
467         final_uuid = final.manifest_locator()
468         tags = tagsString.split(',')
469         for tag in tags:
470              self.api.links().create(body={
471                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
472                 }).execute(num_retries=self.num_retries)
473
474         def finalcollection(fileobj):
475             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
476
477         adjustDirObjs(outputObj, finalcollection)
478         adjustFileObjs(outputObj, finalcollection)
479
480         return (outputObj, final)
481
482     def set_crunch_output(self):
483         if self.work_api == "containers":
484             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
485             if current is None:
486                 return
487             try:
488                 self.api.containers().update(uuid=current['uuid'],
489                                              body={
490                                                  'output': self.final_output_collection.portable_data_hash(),
491                                                  'output_properties': self.final_output_collection.get_properties(),
492                                              }).execute(num_retries=self.num_retries)
493                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
494                                               body={
495                                                   'is_trashed': True
496                                               }).execute(num_retries=self.num_retries)
497             except Exception:
498                 logger.exception("Setting container output")
499                 raise
500
501     def apply_reqs(self, job_order_object, tool):
502         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
503             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
504                 raise WorkflowException(
505                     "`cwl:requirements` in the input object is not part of CWL "
506                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
507                     "can set the cwlVersion to v1.1 or greater and re-run with "
508                     "--enable-dev.")
509             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
510             for req in job_reqs:
511                 tool.requirements.append(req)
512
513     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
514         self.debug = runtimeContext.debug
515
516         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
517         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
518         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
519         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
520
521         updated_tool.visit(self.check_features)
522
523         self.pipeline = None
524         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
525         self.secret_store = runtimeContext.secret_store
526
527         self.trash_intermediate = runtimeContext.trash_intermediate
528         if self.trash_intermediate and self.work_api != "containers":
529             raise Exception("--trash-intermediate is only supported with --api=containers.")
530
531         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
532         if self.intermediate_output_ttl and self.work_api != "containers":
533             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
534         if self.intermediate_output_ttl < 0:
535             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
536
537         if runtimeContext.submit_request_uuid and self.work_api != "containers":
538             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
539
540         runtimeContext = runtimeContext.copy()
541
542         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
543         if runtimeContext.storage_classes == "default":
544             runtimeContext.storage_classes = default_storage_classes
545         if runtimeContext.intermediate_storage_classes == "default":
546             runtimeContext.intermediate_storage_classes = default_storage_classes
547
548         if not runtimeContext.name:
549             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
550
551         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
552             # When creating or updating workflow record, by default
553             # always copy dependencies and ensure Docker images are up
554             # to date.
555             runtimeContext.copy_deps = True
556             runtimeContext.match_local_docker = True
557
558         if runtimeContext.update_workflow and self.project_uuid is None:
559             # If we are updating a workflow, make sure anything that
560             # gets uploaded goes into the same parent project, unless
561             # an alternate --project-uuid was provided.
562             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
563             runtimeContext.project_uuid = existing_wf["owner_uuid"]
564
565         self.project_uuid = runtimeContext.project_uuid
566
567         # Upload local file references in the job order.
568         with Perf(metrics, "upload_job_order"):
569             job_order = upload_job_order(self, "%s input" % runtimeContext.name,
570                                          updated_tool, job_order, runtimeContext)
571
572         # the last clause means: if it is a command line tool, and we
573         # are going to wait for the result, and always_submit_runner
574         # is false, then we don't submit a runner process.
575
576         submitting = (runtimeContext.update_workflow or
577                       runtimeContext.create_workflow or
578                       (runtimeContext.submit and not
579                        (updated_tool.tool["class"] == "CommandLineTool" and
580                         runtimeContext.wait and
581                         not runtimeContext.always_submit_runner)))
582
583         loadingContext = self.loadingContext.copy()
584         loadingContext.do_validate = False
585         loadingContext.disable_js_validation = True
586         if submitting:
587             loadingContext.do_update = False
588             # Document may have been auto-updated. Reload the original
589             # document with updating disabled because we want to
590             # submit the document with its original CWL version, not
591             # the auto-updated one.
592             with Perf(metrics, "load_tool original"):
593                 tool = load_tool(updated_tool.tool["id"], loadingContext)
594         else:
595             tool = updated_tool
596
597         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
598         # Also uploads docker images.
599         logger.info("Uploading workflow dependencies")
600         with Perf(metrics, "upload_workflow_deps"):
601             merged_map = upload_workflow_deps(self, tool, runtimeContext)
602
603         # Recreate process object (ArvadosWorkflow or
604         # ArvadosCommandTool) because tool document may have been
605         # updated by upload_workflow_deps in ways that modify
606         # inheritance of hints or requirements.
607         loadingContext.loader = tool.doc_loader
608         loadingContext.avsc_names = tool.doc_schema
609         loadingContext.metadata = tool.metadata
610         with Perf(metrics, "load_tool"):
611             tool = load_tool(tool.tool, loadingContext)
612
613         if runtimeContext.update_workflow or runtimeContext.create_workflow:
614             # Create a pipeline template or workflow record and exit.
615             if self.work_api == "containers":
616                 uuid = upload_workflow(self, tool, job_order,
617                                        runtimeContext.project_uuid,
618                                        runtimeContext,
619                                        uuid=runtimeContext.update_workflow,
620                                        submit_runner_ram=runtimeContext.submit_runner_ram,
621                                        name=runtimeContext.name,
622                                        merged_map=merged_map,
623                                        submit_runner_image=runtimeContext.submit_runner_image)
624                 self.stdout.write(uuid + "\n")
625                 return (None, "success")
626
627         self.apply_reqs(job_order, tool)
628
629         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
630         self.eval_timeout = runtimeContext.eval_timeout
631
632         runtimeContext.use_container = True
633         runtimeContext.tmpdir_prefix = "tmp"
634         runtimeContext.work_api = self.work_api
635
636         if not self.output_name:
637              self.output_name = "Output from workflow %s" % runtimeContext.name
638
639         self.output_name  = cleanup_name_for_collection(self.output_name)
640
641         if self.work_api == "containers":
642             if self.ignore_docker_for_reuse:
643                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
644             runtimeContext.outdir = "/var/spool/cwl"
645             runtimeContext.docker_outdir = "/var/spool/cwl"
646             runtimeContext.tmpdir = "/tmp"
647             runtimeContext.docker_tmpdir = "/tmp"
648
649         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
650             raise Exception("--priority must be in the range 1..1000.")
651
652         if self.should_estimate_cache_size:
653             visited = set()
654             estimated_size = [0]
655             def estimate_collection_cache(obj):
656                 if obj.get("location", "").startswith("keep:"):
657                     m = pdh_size.match(obj["location"][5:])
658                     if m and m.group(1) not in visited:
659                         visited.add(m.group(1))
660                         estimated_size[0] += int(m.group(2))
661             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
662             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
663             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
664
665         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
666
667         runnerjob = None
668         if runtimeContext.submit:
669             # Submit a runner job to run the workflow for us.
670             if self.work_api == "containers":
671                 if submitting:
672                     tool = RunnerContainer(self, updated_tool,
673                                            tool, loadingContext, runtimeContext.enable_reuse,
674                                            self.output_name,
675                                            self.output_tags,
676                                            submit_runner_ram=runtimeContext.submit_runner_ram,
677                                            name=runtimeContext.name,
678                                            on_error=runtimeContext.on_error,
679                                            submit_runner_image=runtimeContext.submit_runner_image,
680                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
681                                            merged_map=merged_map,
682                                            priority=runtimeContext.priority,
683                                            secret_store=self.secret_store,
684                                            collection_cache_size=runtimeContext.collection_cache_size,
685                                            collection_cache_is_default=self.should_estimate_cache_size)
686                 else:
687                     runtimeContext.runnerjob = tool.tool["id"]
688
689         if runtimeContext.cwl_runner_job is not None:
690             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
691
692         jobiter = tool.job(job_order,
693                            self.output_callback,
694                            runtimeContext)
695
696         if runtimeContext.submit and not runtimeContext.wait:
697             runnerjob = next(jobiter)
698             runnerjob.run(runtimeContext)
699             self.stdout.write(runnerjob.uuid+"\n")
700             return (None, "success")
701
702         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
703         if current_container:
704             logger.info("Running inside container %s", current_container.get("uuid"))
705
706         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
707         self.polling_thread = threading.Thread(target=self.poll_states)
708         self.polling_thread.start()
709
710         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
711
712         try:
713             self.workflow_eval_lock.acquire()
714
715             # Holds the lock while this code runs and releases it when
716             # it is safe to do so in self.workflow_eval_lock.wait(),
717             # at which point on_message can update job state and
718             # process output callbacks.
719
720             loopperf = Perf(metrics, "jobiter")
721             loopperf.__enter__()
722             for runnable in jobiter:
723                 loopperf.__exit__()
724
725                 if self.stop_polling.is_set():
726                     break
727
728                 if self.task_queue.error is not None:
729                     raise self.task_queue.error
730
731                 if runnable:
732                     with Perf(metrics, "run"):
733                         self.start_run(runnable, runtimeContext)
734                 else:
735                     if (self.task_queue.in_flight + len(self.processes)) > 0:
736                         self.workflow_eval_lock.wait(3)
737                     else:
738                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
739                         break
740
741                 if self.stop_polling.is_set():
742                     break
743
744                 loopperf.__enter__()
745             loopperf.__exit__()
746
747             while (self.task_queue.in_flight + len(self.processes)) > 0:
748                 if self.task_queue.error is not None:
749                     raise self.task_queue.error
750                 self.workflow_eval_lock.wait(3)
751
752         except UnsupportedRequirement:
753             raise
754         except:
755             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
756                 logger.error("Interrupted, workflow will be cancelled")
757             elif isinstance(sys.exc_info()[1], WorkflowException):
758                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
759             else:
760                 logger.exception("Workflow execution failed")
761
762             if self.pipeline:
763                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
764                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
765
766             if self.work_api == "containers" and not current_container:
767                 # Not running in a crunch container, so cancel any outstanding processes.
768                 for p in self.processes:
769                     try:
770                         self.api.container_requests().update(uuid=p,
771                                                              body={"priority": "0"}
772                         ).execute(num_retries=self.num_retries)
773                     except Exception:
774                         pass
775         finally:
776             self.workflow_eval_lock.release()
777             self.task_queue.drain()
778             self.stop_polling.set()
779             self.polling_thread.join()
780             self.task_queue.join()
781
782         if self.final_status == "UnsupportedRequirement":
783             raise UnsupportedRequirement("Check log for details.")
784
785         if self.final_output is None:
786             raise WorkflowException("Workflow did not return a result.")
787
788         if runtimeContext.submit and isinstance(tool, Runner):
789             logger.info("Final output collection %s", tool.final_output)
790             if workbench2 or workbench1:
791                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
792         else:
793             if self.output_tags is None:
794                 self.output_tags = ""
795
796             storage_classes = ""
797             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
798             if storage_class_req and storage_class_req.get("finalStorageClass"):
799                 storage_classes = aslist(storage_class_req["finalStorageClass"])
800             else:
801                 storage_classes = runtimeContext.storage_classes.strip().split(",")
802
803             output_properties = {}
804             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
805             if output_properties_req:
806                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
807                 for pr in output_properties_req["outputProperties"]:
808                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
809
810             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
811                                                                                           self.output_tags, output_properties,
812                                                                                           self.final_output)
813             self.set_crunch_output()
814
815         if runtimeContext.compute_checksum:
816             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
817             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
818
819         if self.trash_intermediate and self.final_status == "success":
820             self.trash_intermediate_output()
821
822         return (self.final_output, self.final_status)