Merge branch '19269-all-users-writable'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21 import urllib
22
23 from cwltool.errors import WorkflowException
24 import cwltool.workflow
25 from schema_salad.sourceline import SourceLine
26 import schema_salad.validate as validate
27
28 import arvados
29 import arvados.config
30 from arvados.keep import KeepClient
31 from arvados.errors import ApiError
32
33 import arvados_cwl.util
34 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from cwltool.task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
47 from cwltool.command_line_tool import compute_checksums
48 from cwltool.load_tool import load_tool
49
# Main logger used by this module for runner messages.
logger = logging.getLogger('arvados.cwl-runner')
# Separate child logger; in this file it is passed to the Perf(...) context
# managers that wrap upload/load steps.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Not referenced in the visible part of this file -- presumably the default
# for the --priority option (valid range 1..1000); TODO confirm against callers.
DEFAULT_PRIORITY = 500
54
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards WARNING-and-above records to a callback,
    so they can be reported as runtime statuses on runner containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback invoked as func(kind, message[, detail]).
        self.runtime_status_update = runtime_status_update_func
        # True while a status update is in flight; records emitted during
        # that window are dropped instead of triggering a nested update.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        """Report the record through the callback if it is a warning or error."""
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            # Below WARNING: nothing to report.
            return

        if self.updatingRuntimeStatus is True:
            return

        self.updatingRuntimeStatus = True
        try:
            text = record.getMessage()
            # For a multi-line message the first line becomes the status
            # and everything after the first newline becomes the detail.
            headline, newline, remainder = text.partition('\n')
            call_args = [kind, "%s: %s" % (record.name, headline)]
            if newline:
                call_args.append(remainder)
            self.runtime_status_update(*call_args)
        finally:
            self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using containers API),
