# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib.parse

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False
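
    # Usage sketch (hypothetical names): once attached to the root logger,
    # any WARNING or ERROR record is forwarded to the runner container's
    # runtime_status field, e.g.
    #
    #     handler = RuntimeStatusLoggingHandler(executor.runtime_status_update)
    #     logging.getLogger('').addHandler(handler)
    #     logging.getLogger('arvados.cwl-runner').warning("Low disk\nfree: 1%")
    #
    # is reported as kind='warning' with status "arvados.cwl-runner: Low disk"
    # and detail "free: 1%".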


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """

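    # Typical driver sketch (hypothetical; arvargs normally comes from the
    # arvados-cwl-runner argument parser):
    #
    #     executor = ArvCwlExecutor(arvados.api('v1'), arvargs=parsed_args)
    #     (out, status) = executor.arv_executor(tool, job_order, runtime_context)
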
    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove any existing RuntimeStatusLoggingHandlers
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)

        validate_cluster_target(self, self.toplevel_runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)
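
    # Resulting record shape (sketch): for kind='error' the container ends up
    # with something like
    #
    #     "runtime_status": {
    #         "error": "arvados.cwl-runner: first error seen",
    #         "errorDetail": "first error seen\n...later messages...\n"
    #     }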

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception:
                        logger.exception("Error checking states on API server")
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)
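
    # For example (hypothetical fragment), check_features rejects a relative
    # dockerOutputDirectory:
    #
    #     requirements:
    #       - class: DockerRequirement
    #         dockerPull: debian:stable
    #         dockerOutputDirectory: out/dir   # raises ValidationException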

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)
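
    # Location rewrite sketch: a File captured as
    #     {"location": "keep:<src_pdh>/dir/out.txt"}
    # is copied into the new collection at its mapped target, then finally
    # rewritten to
    #     {"location": "keep:<final_pdh>/dir/out.txt"}
    # where <final_pdh> is the portable data hash of the saved collection.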

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)
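
    # Example job order fragment (hypothetical) picked up by apply_reqs:
    #
    #     {
    #         "https://w3id.org/cwl/cwl#requirements": [
    #             {"class": "ResourceRequirement", "ramMin": 4096}
    #         ]
    #     }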

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # check that git is installed and the tool file is in a git work tree
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties
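
    # Example return value (hypothetical values):
    #
    #     {"http://arvados.org/cwl#gitCommit": "4a8f59...",
    #      "http://arvados.org/cwl#gitBranch": "main",
    #      "http://arvados.org/cwl#gitDescribe": "2.6.0-123-g4a8f59",
    #      ...}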

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
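
    # Keys are shortened on the way in, e.g. "http://arvados.org/cwl#gitCommit"
    # is stored as "arv:gitCommit" on each matching container request.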

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        if logger is None:
            # The parameter shadows the module-level logger; fall back to it.
            logger = logging.getLogger('arvados.cwl-runner')

        self.debug = runtimeContext.debug

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.update_workflow or
                      runtimeContext.create_workflow or
                      (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner)))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        if submitting and not self.fast_submit:
            loadingContext.do_update = False
            # Document may have been auto-updated. Reload the original
            # document with updating disabled because we want to
            # submit the document with its original CWL version, not
            # the auto-updated one.
            with Perf(metrics, "load_tool original"):
                tool = load_tool(updated_tool.tool["id"], loadingContext)
        else:
            tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            merged_map = {}

        # Recreate the process object (ArvadosWorkflow or
        # ArvadosCommandTool) because the tool document may have been
        # updated by upload_workflow_deps in ways that modify
        # inheritance of hints or requirements.
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        with Perf(metrics, "load_tool"):
            tool = load_tool(tool.tool, loadingContext)

        if runtimeContext.update_workflow or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "containers":
                uuid = upload_workflow(self, tool, job_order,
                                       runtimeContext.project_uuid,
                                       runtimeContext,
                                       uuid=runtimeContext.update_workflow,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
                                       merged_map=merged_map,
                                       submit_runner_image=runtimeContext.submit_runner_image,
                                       git_info=git_info)
                self.stdout.write(uuid + "\n")
                return (None, "success")

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
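
        # Sizing sketch: the size component of a portable data hash is the
        # collection's manifest length in bytes, so the formula above budgets
        # (assumption) roughly 192 bytes of cache RAM per manifest byte.
        # E.g. manifests totaling 1 MiB give 192+1 = 193 MiB, which is then
        # raised to the 256 MiB floor.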

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if submitting:
                    tool = RunnerContainer(self, updated_tool,
                                           tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                                                             ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)