84a9799f6101cdb4b4566a759a9e0ccebe2442b8
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import subprocess
21 import time
22 import urllib
23
24 from cwltool.errors import WorkflowException
25 import cwltool.workflow
26 from schema_salad.sourceline import SourceLine
27 import schema_salad.validate as validate
28 from schema_salad.ref_resolver import file_uri, uri_file_path
29
30 import arvados
31 import arvados.config
32 from arvados.keep import KeepClient
33 from arvados.errors import ApiError
34
35 import arvados_cwl.util
36 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
37 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
38 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
39 from .arvworkflow import ArvadosWorkflow, upload_workflow
40 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
41 from .perf import Perf
42 from .pathmapper import NoFollowPathMapper
43 from cwltool.task_queue import TaskQueue
44 from .context import ArvLoadingContext, ArvRuntimeContext
45 from ._version import __version__
46
47 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
48 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
49 from cwltool.command_line_tool import compute_checksums
50 from cwltool.load_tool import load_tool
51
# Logger used for the runner's own messages throughout this module.
logger = logging.getLogger('arvados.cwl-runner')
# Child logger that is passed to Perf(...) context managers in this module.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# NOTE(review): not referenced in the visible part of this file; presumably
# the default priority for submitted work -- confirm against callers.
DEFAULT_PRIORITY = 500
56
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards warning and error records to a
    runtime status update function.

    The update function is called as func(kind, summary) for single-line
    messages and func(kind, summary, detail) for multi-line messages, where
    kind is 'warning' or 'error'.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # Re-entrancy guard: set while a status update is in flight so that
        # records logged by the update function itself are dropped.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        """Report the record as a runtime status if it is a warning or worse."""
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            # Below WARNING: nothing to report.
            return

        if self.updatingRuntimeStatus is True:
            # Already inside an update; ignore nested records.
            return

        self.updatingRuntimeStatus = True
        try:
            # The first line of the message is the status, any remaining
            # lines are passed along as detail.
            headline, newline, remainder = record.getMessage().partition('\n')
            summary = "%s: %s" % (record.name, headline)
            if newline:
                self.runtime_status_update(kind, summary, remainder)
            else:
                self.runtime_status_update(kind, summary)
        finally:
            self.updatingRuntimeStatus = False
