# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
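        # Guard flag: runtime_status_update() may itself emit log records,
        # which would re-enter emit() and recurse without this check.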
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind == 'warning' and (record.name == "salad" or record.name == "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers API),
    wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True
            arvargs.submit = False
            arvargs.defer_downloads = False

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
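        # Seconds between cycles of the state polling thread (see poll_states).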
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info
        self.debug = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

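        # Cache for Keep collection manifests, 256 MiB unless overridden below.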
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

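        # Inspect the API server's discovery document to pick a supported
        # work API; only the "containers" API is accepted.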
        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)

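        # Deferred downloads only take effect when submitting a runner process.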
        self.defer_downloads = arvargs.submit and arvargs.defer_downloads

        validate_cluster_target(self, self.toplevel_runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning', 'activity'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if kind == "activity" or not updatemessage:
                updatemessage = message

            # Subsequent messages are appended to the detail field.
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
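            # Cap the accumulated detail so the runtime_status field stays bounded.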
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

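    # Run output callbacks while holding the workflow lock, then wake any
    # threads waiting on it.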
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

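                # Page through the watched container requests; page size is
                # bounded by the API server's maxItemsPerResponse.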
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
                                                                                         "output_uuid", "modified_at", "properties"]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Temporary error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            except (KeyboardInterrupt, SystemExit):
                break

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

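            # Everything else must be a "keep:<pdh>/<path>" reference; strip
            # the "keep:" prefix to find the source collection.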
            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

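        # Point output locations at their mapped targets and drop derived
        # fields that no longer apply to the final collection.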
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check that git is available and the tool file is inside a git work tree.
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
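            # git_toplevel ends with a newline, so slicing by its full length
            # also skips the "/" separator.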
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        # The module-level logger is shadowed by the parameter; fall back to
        # it if the caller didn't supply one.
        if logger is None:
            logger = logging.getLogger('arvados.cwl-runner')

        self.runtime_status_update("activity", "initialization")

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        runtimeContext.git_info = git_info

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.print_keep_deps:
            runtimeContext.copy_deps = False
            runtimeContext.match_local_docker = False

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        self.runtime_status_update("activity", "data transfer")

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                                    updated_tool, job_order, runtimeContext)

        # Determine whether we are submitting or directly executing the
        # workflow.
        #
        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.submit and not
                      (updated_tool.tool["class"] == "CommandLineTool" and
                       runtimeContext.wait and
                       not runtimeContext.always_submit_runner))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            # in the fast submit case, we are running a workflow that
            # has already been uploaded to Arvados, so we assume all
            # the dependencies have been pinned to keep references and
            # there is nothing to do.
            merged_map = {}

        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.skip_resolve_all = True

        workflow_wrapper = None
        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
            # upload workflow and get back the workflow wrapper

            workflow_wrapper = upload_workflow(self, tool, job_order,
                                               runtimeContext.project_uuid,
                                               runtimeContext,
                                               uuid=runtimeContext.update_workflow,
                                               submit_runner_ram=runtimeContext.submit_runner_ram,
                                               name=runtimeContext.name,
                                               merged_map=merged_map,
                                               submit_runner_image=runtimeContext.submit_runner_image,
                                               git_info=git_info,
                                               set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
                                               jobmapper=jobmapper)

            if runtimeContext.update_workflow or runtimeContext.create_workflow:
                # We're registering the workflow, so create or update
                # the workflow record and then exit.
                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                self.stdout.write(uuid + "\n")
                return (None, "success")

            if runtimeContext.print_keep_deps:
                # Just find and print out all the collection dependencies and exit
                print_keep_deps(self, runtimeContext, merged_map, tool)
                return (None, "success")

            # Did not register a workflow, we're going to submit
            # it instead.
            loadingContext.loader.idx.clear()
            loadingContext.loader.idx["_:main"] = workflow_wrapper
            workflow_wrapper["id"] = "_:main"

            # Reload the minimal wrapper workflow.
            self.fast_submit = True
            tool = load_tool(workflow_wrapper, loadingContext)
            loadingContext.loader.idx["_:main"] = workflow_wrapper

        if not submitting:
            # If we are going to run the workflow now (rather than
            # submit it), we need to update the workflow document
            # replacing file references with keep references.  If we
            # are just going to construct a run submission, we don't
            # need to do this.
            update_from_merged_map(tool, merged_map)

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

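        # Estimate cache demand by summing the size hints embedded in portable
        # data hashes ("<md5>+<size>"); the 192x multiplier below appears to be
        # a heuristic scaling manifest size to in-memory footprint.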
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # We are submitting instead of running immediately.
            #
            # Create a "Runner job" that when run() is invoked,
            # creates the container request to run the workflow.
            if self.work_api == "containers":
                if submitting:
                    loadingContext.metadata = updated_tool.metadata.copy()
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            # User provided --no-wait so submit the container request,
            # get the container request uuid, print it out, and exit.
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        # We are either running the workflow directly, or submitting it
        # and waiting for a final result.

        self.runtime_status_update("activity", "workflow execution")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        if self.final_status is None:
                            logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                                                             ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)