95     wait for them to complete, and report output.
96
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4,
104                  stdout=sys.stdout):
105
106         if arvargs is None:
107             arvargs = argparse.Namespace()
108             arvargs.work_api = None
109             arvargs.output_name = None
110             arvargs.output_tags = None
111             arvargs.thread_count = 1
112             arvargs.collection_cache_size = None
113
114         self.api = api_client
115         self.processes = {}
116         self.workflow_eval_lock = threading.Condition(threading.RLock())
117         self.final_output = None
118         self.final_status = None
119         self.num_retries = num_retries
120         self.uuid = None
121         self.stop_polling = threading.Event()
122         self.poll_api = None
123         self.pipeline = None
124         self.final_output_collection = None
125         self.output_name = arvargs.output_name
126         self.output_tags = arvargs.output_tags
127         self.project_uuid = None
128         self.intermediate_output_ttl = 0
129         self.intermediate_output_collections = []
130         self.trash_intermediate = False
131         self.thread_count = arvargs.thread_count
132         self.poll_interval = 12
133         self.loadingContext = None
134         self.should_estimate_cache_size = True
135         self.fs_access = None
136         self.secret_store = None
137         self.stdout = stdout
138
139         if keep_client is not None:
140             self.keep_client = keep_client
141         else:
142             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
143
144         if arvargs.collection_cache_size:
145             collection_cache_size = arvargs.collection_cache_size*1024*1024
146             self.should_estimate_cache_size = False
147         else:
148             collection_cache_size = 256*1024*1024
149
150         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
151                                                 cap=collection_cache_size)
152
153         self.fetcher_constructor = partial(CollectionFetcher,
154                                            api_client=self.api,
155                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
156                                            num_retries=self.num_retries)
157
158         self.work_api = None
159         expected_api = ["containers"]
160         for api in expected_api:
161             try:
162                 methods = self.api._rootDesc.get('resources')[api]['methods']
163                 if ('httpMethod' in methods['create'] and
164                     (arvargs.work_api == api or arvargs.work_api is None)):
165                     self.work_api = api
166                     break
167             except KeyError:
168                 pass
169
170         if not self.work_api:
171             if arvargs.work_api is None:
172                 raise Exception("No supported APIs")
173             else:
174                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
175
176         if self.work_api == "jobs":
177             logger.error("""
178 *******************************
179 The 'jobs' API is no longer supported.
180 *******************************""")
181             exit(1)
182
183         self.loadingContext = ArvLoadingContext(vars(arvargs))
184         self.loadingContext.fetcher_constructor = self.fetcher_constructor
185         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
186         self.loadingContext.construct_tool_object = self.arv_make_tool
187
188         # Add a custom logging handler to the root logger for runtime status reporting
189         # if running inside a container
190         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
191             root_logger = logging.getLogger('')
192
193             # Remove existing RuntimeStatusLoggingHandlers if they exist
194             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
195             root_logger.handlers = handlers
196
197             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
198             root_logger.addHandler(handler)
199
200         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
201         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
202                                                      collection_cache=self.collection_cache)
203
204         validate_cluster_target(self, self.toplevel_runtimeContext)
205
206
207     def arv_make_tool(self, toolpath_object, loadingContext):
208         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
209             return ArvadosCommandTool(self, toolpath_object, loadingContext)
210         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
211             return ArvadosWorkflow(self, toolpath_object, loadingContext)
212         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
213             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
214         else:
215             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
216
217     def output_callback(self, out, processStatus):
218         with self.workflow_eval_lock:
219             if processStatus == "success":
220                 logger.info("Overall process status is %s", processStatus)
221                 state = "Complete"
222             else:
223                 logger.error("Overall process status is %s", processStatus)
224                 state = "Failed"
225             if self.pipeline:
226                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
227                                                         body={"state": state}).execute(num_retries=self.num_retries)
228             self.final_status = processStatus
229             self.final_output = out
230             self.workflow_eval_lock.notifyAll()
231
232
233     def start_run(self, runnable, runtimeContext):
234         self.task_queue.add(partial(runnable.run, runtimeContext),
235                             self.workflow_eval_lock, self.stop_polling)
236
237     def process_submitted(self, container):
238         with self.workflow_eval_lock:
239             self.processes[container.uuid] = container
240
241     def process_done(self, uuid, record):
242         with self.workflow_eval_lock:
243             j = self.processes[uuid]
244             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
245             self.task_queue.add(partial(j.done, record),
246                                 self.workflow_eval_lock, self.stop_polling)
247             del self.processes[uuid]
248
249     def runtime_status_update(self, kind, message, detail=None):
250         """
251         Updates the runtime_status field on the runner container.
252         Called when there's a need to report errors, warnings or just
253         activity statuses, for example in the RuntimeStatusLoggingHandler.
254         """
255
256         if kind not in ('error', 'warning'):
257             # Ignore any other status kind
258             return
259
260         with self.workflow_eval_lock:
261             current = None
262             try:
263                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
264             except Exception as e:
265                 logger.info("Couldn't get current container: %s", e)
266             if current is None:
267                 return
268             runtime_status = current.get('runtime_status', {})
269
270             original_updatemessage = updatemessage = runtime_status.get(kind, "")
271             if not updatemessage:
272                 updatemessage = message
273
274             # Subsequent messages tacked on in detail
275             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
276             maxlines = 40
277             if updatedetail.count("\n") < maxlines:
278                 if updatedetail:
279                     updatedetail += "\n"
280                 updatedetail += message + "\n"
281
282                 if detail:
283                     updatedetail += detail + "\n"
284
285                 if updatedetail.count("\n") >= maxlines:
286                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
287
288             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
289                 # don't waste time doing an update if nothing changed
290                 # (usually because we exceeded the max lines)
291                 return
292
293             runtime_status.update({
294                 kind: updatemessage,
295                 kind+'Detail': updatedetail,
296             })
297
298             try:
299                 self.api.containers().update(uuid=current['uuid'],
300                                             body={
301                                                 'runtime_status': runtime_status,
302                                             }).execute(num_retries=self.num_retries)
303             except Exception as e:
304                 logger.info("Couldn't update runtime_status: %s", e)
305
306     def wrapped_callback(self, cb, obj, st):
307         with self.workflow_eval_lock:
308             cb(obj, st)
309             self.workflow_eval_lock.notifyAll()
310
311     def get_wrapped_callback(self, cb):
312         return partial(self.wrapped_callback, cb)
313
314     def on_message(self, event):
315         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
316             uuid = event["object_uuid"]
317             if event["properties"]["new_attributes"]["state"] == "Running":
318                 with self.workflow_eval_lock:
319                     j = self.processes[uuid]
320                     if j.running is False:
321                         j.running = True
322                         j.update_pipeline_component(event["properties"]["new_attributes"])
323                         logger.info("%s %s is Running", self.label(j), uuid)
324             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
325                 self.process_done(uuid, event["properties"]["new_attributes"])
326
327     def label(self, obj):
328         return "[%s %s]" % (self.work_api[0:-1], obj.name)
329
330     def poll_states(self):
331         """Poll status of containers listed in the processes dict.
332
333         Runs in a separate thread.
334         """
335
336         try:
337             remain_wait = self.poll_interval
338             while True:
339                 if remain_wait > 0:
340                     self.stop_polling.wait(remain_wait)
341                 if self.stop_polling.is_set():
342                     break
343                 with self.workflow_eval_lock:
344                     keys = list(self.processes)
345                 if not keys:
346                     remain_wait = self.poll_interval
347                     continue
348
349                 begin_poll = time.time()
350                 if self.work_api == "containers":
351                     table = self.poll_api.container_requests()
352
353                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
354
355                 while keys:
356                     page = keys[:pageSize]
357                     try:
358                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
359                     except Exception:
360                         logger.exception("Error checking states on API server: %s")
361                         remain_wait = self.poll_interval
362                         continue
363
364                     for p in proc_states["items"]:
365                         self.on_message({
366                             "object_uuid": p["uuid"],
367                             "event_type": "update",
368                             "properties": {
369                                 "new_attributes": p
370                             }
371                         })
372                     keys = keys[pageSize:]
373
374                 finish_poll = time.time()
375                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
376         except:
377             logger.exception("Fatal error in state polling thread.")
378             with self.workflow_eval_lock:
379                 self.processes.clear()
380                 self.workflow_eval_lock.notifyAll()
381         finally:
382             self.stop_polling.set()
383
384     def add_intermediate_output(self, uuid):
385         if uuid:
386             self.intermediate_output_collections.append(uuid)
387
388     def trash_intermediate_output(self):
389         logger.info("Cleaning up intermediate output collections")
390         for i in self.intermediate_output_collections:
391             try:
392                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
393             except Exception:
394                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
395             except (KeyboardInterrupt, SystemExit):
396                 break
397
398     def check_features(self, obj, parentfield=""):
399         if isinstance(obj, dict):
400             if obj.get("class") == "DockerRequirement":
401                 if obj.get("dockerOutputDirectory"):
402                     if not obj.get("dockerOutputDirectory").startswith('/'):
403                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
404                             "Option 'dockerOutputDirectory' must be an absolute path.")
405             if obj.get("class") == "InplaceUpdateRequirement":
406                 if obj["inplaceUpdate"] and parentfield == "requirements":
407                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
408             for k,v in viewitems(obj):
409                 self.check_features(v, parentfield=k)
410         elif isinstance(obj, list):
411             for i,v in enumerate(obj):
412                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
413                     self.check_features(v, parentfield=parentfield)
414
415     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
416         outputObj = copy.deepcopy(outputObj)
417
418         files = []
419         def capture(fileobj):
420             files.append(fileobj)
421
422         adjustDirObjs(outputObj, capture)
423         adjustFileObjs(outputObj, capture)
424
425         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
426
427         final = arvados.collection.Collection(api_client=self.api,
428                                               keep_client=self.keep_client,
429                                               num_retries=self.num_retries)
430
431         for k,v in generatemapper.items():
432             if v.type == "Directory" and v.resolved.startswith("_:"):
433                     continue
434             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
435                 with final.open(v.target, "wb") as f:
436                     f.write(v.resolved.encode("utf-8"))
437                     continue
438
439             if not v.resolved.startswith("keep:"):
440                 raise Exception("Output source is not in keep or a literal")
441             sp = v.resolved.split("/")
442             srccollection = sp[0][5:]
443             try:
444                 reader = self.collection_cache.get(srccollection)
445                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
446                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
447             except arvados.errors.ArgumentError as e:
448                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
449                 raise
450             except IOError as e:
451                 logger.error("While preparing output collection: %s", e)
452                 raise
453
454         def rewrite(fileobj):
455             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
456             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
457                 if k in fileobj:
458                     del fileobj[k]
459
460         adjustDirObjs(outputObj, rewrite)
461         adjustFileObjs(outputObj, rewrite)
462
463         with final.open("cwl.output.json", "w") as f:
464             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
465             f.write(res)
466
467
468         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
469                        ensure_unique_name=True, properties=output_properties)
470
471         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
472                     final.api_response()["name"],
473                     final.manifest_locator())
474
475         final_uuid = final.manifest_locator()
476         tags = tagsString.split(',')
477         for tag in tags:
478              self.api.links().create(body={
479                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
480                 }).execute(num_retries=self.num_retries)
481
482         def finalcollection(fileobj):
483             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
484
485         adjustDirObjs(outputObj, finalcollection)
486         adjustFileObjs(outputObj, finalcollection)
487
488         return (outputObj, final)
489
490     def set_crunch_output(self):
491         if self.work_api == "containers":
492             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
493             if current is None:
494                 return
495             try:
496                 self.api.containers().update(uuid=current['uuid'],
497                                              body={
498                                                  'output': self.final_output_collection.portable_data_hash(),
499                                                  'output_properties': self.final_output_collection.get_properties(),
500                                              }).execute(num_retries=self.num_retries)
501                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
502                                               body={
503                                                   'is_trashed': True
504                                               }).execute(num_retries=self.num_retries)
505             except Exception:
506                 logger.exception("Setting container output")
507                 raise
508
509     def apply_reqs(self, job_order_object, tool):
510         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
511             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
512                 raise WorkflowException(
513                     "`cwl:requirements` in the input object is not part of CWL "
514                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
515                     "can set the cwlVersion to v1.1 or greater and re-run with "
516                     "--enable-dev.")
517             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
518             for req in job_reqs:
519                 tool.requirements.append(req)
520
521     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
522         self.debug = runtimeContext.debug
523
524         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
525         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
526         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
527         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
528
529         updated_tool.visit(self.check_features)
530
531         self.pipeline = None
532         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
533         self.secret_store = runtimeContext.secret_store
534
535         self.trash_intermediate = runtimeContext.trash_intermediate
536         if self.trash_intermediate and self.work_api != "containers":
537             raise Exception("--trash-intermediate is only supported with --api=containers.")
538
539         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
540         if self.intermediate_output_ttl and self.work_api != "containers":
541             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
542         if self.intermediate_output_ttl < 0:
543             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
544
545         if runtimeContext.submit_request_uuid and self.work_api != "containers":
546             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
547
548         runtimeContext = runtimeContext.copy()
549
550         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
551         if runtimeContext.storage_classes == "default":
552             runtimeContext.storage_classes = default_storage_classes
553         if runtimeContext.intermediate_storage_classes == "default":
554             runtimeContext.intermediate_storage_classes = default_storage_classes
555
556         if not runtimeContext.name:
557             runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
558
559         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
560             # When creating or updating workflow record, by default
561             # always copy dependencies and ensure Docker images are up
562             # to date.
563             runtimeContext.copy_deps = True
564             runtimeContext.match_local_docker = True
565
566         if runtimeContext.update_workflow and self.project_uuid is None:
567             # If we are updating a workflow, make sure anything that
568             # gets uploaded goes into the same parent project, unless
569             # an alternate --project-uuid was provided.
570             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
571             runtimeContext.project_uuid = existing_wf["owner_uuid"]
572
573         self.project_uuid = runtimeContext.project_uuid
574
575         # Upload local file references in the job order.
576         with Perf(metrics, "upload_job_order"):
577             job_order = upload_job_order(self, "%s input" % runtimeContext.name,
578                                          updated_tool, job_order, runtimeContext)
579
580         # the last clause means: if it is a command line tool, and we
581         # are going to wait for the result, and always_submit_runner
582         # is false, then we don't submit a runner process.
583
584         submitting = (runtimeContext.update_workflow or
585                       runtimeContext.create_workflow or
586                       (runtimeContext.submit and not
587                        (updated_tool.tool["class"] == "CommandLineTool" and
588                         runtimeContext.wait and
589                         not runtimeContext.always_submit_runner)))
590
591         loadingContext = self.loadingContext.copy()
592         loadingContext.do_validate = False
593         loadingContext.disable_js_validation = True
594         if submitting:
595             loadingContext.do_update = False
596             # Document may have been auto-updated. Reload the original
597             # document with updating disabled because we want to
598             # submit the document with its original CWL version, not
599             # the auto-updated one.
600             with Perf(metrics, "load_tool original"):
601                 tool = load_tool(updated_tool.tool["id"], loadingContext)
602         else:
603             tool = updated_tool
604
605         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
606         # Also uploads docker images.
607         logger.info("Uploading workflow dependencies")
608         with Perf(metrics, "upload_workflow_deps"):
609             merged_map = upload_workflow_deps(self, tool, runtimeContext)
610
611         # Recreate process object (ArvadosWorkflow or
612         # ArvadosCommandTool) because tool document may have been
613         # updated by upload_workflow_deps in ways that modify
614         # inheritance of hints or requirements.
615         loadingContext.loader = tool.doc_loader
616         loadingContext.avsc_names = tool.doc_schema
617         loadingContext.metadata = tool.metadata
618         with Perf(metrics, "load_tool"):
619             tool = load_tool(tool.tool, loadingContext)
620
621         if runtimeContext.update_workflow or runtimeContext.create_workflow:
622             # Create a pipeline template or workflow record and exit.
623             if self.work_api == "containers":
624                 uuid = upload_workflow(self, tool, job_order,
625                                        runtimeContext.project_uuid,
626                                        runtimeContext,
627                                        uuid=runtimeContext.update_workflow,
628                                        submit_runner_ram=runtimeContext.submit_runner_ram,
629                                        name=runtimeContext.name,
630                                        merged_map=merged_map,
631                                        submit_runner_image=runtimeContext.submit_runner_image)
632                 self.stdout.write(uuid + "\n")
633                 return (None, "success")
634
635         self.apply_reqs(job_order, tool)
636
637         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
638         self.eval_timeout = runtimeContext.eval_timeout
639
640         runtimeContext.use_container = True
641         runtimeContext.tmpdir_prefix = "tmp"
642         runtimeContext.work_api = self.work_api
643
644         if not self.output_name:
645              self.output_name = "Output from workflow %s" % runtimeContext.name
646
647         self.output_name  = cleanup_name_for_collection(self.output_name)
648
649         if self.work_api == "containers":
650             if self.ignore_docker_for_reuse:
651                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
652             runtimeContext.outdir = "/var/spool/cwl"
653             runtimeContext.docker_outdir = "/var/spool/cwl"
654             runtimeContext.tmpdir = "/tmp"
655             runtimeContext.docker_tmpdir = "/tmp"
656
657         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
658             raise Exception("--priority must be in the range 1..1000.")
659
660         if self.should_estimate_cache_size:
661             visited = set()
662             estimated_size = [0]
663             def estimate_collection_cache(obj):
664                 if obj.get("location", "").startswith("keep:"):
665                     m = pdh_size.match(obj["location"][5:])
666                     if m and m.group(1) not in visited:
667                         visited.add(m.group(1))
668                         estimated_size[0] += int(m.group(2))
669             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
670             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
671             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
672
673         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
674
675         runnerjob = None
676         if runtimeContext.submit:
677             # Submit a runner job to run the workflow for us.
678             if self.work_api == "containers":
679                 if submitting:
680                     tool = RunnerContainer(self, updated_tool,
681                                            tool, loadingContext, runtimeContext.enable_reuse,
682                                            self.output_name,
683                                            self.output_tags,
684                                            submit_runner_ram=runtimeContext.submit_runner_ram,
685                                            name=runtimeContext.name,
686                                            on_error=runtimeContext.on_error,
687                                            submit_runner_image=runtimeContext.submit_runner_image,
688                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
689                                            merged_map=merged_map,
690                                            priority=runtimeContext.priority,
691                                            secret_store=self.secret_store,
692                                            collection_cache_size=runtimeContext.collection_cache_size,
693                                            collection_cache_is_default=self.should_estimate_cache_size)
694                 else:
695                     runtimeContext.runnerjob = tool.tool["id"]
696
697         if runtimeContext.cwl_runner_job is not None:
698             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
699
700         jobiter = tool.job(job_order,
701                            self.output_callback,
702                            runtimeContext)
703
704         if runtimeContext.submit and not runtimeContext.wait:
705             runnerjob = next(jobiter)
706             runnerjob.run(runtimeContext)
707             self.stdout.write(runnerjob.uuid+"\n")
708             return (None, "success")
709
710         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
711         if current_container:
712             logger.info("Running inside container %s", current_container.get("uuid"))
713
714         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
715         self.polling_thread = threading.Thread(target=self.poll_states)
716         self.polling_thread.start()
717
718         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
719
720         try:
721             self.workflow_eval_lock.acquire()
722
723             # Holds the lock while this code runs and releases it when
724             # it is safe to do so in self.workflow_eval_lock.wait(),
725             # at which point on_message can update job state and
726             # process output callbacks.
727
728             loopperf = Perf(metrics, "jobiter")
729             loopperf.__enter__()
730             for runnable in jobiter:
731                 loopperf.__exit__()
732
733                 if self.stop_polling.is_set():
734                     break
735
736                 if self.task_queue.error is not None:
737                     raise self.task_queue.error
738
739                 if runnable:
740                     with Perf(metrics, "run"):
741                         self.start_run(runnable, runtimeContext)
742                 else:
743                     if (self.task_queue.in_flight + len(self.processes)) > 0:
744                         self.workflow_eval_lock.wait(3)
745                     else:
746                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
747                         break
748
749                 if self.stop_polling.is_set():
750                     break
751
752                 loopperf.__enter__()
753             loopperf.__exit__()
754
755             while (self.task_queue.in_flight + len(self.processes)) > 0:
756                 if self.task_queue.error is not None:
757                     raise self.task_queue.error
758                 self.workflow_eval_lock.wait(3)
759
760         except UnsupportedRequirement:
761             raise
762         except:
763             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
764                 logger.error("Interrupted, workflow will be cancelled")
765             elif isinstance(sys.exc_info()[1], WorkflowException):
766                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
767             else:
768                 logger.exception("Workflow execution failed")
769
770             if self.pipeline:
771                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
772                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
773
774             if self.work_api == "containers" and not current_container:
775                 # Not running in a crunch container, so cancel any outstanding processes.
776                 for p in self.processes:
777                     try:
778                         self.api.container_requests().update(uuid=p,
779                                                              body={"priority": "0"}
780                         ).execute(num_retries=self.num_retries)
781                     except Exception:
782                         pass
783         finally:
784             self.workflow_eval_lock.release()
785             self.task_queue.drain()
786             self.stop_polling.set()
787             self.polling_thread.join()
788             self.task_queue.join()
789
790         if self.final_status == "UnsupportedRequirement":
791             raise UnsupportedRequirement("Check log for details.")
792
793         if self.final_output is None:
794             raise WorkflowException("Workflow did not return a result.")
795
796         if runtimeContext.submit and isinstance(tool, Runner):
797             logger.info("Final output collection %s", tool.final_output)
798             if workbench2 or workbench1:
799                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
800         else:
801             if self.output_tags is None:
802                 self.output_tags = ""
803
804             storage_classes = ""
805             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
806             if storage_class_req and storage_class_req.get("finalStorageClass"):
807                 storage_classes = aslist(storage_class_req["finalStorageClass"])
808             else:
809                 storage_classes = runtimeContext.storage_classes.strip().split(",")
810
811             output_properties = {}
812             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
813             if output_properties_req:
814                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
815                 for pr in output_properties_req["outputProperties"]:
816                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
817
818             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
819                                                                                           self.output_tags, output_properties,
820                                                                                           self.final_output)
821             self.set_crunch_output()
822
823         if runtimeContext.compute_checksum:
824             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
825             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
826
827         if self.trash_intermediate and self.final_status == "success":
828             self.trash_intermediate_output()
829
830         return (self.final_output, self.final_status)