# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps, ArvSecretStore
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

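# Default priority for the runner container request.  Arvados container
# priorities must be in the range 1..1000 (validated in arv_executor below),
# so 500 sits in the middle of that range.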
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
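        # Reentrancy guard: runtime_status_update() may itself emit log
        # messages (for example, when the containers API update fails),
        # which would otherwise recurse back into this handler.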
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return
        if kind is not None and not self.updatingRuntimeStatus:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers
    API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True
            arvargs.submit = False
            arvargs.defer_downloads = False

        self.api = api_client
        self.processes = {}
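        # Condition variable (over a reentrant lock) that guards the
        # processes dict and the final output/status fields; callbacks and
        # the polling thread notify it to wake the main execution loop.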
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info
        self.debug = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

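        # Probe the API server's discovery document for a supported work
        # API ("containers" is the only one); honor an explicit --api
        # selection if one was given.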
        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)
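        # Secret input values are held in a secret store and substituted
        # into documents only where permitted, rather than being written
        # to the API server in clear text.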
        self.toplevel_runtimeContext.secret_store = ArvSecretStore()

        self.defer_downloads = arvargs.submit and arvargs.defer_downloads

        validate_cluster_target(self, self.toplevel_runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning', 'activity'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if kind == "activity" or not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
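        # self.work_api is plural ("containers"); trim the trailing "s"
        # so labels read like "[container foo]".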
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

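                # Fetch container request states in pages no larger than
                # the server's advertised maxItemsPerResponse.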
                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
                                                                                         "output_uuid", "modified_at", "properties",
                                                                                         "runtime_constraints"]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Temporary error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        # Break out to the outer loop and wait before retrying,
                        # rather than immediately re-requesting the same page.
                        break

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in obj.items():
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

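        # Mapper entries whose source starts with "_:" are literals:
        # anonymous directories have nothing to copy, and CreateFile
        # entries carry their contents in v.resolved, so those are
        # written into the output collection directly.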
        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False)
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
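                # The container record now references the output by
                # portable data hash, so the temporary collection record
                # created above is trashed rather than kept around.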
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check if the tool file is in a git checkout (and git is installed).
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
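            # .stdout retains the trailing newline, so slicing filepath by
            # len(git_toplevel) also skips the "/" separating the repository
            # root from the file's relative path.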
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        if logger is None:
            # Fall back to the module-level logger if the caller doesn't
            # supply one (the parameter shadows the module-level name).
            logger = logging.getLogger('arvados.cwl-runner')

        self.debug = runtimeContext.debug

        self.runtime_status_update("activity", "initialization")

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        runtimeContext.git_info = git_info

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.print_keep_deps:
            runtimeContext.copy_deps = False
            runtimeContext.match_local_docker = False

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        self.runtime_status_update("activity", "data transfer")

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # Determine if we are submitting or directly executing the workflow.
        #
        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            # In the fast submit case, we are running a workflow that
            # has already been uploaded to Arvados, so we assume all
            # the dependencies have been pinned to keep references and
            # there is nothing to do.
            merged_map = {}

        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.skip_resolve_all = True

        workflow_wrapper = None
        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
            # Upload the workflow and get back the workflow wrapper.
            workflow_wrapper = upload_workflow(self, tool, job_order,
                                               runtimeContext.project_uuid,
                                               runtimeContext,
                                               uuid=runtimeContext.update_workflow,
                                               submit_runner_ram=runtimeContext.submit_runner_ram,
                                               name=runtimeContext.name,
                                               merged_map=merged_map,
                                               submit_runner_image=runtimeContext.submit_runner_image,
                                               git_info=git_info,
                                               set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
                                               jobmapper=jobmapper)

            if runtimeContext.update_workflow or runtimeContext.create_workflow:
                # We're registering the workflow, so create or update
                # the workflow record and then exit.
                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                self.stdout.write(uuid + "\n")
                return (None, "success")

            if runtimeContext.print_keep_deps:
                # Just find and print out all the collection dependencies and exit.
                print_keep_deps(self, runtimeContext, merged_map, tool)
                return (None, "success")

            # Did not register a workflow, so we're going to submit
            # it instead.
            loadingContext.loader.idx.clear()
            loadingContext.loader.idx["_:main"] = workflow_wrapper
            workflow_wrapper["id"] = "_:main"

            # Reload the minimal wrapper workflow.
            self.fast_submit = True
            tool = load_tool(workflow_wrapper, loadingContext)
            loadingContext.loader.idx["_:main"] = workflow_wrapper

        if not submitting:
            # If we are going to run the workflow now (rather than
            # submit it), we need to update the workflow document,
            # replacing file references with keep references.  If we
            # are just going to construct a run submission, we don't
            # need to do this.
            update_from_merged_map(tool, merged_map)

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

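        # Estimate the collection cache size from the inputs.  A portable
        # data hash has the form "<md5>+<manifest size>", so summing the
        # size component approximates total manifest bytes; the 192x
        # multiplier is a rough allowance (an inference, not a documented
        # constant) for the in-memory expansion of parsed manifests.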
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # We are submitting instead of running immediately.
            #
            # Create a "runner job" that, when run() is invoked,
            # creates the container request to run the workflow.
            if self.work_api == "containers":
                if submitting:
                    loadingContext.metadata = updated_tool.metadata.copy()
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            # User provided --no-wait so submit the container request,
            # get the container request uuid, print it out, and exit.
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        # We are either running the workflow directly, or submitting it
        # and waiting for the final result.

        self.runtime_status_update("activity", "workflow execution")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        if self.final_status is None:
                            logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.usage_report_notes:
            logger.info("Steps with low resource utilization (possible optimization opportunities):")
            for x in runtimeContext.usage_report_notes:
                logger.info("  %s", x)

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def blank_secrets(job_order_object, process):
    # Sketch: redact secret inputs from the job order (upstream leaves this a stub).
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    for sc in (secrets_req or {}).get("secrets", []):
        if shortname(sc) in job_order_object:
            job_order_object[shortname(sc)] = "(secret)"