19699: Add --varying-url-params
[arvados.git] sdk/cwl/arvados_cwl/executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
58     """
59     Intercepts logging calls and report them as runtime statuses on runner
60     containers.
61     """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
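        # Re-entrancy guard: runtime_status_update() may itself log through
        # the root logger, which would re-enter emit(); the flag below breaks
        # that recursion.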
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report the output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)

        self.defer_downloads = arvargs.submit and arvargs.defer_downloads
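        # Query string parameters (from --varying-url-params) that are
        # expected to vary between requests for the same resource, and so
        # should be disregarded when deciding whether a previously downloaded
        # copy of a URL is still current.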
        self.varying_url_params = arvargs.varying_url_params

        validate_cluster_target(self, self.toplevel_runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if not updatemessage:
                updatemessage = message

            # Subsequent messages are appended to the detail field.
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
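        # Translate a container state-change event (delivered by the polling
        # thread) into updates on the tracked process objects.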
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

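                # Fetch states in batches: the API server caps the number of
                # items returned per request at maxItemsPerResponse.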
                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
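        # Recursively scan the tool document and reject constructs that the
        # Arvados runner does not support.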
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

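        # Map each captured output location to its destination path in the
        # final output collection.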
        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

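        # Assemble the final collection: skip synthetic ("_:") directories,
        # write file literals directly, and copy everything else from its
        # source Keep collection.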
        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                    continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
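                # The container record now references the output by portable
                # data hash, so the temporary collection record created by
                # make_output_collection can be trashed; the API server
                # publishes the container's output as its own collection.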
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check whether git is available and the workflow file is inside
            # a git work tree.
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
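            # git_toplevel ends with a newline, so slicing by its full length
            # also drops the path separator that follows the repository root.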
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
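        # Record git provenance on every container request associated with
        # this container, shortening the property keys to the "arv:" prefix.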
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

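        # Resolve the "default" storage class selection to whatever storage
        # classes the cluster configuration marks as default.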
        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # the last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        if submitting and not self.fast_submit:
            loadingContext.do_update = False
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            with Perf(metrics, "load_tool original"):
                tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            merged_map = {}

        # Recreate process object (ArvadosWorkflow or
        # ArvadosCommandTool) because tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        with Perf(metrics, "load_tool"):
            tool = load_tool(tool.tool, loadingContext)

        if runtimeContext.update_workflow or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                       runtimeContext.project_uuid,
                                       runtimeContext,
                                       uuid=runtimeContext.update_workflow,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
                                       merged_map=merged_map,
                                       submit_runner_image=runtimeContext.submit_runner_image,
                                       git_info=git_info)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

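        # Estimate a collection cache size from the manifest sizes of the
        # referenced collections (the "+size" suffix of each portable data
        # hash), scaled by an approximate in-memory expansion factor, with a
        # 256 MiB floor.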
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

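        # Submissions and completion callbacks run on a thread pool so the
        # main loop below can keep iterating over the job graph.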
        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)