93
94
95 class ArvCwlExecutor(object):
96     """Execute a CWL tool or workflow, submit work (using containers API),
97     wait for them to complete, and report output.
98
99     """
100
101     def __init__(self, api_client,
102                  arvargs=None,
103                  keep_client=None,
104                  num_retries=4,
105                  thread_count=4,
106                  stdout=sys.stdout):
107
108         if arvargs is None:
109             arvargs = argparse.Namespace()
110             arvargs.work_api = None
111             arvargs.output_name = None
112             arvargs.output_tags = None
113             arvargs.thread_count = 1
114             arvargs.collection_cache_size = None
115             arvargs.git_info = True
116
117         self.api = api_client
118         self.processes = {}
119         self.workflow_eval_lock = threading.Condition(threading.RLock())
120         self.final_output = None
121         self.final_status = None
122         self.num_retries = num_retries
123         self.uuid = None
124         self.stop_polling = threading.Event()
125         self.poll_api = None
126         self.pipeline = None
127         self.final_output_collection = None
128         self.output_name = arvargs.output_name
129         self.output_tags = arvargs.output_tags
130         self.project_uuid = None
131         self.intermediate_output_ttl = 0
132         self.intermediate_output_collections = []
133         self.trash_intermediate = False
134         self.thread_count = arvargs.thread_count
135         self.poll_interval = 12
136         self.loadingContext = None
137         self.should_estimate_cache_size = True
138         self.fs_access = None
139         self.secret_store = None
140         self.stdout = stdout
141         self.fast_submit = False
142         self.git_info = arvargs.git_info
143
144         if keep_client is not None:
145             self.keep_client = keep_client
146         else:
147             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
148
149         if arvargs.collection_cache_size:
150             collection_cache_size = arvargs.collection_cache_size*1024*1024
151             self.should_estimate_cache_size = False
152         else:
153             collection_cache_size = 256*1024*1024
154
155         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
156                                                 cap=collection_cache_size)
157
158         self.fetcher_constructor = partial(CollectionFetcher,
159                                            api_client=self.api,
160                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
161                                            num_retries=self.num_retries)
162
163         self.work_api = None
164         expected_api = ["containers"]
165         for api in expected_api:
166             try:
167                 methods = self.api._rootDesc.get('resources')[api]['methods']
168                 if ('httpMethod' in methods['create'] and
169                     (arvargs.work_api == api or arvargs.work_api is None)):
170                     self.work_api = api
171                     break
172             except KeyError:
173                 pass
174
175         if not self.work_api:
176             if arvargs.work_api is None:
177                 raise Exception("No supported APIs")
178             else:
179                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
180
181         if self.work_api == "jobs":
182             logger.error("""
183 *******************************
184 The 'jobs' API is no longer supported.
185 *******************************""")
186             exit(1)
187
188         self.loadingContext = ArvLoadingContext(vars(arvargs))
189         self.loadingContext.fetcher_constructor = self.fetcher_constructor
190         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
191         self.loadingContext.construct_tool_object = self.arv_make_tool
192
193         # Add a custom logging handler to the root logger for runtime status reporting
194         # if running inside a container
195         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
196             root_logger = logging.getLogger('')
197
198             # Remove existing RuntimeStatusLoggingHandlers if they exist
199             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
200             root_logger.handlers = handlers
201
202             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
203             root_logger.addHandler(handler)
204
205         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
206         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
207                                                      collection_cache=self.collection_cache)
208
209         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
210
211         validate_cluster_target(self, self.toplevel_runtimeContext)
212
213
214     def arv_make_tool(self, toolpath_object, loadingContext):
215         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
216             return ArvadosCommandTool(self, toolpath_object, loadingContext)
217         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
218             return ArvadosWorkflow(self, toolpath_object, loadingContext)
219         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
220             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
221         else:
222             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
223
224     def output_callback(self, out, processStatus):
225         with self.workflow_eval_lock:
226             if processStatus == "success":
227                 logger.info("Overall process status is %s", processStatus)
228                 state = "Complete"
229             else:
230                 logger.error("Overall process status is %s", processStatus)
231                 state = "Failed"
232             if self.pipeline:
233                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
234                                                         body={"state": state}).execute(num_retries=self.num_retries)
235             self.final_status = processStatus
236             self.final_output = out
237             self.workflow_eval_lock.notifyAll()
238
239
240     def start_run(self, runnable, runtimeContext):
241         self.task_queue.add(partial(runnable.run, runtimeContext),
242                             self.workflow_eval_lock, self.stop_polling)
243
244     def process_submitted(self, container):
245         with self.workflow_eval_lock:
246             self.processes[container.uuid] = container
247
248     def process_done(self, uuid, record):
249         with self.workflow_eval_lock:
250             j = self.processes[uuid]
251             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
252             self.task_queue.add(partial(j.done, record),
253                                 self.workflow_eval_lock, self.stop_polling)
254             del self.processes[uuid]
255
256     def runtime_status_update(self, kind, message, detail=None):
257         """
258         Updates the runtime_status field on the runner container.
259         Called when there's a need to report errors, warnings or just
260         activity statuses, for example in the RuntimeStatusLoggingHandler.
261         """
262
263         if kind not in ('error', 'warning'):
264             # Ignore any other status kind
265             return
266
267         with self.workflow_eval_lock:
268             current = None
269             try:
270                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
271             except Exception as e:
272                 logger.info("Couldn't get current container: %s", e)
273             if current is None:
274                 return
275             runtime_status = current.get('runtime_status', {})
276
277             original_updatemessage = updatemessage = runtime_status.get(kind, "")
278             if not updatemessage:
279                 updatemessage = message
280
281             # Subsequent messages tacked on in detail
282             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
283             maxlines = 40
284             if updatedetail.count("\n") < maxlines:
285                 if updatedetail:
286                     updatedetail += "\n"
287                 updatedetail += message + "\n"
288
289                 if detail:
290                     updatedetail += detail + "\n"
291
292                 if updatedetail.count("\n") >= maxlines:
293                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
294
295             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
296                 # don't waste time doing an update if nothing changed
297                 # (usually because we exceeded the max lines)
298                 return
299
300             runtime_status.update({
301                 kind: updatemessage,
302                 kind+'Detail': updatedetail,
303             })
304
305             try:
306                 self.api.containers().update(uuid=current['uuid'],
307                                             body={
308                                                 'runtime_status': runtime_status,
309                                             }).execute(num_retries=self.num_retries)
310             except Exception as e:
311                 logger.info("Couldn't update runtime_status: %s", e)
312
313     def wrapped_callback(self, cb, obj, st):
314         with self.workflow_eval_lock:
315             cb(obj, st)
316             self.workflow_eval_lock.notifyAll()
317
318     def get_wrapped_callback(self, cb):
319         return partial(self.wrapped_callback, cb)
320
321     def on_message(self, event):
322         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
323             uuid = event["object_uuid"]
324             if event["properties"]["new_attributes"]["state"] == "Running":
325                 with self.workflow_eval_lock:
326                     j = self.processes[uuid]
327                     if j.running is False:
328                         j.running = True
329                         j.update_pipeline_component(event["properties"]["new_attributes"])
330                         logger.info("%s %s is Running", self.label(j), uuid)
331             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
332                 self.process_done(uuid, event["properties"]["new_attributes"])
333
334     def label(self, obj):
335         return "[%s %s]" % (self.work_api[0:-1], obj.name)
336
337     def poll_states(self):
338         """Poll status of containers listed in the processes dict.
339
340         Runs in a separate thread.
341         """
342
343         try:
344             remain_wait = self.poll_interval
345             while True:
346                 if remain_wait > 0:
347                     self.stop_polling.wait(remain_wait)
348                 if self.stop_polling.is_set():
349                     break
350                 with self.workflow_eval_lock:
351                     keys = list(self.processes)
352                 if not keys:
353                     remain_wait = self.poll_interval
354                     continue
355
356                 begin_poll = time.time()
357                 if self.work_api == "containers":
358                     table = self.poll_api.container_requests()
359
360                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
361
362                 while keys:
363                     page = keys[:pageSize]
364                     try:
365                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
366                     except Exception:
367                         logger.exception("Error checking states on API server: %s")
368                         remain_wait = self.poll_interval
369                         continue
370
371                     for p in proc_states["items"]:
372                         self.on_message({
373                             "object_uuid": p["uuid"],
374                             "event_type": "update",
375                             "properties": {
376                                 "new_attributes": p
377                             }
378                         })
379                     keys = keys[pageSize:]
380
381                 finish_poll = time.time()
382                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
383         except:
384             logger.exception("Fatal error in state polling thread.")
385             with self.workflow_eval_lock:
386                 self.processes.clear()
387                 self.workflow_eval_lock.notifyAll()
388         finally:
389             self.stop_polling.set()
390
391     def add_intermediate_output(self, uuid):
392         if uuid:
393             self.intermediate_output_collections.append(uuid)
394
395     def trash_intermediate_output(self):
396         logger.info("Cleaning up intermediate output collections")
397         for i in self.intermediate_output_collections:
398             try:
399                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
400             except Exception:
401                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
402             except (KeyboardInterrupt, SystemExit):
403                 break
404
405     def check_features(self, obj, parentfield=""):
406         if isinstance(obj, dict):
407             if obj.get("class") == "DockerRequirement":
408                 if obj.get("dockerOutputDirectory"):
409                     if not obj.get("dockerOutputDirectory").startswith('/'):
410                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
411                             "Option 'dockerOutputDirectory' must be an absolute path.")
412             if obj.get("class") == "InplaceUpdateRequirement":
413                 if obj["inplaceUpdate"] and parentfield == "requirements":
414                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
415             for k,v in viewitems(obj):
416                 self.check_features(v, parentfield=k)
417         elif isinstance(obj, list):
418             for i,v in enumerate(obj):
419                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
420                     self.check_features(v, parentfield=parentfield)
421
422     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
423         outputObj = copy.deepcopy(outputObj)
424
425         files = []
426         def capture(fileobj):
427             files.append(fileobj)
428
429         adjustDirObjs(outputObj, capture)
430         adjustFileObjs(outputObj, capture)
431
432         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
433
434         final = arvados.collection.Collection(api_client=self.api,
435                                               keep_client=self.keep_client,
436                                               num_retries=self.num_retries)
437
438         for k,v in generatemapper.items():
439             if v.type == "Directory" and v.resolved.startswith("_:"):
440                     continue
441             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
442                 with final.open(v.target, "wb") as f:
443                     f.write(v.resolved.encode("utf-8"))
444                     continue
445
446             if not v.resolved.startswith("keep:"):
447                 raise Exception("Output source is not in keep or a literal")
448             sp = v.resolved.split("/")
449             srccollection = sp[0][5:]
450             try:
451                 reader = self.collection_cache.get(srccollection)
452                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
453                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
454             except arvados.errors.ArgumentError as e:
455                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
456                 raise
457             except IOError as e:
458                 logger.error("While preparing output collection: %s", e)
459                 raise
460
461         def rewrite(fileobj):
462             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
463             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
464                 if k in fileobj:
465                     del fileobj[k]
466
467         adjustDirObjs(outputObj, rewrite)
468         adjustFileObjs(outputObj, rewrite)
469
470         with final.open("cwl.output.json", "w") as f:
471             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
472             f.write(res)
473
474
475         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
476                        ensure_unique_name=True, properties=output_properties)
477
478         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
479                     final.api_response()["name"],
480                     final.manifest_locator())
481
482         final_uuid = final.manifest_locator()
483         tags = tagsString.split(',')
484         for tag in tags:
485              self.api.links().create(body={
486                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
487                 }).execute(num_retries=self.num_retries)
488
489         def finalcollection(fileobj):
490             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
491
492         adjustDirObjs(outputObj, finalcollection)
493         adjustFileObjs(outputObj, finalcollection)
494
495         return (outputObj, final)
496
497     def set_crunch_output(self):
498         if self.work_api == "containers":
499             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
500             if current is None:
501                 return
502             try:
503                 self.api.containers().update(uuid=current['uuid'],
504                                              body={
505                                                  'output': self.final_output_collection.portable_data_hash(),
506                                                  'output_properties': self.final_output_collection.get_properties(),
507                                              }).execute(num_retries=self.num_retries)
508                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
509                                               body={
510                                                   'is_trashed': True
511                                               }).execute(num_retries=self.num_retries)
512             except Exception:
513                 logger.exception("Setting container output")
514                 raise
515
516     def apply_reqs(self, job_order_object, tool):
517         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
518             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
519                 raise WorkflowException(
520                     "`cwl:requirements` in the input object is not part of CWL "
521                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
522                     "can set the cwlVersion to v1.1 or greater and re-run with "
523                     "--enable-dev.")
524             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
525             for req in job_reqs:
526                 tool.requirements.append(req)
527
528     @staticmethod
529     def get_git_info(tool):
530         in_a_git_repo = False
531         cwd = None
532         filepath = None
533
534         if tool.tool["id"].startswith("file://"):
535             # check if git is installed
536             try:
537                 filepath = uri_file_path(tool.tool["id"])
538                 cwd = os.path.dirname(filepath)
539                 subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
540                 in_a_git_repo = True
541             except Exception as e:
542                 pass
543
544         gitproperties = {}
545
546         if in_a_git_repo:
547             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
548             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
549             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
550             git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
551             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
552             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
553             git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
554             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
555             git_path = filepath[len(git_toplevel):]
556
557             gitproperties = {
558                 "http://arvados.org/cwl#gitCommit": git_commit.strip(),
559                 "http://arvados.org/cwl#gitDate": git_date.strip(),
560                 "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
561                 "http://arvados.org/cwl#gitBranch": git_branch.strip(),
562                 "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
563                 "http://arvados.org/cwl#gitStatus": git_status.strip(),
564                 "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
565                 "http://arvados.org/cwl#gitPath": git_path.strip(),
566             }
567         else:
568             for g in ("http://arvados.org/cwl#gitCommit",
569                       "http://arvados.org/cwl#gitDate",
570                       "http://arvados.org/cwl#gitCommitter",
571                       "http://arvados.org/cwl#gitBranch",
572                       "http://arvados.org/cwl#gitOrigin",
573                       "http://arvados.org/cwl#gitStatus",
574                       "http://arvados.org/cwl#gitDescribe",
575                       "http://arvados.org/cwl#gitPath"):
576                 if g in tool.metadata:
577                     gitproperties[g] = tool.metadata[g]
578
579         return gitproperties
580
581     def set_container_request_properties(self, container, properties):
582         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
583         for cr in resp["items"]:
584             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
585             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
586
587     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
588         self.debug = runtimeContext.debug
589
590         git_info = self.get_git_info(updated_tool) if self.git_info else {}
591         if git_info:
592             logger.info("Git provenance")
593             for g in git_info:
594                 if git_info[g]:
595                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
596
597         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
598         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
599         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
600         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
601
602         if not self.fast_submit:
603             updated_tool.visit(self.check_features)
604
605         self.pipeline = None
606         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
607         self.secret_store = runtimeContext.secret_store
608
609         self.trash_intermediate = runtimeContext.trash_intermediate
610         if self.trash_intermediate and self.work_api != "containers":
611             raise Exception("--trash-intermediate is only supported with --api=containers.")
612
613         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
614         if self.intermediate_output_ttl and self.work_api != "containers":
615             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
616         if self.intermediate_output_ttl < 0:
617             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
618
619         if runtimeContext.submit_request_uuid and self.work_api != "containers":
620             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
621
622         runtimeContext = runtimeContext.copy()
623
624         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
625         if runtimeContext.storage_classes == "default":
626             runtimeContext.storage_classes = default_storage_classes
627         if runtimeContext.intermediate_storage_classes == "default":
628             runtimeContext.intermediate_storage_classes = default_storage_classes
629
630         if not runtimeContext.name:
631             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
632             if git_info.get("http://arvados.org/cwl#gitDescribe"):
633                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
634             runtimeContext.name = self.name
635
636         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
637             # When creating or updating workflow record, by default
638             # always copy dependencies and ensure Docker images are up
639             # to date.
640             runtimeContext.copy_deps = True
641             runtimeContext.match_local_docker = True
642
643         if runtimeContext.update_workflow and self.project_uuid is None:
644             # If we are updating a workflow, make sure anything that
645             # gets uploaded goes into the same parent project, unless
646             # an alternate --project-uuid was provided.
647             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
648             runtimeContext.project_uuid = existing_wf["owner_uuid"]
649
650         self.project_uuid = runtimeContext.project_uuid
651
652         # Upload local file references in the job order.
653         with Perf(metrics, "upload_job_order"):
654             job_order = upload_job_order(self, "%s input" % runtimeContext.name,
655                                          updated_tool, job_order, runtimeContext)
656
657         # the last clause means: if it is a command line tool, and we
658         # are going to wait for the result, and always_submit_runner
659         # is false, then we don't submit a runner process.
660
661         submitting = (runtimeContext.update_workflow or
662                       runtimeContext.create_workflow or
663                       (runtimeContext.submit and not
664                        (updated_tool.tool["class"] == "CommandLineTool" and
665                         runtimeContext.wait and
666                         not runtimeContext.always_submit_runner)))
667
668         loadingContext = self.loadingContext.copy()
669         loadingContext.do_validate = False
670         loadingContext.disable_js_validation = True
671         if submitting and not self.fast_submit:
672             loadingContext.do_update = False
673             # Document may have been auto-updated. Reload the original
674             # document with updating disabled because we want to
675             # submit the document with its original CWL version, not
676             # the auto-updated one.
677             with Perf(metrics, "load_tool original"):
678                 tool = load_tool(updated_tool.tool["id"], loadingContext)
679         else:
680             tool = updated_tool
681
682         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
683         # Also uploads docker images.
684         if not self.fast_submit:
685             logger.info("Uploading workflow dependencies")
686             with Perf(metrics, "upload_workflow_deps"):
687                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
688         else:
689             merged_map = {}
690
691         # Recreate process object (ArvadosWorkflow or
692         # ArvadosCommandTool) because tool document may have been
693         # updated by upload_workflow_deps in ways that modify
694         # inheritance of hints or requirements.
695         loadingContext.loader = tool.doc_loader
696         loadingContext.avsc_names = tool.doc_schema
697         loadingContext.metadata = tool.metadata
698         with Perf(metrics, "load_tool"):
699             tool = load_tool(tool.tool, loadingContext)
700
701         if runtimeContext.update_workflow or runtimeContext.create_workflow:
702             # Create a pipeline template or workflow record and exit.
703             if self.work_api == "containers":
704                 uuid = upload_workflow(self, tool, job_order,
705                                        runtimeContext.project_uuid,
706                                        runtimeContext,
707                                        uuid=runtimeContext.update_workflow,
708                                        submit_runner_ram=runtimeContext.submit_runner_ram,
709                                        name=runtimeContext.name,
710                                        merged_map=merged_map,
711                                        submit_runner_image=runtimeContext.submit_runner_image,
712                                        git_info=git_info)
713                 self.stdout.write(uuid + "\n")
714                 return (None, "success")
715
716         self.apply_reqs(job_order, tool)
717
718         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
719         self.eval_timeout = runtimeContext.eval_timeout
720
721         runtimeContext.use_container = True
722         runtimeContext.tmpdir_prefix = "tmp"
723         runtimeContext.work_api = self.work_api
724
725         if not self.output_name:
726              self.output_name = "Output from workflow %s" % runtimeContext.name
727
728         self.output_name  = cleanup_name_for_collection(self.output_name)
729
730         if self.work_api == "containers":
731             if self.ignore_docker_for_reuse:
732                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
733             runtimeContext.outdir = "/var/spool/cwl"
734             runtimeContext.docker_outdir = "/var/spool/cwl"
735             runtimeContext.tmpdir = "/tmp"
736             runtimeContext.docker_tmpdir = "/tmp"
737
738         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
739             raise Exception("--priority must be in the range 1..1000.")
740
741         if self.should_estimate_cache_size:
742             visited = set()
743             estimated_size = [0]
744             def estimate_collection_cache(obj):
745                 if obj.get("location", "").startswith("keep:"):
746                     m = pdh_size.match(obj["location"][5:])
747                     if m and m.group(1) not in visited:
748                         visited.add(m.group(1))
749                         estimated_size[0] += int(m.group(2))
750             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
751             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
752             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
753
754         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
755
756         runnerjob = None
757         if runtimeContext.submit:
758             # Submit a runner job to run the workflow for us.
759             if self.work_api == "containers":
760                 if submitting:
761                     tool = RunnerContainer(self, updated_tool,
762                                            tool, loadingContext, runtimeContext.enable_reuse,
763                                            self.output_name,
764                                            self.output_tags,
765                                            submit_runner_ram=runtimeContext.submit_runner_ram,
766                                            name=runtimeContext.name,
767                                            on_error=runtimeContext.on_error,
768                                            submit_runner_image=runtimeContext.submit_runner_image,
769                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
770                                            merged_map=merged_map,
771                                            priority=runtimeContext.priority,
772                                            secret_store=self.secret_store,
773                                            collection_cache_size=runtimeContext.collection_cache_size,
774                                            collection_cache_is_default=self.should_estimate_cache_size,
775                                            git_info=git_info)
776                 else:
777                     runtimeContext.runnerjob = tool.tool["id"]
778
779         if runtimeContext.cwl_runner_job is not None:
780             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
781
782         jobiter = tool.job(job_order,
783                            self.output_callback,
784                            runtimeContext)
785
786         if runtimeContext.submit and not runtimeContext.wait:
787             runnerjob = next(jobiter)
788             runnerjob.run(runtimeContext)
789             self.stdout.write(runnerjob.uuid+"\n")
790             return (None, "success")
791
792         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
793         if current_container:
794             logger.info("Running inside container %s", current_container.get("uuid"))
795             self.set_container_request_properties(current_container, git_info)
796
797         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
798         self.polling_thread = threading.Thread(target=self.poll_states)
799         self.polling_thread.start()
800
801         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
802
803         try:
804             self.workflow_eval_lock.acquire()
805
806             # Holds the lock while this code runs and releases it when
807             # it is safe to do so in self.workflow_eval_lock.wait(),
808             # at which point on_message can update job state and
809             # process output callbacks.
810
811             loopperf = Perf(metrics, "jobiter")
812             loopperf.__enter__()
813             for runnable in jobiter:
814                 loopperf.__exit__()
815
816                 if self.stop_polling.is_set():
817                     break
818
819                 if self.task_queue.error is not None:
820                     raise self.task_queue.error
821
822                 if runnable:
823                     with Perf(metrics, "run"):
824                         self.start_run(runnable, runtimeContext)
825                 else:
826                     if (self.task_queue.in_flight + len(self.processes)) > 0:
827                         self.workflow_eval_lock.wait(3)
828                     else:
829                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
830                         break
831
832                 if self.stop_polling.is_set():
833                     break
834
835                 loopperf.__enter__()
836             loopperf.__exit__()
837
838             while (self.task_queue.in_flight + len(self.processes)) > 0:
839                 if self.task_queue.error is not None:
840                     raise self.task_queue.error
841                 self.workflow_eval_lock.wait(3)
842
843         except UnsupportedRequirement:
844             raise
845         except:
846             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
847                 logger.error("Interrupted, workflow will be cancelled")
848             elif isinstance(sys.exc_info()[1], WorkflowException):
849                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
850             else:
851                 logger.exception("Workflow execution failed")
852
853             if self.pipeline:
854                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
855                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
856
857             if self.work_api == "containers" and not current_container:
858                 # Not running in a crunch container, so cancel any outstanding processes.
859                 for p in self.processes:
860                     try:
861                         self.api.container_requests().update(uuid=p,
862                                                              body={"priority": "0"}
863                         ).execute(num_retries=self.num_retries)
864                     except Exception:
865                         pass
866         finally:
867             self.workflow_eval_lock.release()
868             self.task_queue.drain()
869             self.stop_polling.set()
870             self.polling_thread.join()
871             self.task_queue.join()
872
873         if self.final_status == "UnsupportedRequirement":
874             raise UnsupportedRequirement("Check log for details.")
875
876         if self.final_output is None:
877             raise WorkflowException("Workflow did not return a result.")
878
879         if runtimeContext.submit and isinstance(tool, Runner):
880             logger.info("Final output collection %s", tool.final_output)
881             if workbench2 or workbench1:
882                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
883         else:
884             if self.output_tags is None:
885                 self.output_tags = ""
886
887             storage_classes = ""
888             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
889             if storage_class_req and storage_class_req.get("finalStorageClass"):
890                 storage_classes = aslist(storage_class_req["finalStorageClass"])
891             else:
892                 storage_classes = runtimeContext.storage_classes.strip().split(",")
893
894             output_properties = {}
895             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
896             if output_properties_req:
897                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
898                 for pr in output_properties_req["outputProperties"]:
899                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
900
901             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
902                                                                                           self.output_tags, output_properties,
903                                                                                           self.final_output)
904             self.set_crunch_output()
905
906         if runtimeContext.compute_checksum:
907             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
908             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
909
910         if self.trash_intermediate and self.final_status == "success":
911             self.trash_intermediate_output()
912
913         return (self.final_output, self.final_status)