# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib.parse

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

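# Default priority for the runner container request.  arv_executor()
# enforces the valid priority range of 1..1000; setting an outstanding
# container request's priority to 0 is how work gets cancelled on the
# error path at the bottom of arv_executor().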
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False
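        # Re-entrancy guard: runtime_status_update() may itself log (for
        # example when the API update fails), which would otherwise recurse
        # back into emit().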

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow: submit work using the containers API,
    wait for it to complete, and report output.

    """
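    # Typical construction (illustrative sketch; in practice the argument
    # namespace is assembled by the arvados-cwl-runner command line front
    # end):
    #
    #   api = arvados.api('v1')
    #   executor = ArvCwlExecutor(api, arvargs=parsed_args)
    #   (out, status) = executor.arv_executor(tool, job_order, runtime_context, logger=logger)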

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

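        # Pick the work API by checking which of the expected resource types
        # the API server's discovery document advertises.  Only the
        # containers API is supported; the legacy jobs API is rejected
        # explicitly below.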
        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.toplevel_runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

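    # on_message() consumes container request state-change events.  In this
    # executor the events are synthesized by the poll_states() thread below,
    # each shaped as {"object_uuid": ..., "event_type": "update",
    # "properties": {"new_attributes": <container request record>}}.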
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

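                # Query in pages of at most maxItemsPerResponse UUIDs so the
                # "uuid in [...]" filter never exceeds the server's page
                # size.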
                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

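        # Copy each mapped output into a fresh collection.  Sources with a
        # "_:" URI are literals: directory literals are skipped and file
        # literals are written out directly; everything else must already
        # live in Keep.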
        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
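                # The container's "output" attribute now records the result;
                # the named collection saved by make_output_collection() would
                # be a redundant copy, so mark it trashed.  (Rationale
                # inferred from the surrounding code.)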
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

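    # apply_reqs() merges requirements supplied via the input object into
    # the tool.  An illustrative job order fragment that exercises this
    # hook might look like:
    #
    #   {
    #     "https://w3id.org/cwl/cwl#requirements": [
    #       {"class": "ResourceRequirement", "ramMin": 4096}
    #     ],
    #     ... regular workflow inputs ...
    #   }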
    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check that git is installed and the tool file is inside a git
            # work tree; any failure just means no git metadata is collected.
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
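            # git_toplevel ends with the newline emitted by git, so slicing
            # at len(git_toplevel) also skips the "/" separating the
            # repository root from the file's repo-relative path.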
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        if logger is None:
            # Fall back to the module-level logger so the unconditional
            # logger.* calls below don't fail when no logger is passed in.
            logger = logging.getLogger('arvados.cwl-runner')

        git_info = self.get_git_info(updated_tool)
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # Submit a runner process when creating or updating a workflow
        # record, or when submitting for execution.  The exception: if the
        # tool is a CommandLineTool, we are going to wait for the result,
        # and always_submit_runner is false, then the tool runs directly as
        # a single container with no separate runner process.
        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        if submitting and not self.fast_submit:
            loadingContext.do_update = False
            # The document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            with Perf(metrics, "load_tool original"):
                tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            merged_map = {}

        # Recreate the process object (ArvadosWorkflow or
        # ArvadosCommandTool) because the tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        with Perf(metrics, "load_tool"):
            tool = load_tool(tool.tool, loadingContext)

        if runtimeContext.update_workflow or runtimeContext.create_workflow:
            # Create a workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                       runtimeContext.project_uuid,
                                       runtimeContext,
                                       uuid=runtimeContext.update_workflow,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
                                       merged_map=merged_map,
                                       submit_runner_image=runtimeContext.submit_runner_image,
                                       git_info=git_info)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

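        # Estimate a collection cache size from the inputs: each "keep:"
        # location's portable data hash embeds the manifest size
        # (md5sum+size), and the in-memory representation is assumed to be
        # roughly 192 times larger than the manifest text, with a floor of
        # 256 MiB.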
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)