# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False

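# Illustrative note (not part of the original module): for a record logged as
# logging.getLogger("arvados.cwl-runner").error("step failed\nsee log"), emit()
# above reports kind='error' with message "arvados.cwl-runner: step failed" and
# detail "see log"; a single-line record is reported without a detail argument.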

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4,
104                  stdout=sys.stdout):
105
106         if arvargs is None:
107             arvargs = argparse.Namespace()
108             arvargs.work_api = None
109             arvargs.output_name = None
110             arvargs.output_tags = None
111             arvargs.thread_count = 1
112             arvargs.collection_cache_size = None
113             arvargs.git_info = True
114             arvargs.submit = False
115             arvargs.defer_downloads = False
116
117         self.api = api_client
118         self.processes = {}
119         self.workflow_eval_lock = threading.Condition(threading.RLock())
120         self.final_output = None
121         self.final_status = None
122         self.num_retries = num_retries
123         self.uuid = None
124         self.stop_polling = threading.Event()
125         self.poll_api = None
126         self.pipeline = None
127         self.final_output_collection = None
128         self.output_name = arvargs.output_name
129         self.output_tags = arvargs.output_tags
130         self.project_uuid = None
131         self.intermediate_output_ttl = 0
132         self.intermediate_output_collections = []
133         self.trash_intermediate = False
134         self.thread_count = arvargs.thread_count
135         self.poll_interval = 12
136         self.loadingContext = None
137         self.should_estimate_cache_size = True
138         self.fs_access = None
139         self.secret_store = None
140         self.stdout = stdout
141         self.fast_submit = False
142         self.git_info = arvargs.git_info
143         self.debug = False
144
145         if keep_client is not None:
146             self.keep_client = keep_client
147         else:
148             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
149
150         if arvargs.collection_cache_size:
151             collection_cache_size = arvargs.collection_cache_size*1024*1024
152             self.should_estimate_cache_size = False
153         else:
154             collection_cache_size = 256*1024*1024
155
156         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
157                                                 cap=collection_cache_size)
158
159         self.fetcher_constructor = partial(CollectionFetcher,
160                                            api_client=self.api,
161                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
162                                            num_retries=self.num_retries)
163
164         self.work_api = None
165         expected_api = ["containers"]
166         for api in expected_api:
167             try:
168                 methods = self.api._rootDesc.get('resources')[api]['methods']
169                 if ('httpMethod' in methods['create'] and
170                     (arvargs.work_api == api or arvargs.work_api is None)):
171                     self.work_api = api
172                     break
173             except KeyError:
174                 pass
175
176         if not self.work_api:
177             if arvargs.work_api is None:
178                 raise Exception("No supported APIs")
179             else:
180                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
181
        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)

        self.defer_downloads = arvargs.submit and arvargs.defer_downloads

        validate_cluster_target(self, self.toplevel_runtimeContext)


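    # Minimal construction sketch (assumes ARVADOS_API_HOST and
    # ARVADOS_API_TOKEN are set in the environment; relies on the defaults
    # filled in above when arvargs is None):
    #
    #   executor = ArvCwlExecutor(arvados.api('v1'))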
    def arv_make_tool(self, toolpath_object, loadingContext):
        cls = toolpath_object.get("class")
        if cls == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif cls == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif cls == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % cls)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning', 'activity'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if kind == "activity" or not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

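    # Sketch of the runtime_status field this method maintains (values are
    # hypothetical): {"activity": "data transfer",
    #                 "warning": "arvados.cwl-runner: first warning",
    #                 "warningDetail": "first warning\nlater warning\n"}
    # "error"/"errorDetail" follow the same pattern; "activity" is replaced on
    # every update, while "warning" and "error" keep the first message seen.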
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
                                                                                         "output_uuid", "modified_at", "properties",
                                                                                         "runtime_constraints"]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Temporary error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in obj.items():
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

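    # Examples of what check_features accepts and rejects (hypothetical
    # requirement fragments):
    #
    #   {"class": "DockerRequirement", "dockerOutputDirectory": "out"}    # rejected: relative
    #   {"class": "DockerRequirement", "dockerOutputDirectory": "/out"}   # accepted: absolute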
    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False)
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

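    # Note on the keep reference handling above: a resolved location such as
    # "keep:d41d8cd98f00b204e9800998ecf8427e+0/dir/file.txt" (hypothetical)
    # splits so that sp[0][5:] is the source collection's portable data hash
    # and the remaining components are the path copied into `final`.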
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                # Trash the runner's temporary copy of the output collection;
                # the content remains referenced by the container's output record.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check if the tool file is in a git checkout (and git is installed)
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
            # git_toplevel ends with a newline, which also skips the '/'
            # separator between the repository root and the relative path.
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

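    # Sketch of the mapping returned by get_git_info when run inside a git
    # checkout (values are hypothetical):
    #
    #   {"http://arvados.org/cwl#gitCommit": "1e99999999999999999999999999999999999999",
    #    "http://arvados.org/cwl#gitBranch": "main",
    #    "http://arvados.org/cwl#gitStatus": "",    # empty when the tree is clean
    #    ...}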
    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

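    # Example of the key rewriting above: the provenance property
    # "http://arvados.org/cwl#gitCommit" is stored on the container request
    # as "arv:gitCommit".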
    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        if logger is None:
            # Fall back to the module-level logger, which the parameter shadows.
            logger = logging.getLogger('arvados.cwl-runner')

        self.debug = runtimeContext.debug

        self.runtime_status_update("activity", "initialization")

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        runtimeContext.git_info = git_info

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.print_keep_deps:
            runtimeContext.copy_deps = False
            runtimeContext.match_local_docker = False

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        self.runtime_status_update("activity", "data transfer")

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                                    updated_tool, job_order, runtimeContext)

        # Determine whether we are submitting or directly executing the workflow.
        #
        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner is
        # false, then we don't submit a runner process.

        submitting = (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner))

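        # For example (hypothetical invocations): running a Workflow with
        # --submit creates a runner container, while a CommandLineTool run
        # with --submit and --wait (and without --always-submit-runner) is
        # executed directly, with no separate runner process.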
        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            # In the fast submit case, we are running a workflow that
            # has already been uploaded to Arvados, so we assume all
            # the dependencies have been pinned to keep references and
            # there is nothing to do.
            merged_map = {}

        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.skip_resolve_all = True

        workflow_wrapper = None
        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
            # Upload the workflow and get back the workflow wrapper.

            workflow_wrapper = upload_workflow(self, tool, job_order,
                                               runtimeContext.project_uuid,
                                               runtimeContext,
                                               uuid=runtimeContext.update_workflow,
                                               submit_runner_ram=runtimeContext.submit_runner_ram,
                                               name=runtimeContext.name,
                                               merged_map=merged_map,
                                               submit_runner_image=runtimeContext.submit_runner_image,
                                               git_info=git_info,
                                               set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
                                               jobmapper=jobmapper)

            if runtimeContext.update_workflow or runtimeContext.create_workflow:
                # We're registering the workflow, so create or update
                # the workflow record and then exit.
                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                self.stdout.write(uuid + "\n")
                return (None, "success")

            if runtimeContext.print_keep_deps:
                # Just find and print out all the collection dependencies and exit
                print_keep_deps(self, runtimeContext, merged_map, tool)
                return (None, "success")

            # We did not register a workflow, so we are going to
            # submit it instead.
            loadingContext.loader.idx.clear()
            loadingContext.loader.idx["_:main"] = workflow_wrapper
            workflow_wrapper["id"] = "_:main"

            # Reload the minimal wrapper workflow.
            self.fast_submit = True
            tool = load_tool(workflow_wrapper, loadingContext)
            loadingContext.loader.idx["_:main"] = workflow_wrapper

        if not submitting:
            # If we are going to run the workflow now (rather than
            # submit it), we need to update the workflow document
            # replacing file references with keep references.  If we
            # are just going to construct a run submission, we don't
            # need to do this.
            update_from_merged_map(tool, merged_map)

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

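        # The multiplier above is a heuristic: estimated_size totals the
        # manifest-size components of the input portable data hashes, and
        # 192 bytes per manifest byte appears to budget for the in-memory
        # overhead of parsed collections (an assumption); the floor is 256 MiB.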
790         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
791
792         runnerjob = None
793         if runtimeContext.submit:
794             # We are submitting instead of running immediately.
795             #
796             # Create a "Runner job" that when run() is invoked,
797             # creates the container request to run the workflow.
798             if self.work_api == "containers":
799                 if submitting:
800                     loadingContext.metadata = updated_tool.metadata.copy()
801                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
802                                            self.output_name,
803                                            self.output_tags,
804                                            submit_runner_ram=runtimeContext.submit_runner_ram,
805                                            name=runtimeContext.name,
806                                            on_error=runtimeContext.on_error,
807                                            submit_runner_image=runtimeContext.submit_runner_image,
808                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
809                                            merged_map=merged_map,
810                                            priority=runtimeContext.priority,
811                                            secret_store=self.secret_store,
812                                            collection_cache_size=runtimeContext.collection_cache_size,
813                                            collection_cache_is_default=self.should_estimate_cache_size,
814                                            git_info=git_info)
815                 else:
816                     runtimeContext.runnerjob = tool.tool["id"]
817
818         if runtimeContext.cwl_runner_job is not None:
819             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
820
821         jobiter = tool.job(job_order,
822                            self.output_callback,
823                            runtimeContext)
824
825         if runtimeContext.submit and not runtimeContext.wait:
826             # User provided --no-wait so submit the container request,
827             # get the container request uuid, print it out, and exit.
828             runnerjob = next(jobiter)
829             runnerjob.run(runtimeContext)
830             self.stdout.write(runnerjob.uuid+"\n")
831             return (None, "success")
832
        # We are either running the workflow directly, or submitting
        # it and waiting for the final result.

        self.runtime_status_update("activity", "workflow execution")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        if self.final_status is None:
                            logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                                                             ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.usage_report_notes:
            logger.info("Steps with low resource utilization (possible optimization opportunities):")
            for x in runtimeContext.usage_report_notes:
                logger.info("  %s", x)

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)