# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from builtins import str
from future.utils import viewvalues, viewitems

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

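# Default priority for submitted container requests; arv_executor rejects
# values outside the range 1..1000.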
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on the
    runner container.
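
    A minimal attachment sketch, mirroring what ArvCwlExecutor.__init__
    does when running inside a container (`executor` here stands for a
    hypothetical ArvCwlExecutor instance):

        handler = RuntimeStatusLoggingHandler(executor.runtime_status_update)
        logging.getLogger('').addHandler(handler)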
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return
        if kind is not None and self.updatingRuntimeStatus is not True:
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line as status
                    # and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, record.getMessage())
                    )
            finally:
                self.updatingRuntimeStatus = False


class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using the containers
    API), wait for it to complete, and report output.

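    A minimal construction sketch (hedged; the supported entry point is
    the arvados-cwl-runner command line via arvados_cwl.main(), and this
    assumes a configured Arvados API host and token):

        import arvados
        from arvados_cwl.executor import ArvCwlExecutor
        executor = ArvCwlExecutor(arvados.api('v1'))
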
    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True
            arvargs.submit = False
            arvargs.defer_downloads = False

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info
        self.debug = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        self.defer_downloads = arvargs.submit and arvargs.defer_downloads

        validate_cluster_target(self, self.toplevel_runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                        body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example in the RuntimeStatusLoggingHandler.
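
        The container's runtime_status ends up with paired keys per kind,
        e.g. {"error": ..., "errorDetail": ...} (shape derived from the
        update below; values are the first message and the accumulated
        per-line detail, respectively).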
        """

        if kind not in ('error', 'warning', 'activity'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if kind == "activity" or not updatemessage:
                updatemessage = message

            # Subsequent messages tacked on in detail
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines)
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                            body={
                                                'runtime_status': runtime_status,
                                            }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
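
        Exits when stop_polling is set.  On an unexpected error it clears
        self.processes and notifies workflow_eval_lock so arv_executor can
        unwind instead of waiting forever.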
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

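                # Fetch state in pages so a very large workflow stays within
                # the API server's per-response item limit.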
                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
                                                                                         "output_uuid", "modified_at", "properties",
                                                                                         "runtime_constraints"]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Temporary error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in viewitems(obj):
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
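        # Three passes: gather every File/Directory object from the output,
        # copy the referenced keep data into a fresh collection, then
        # rewrite the locations to point at the saved collection.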
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)


        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check whether the file is inside a git work tree
            # (this also verifies that git is installed).
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
            git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
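            # git_toplevel still carries its trailing newline, so this slice
            # also skips the '/' between the repo root and the file path.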
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
                "http://arvados.org/cwl#gitDate": git_date.strip(),
                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
                "http://arvados.org/cwl#gitStatus": git_status.strip(),
                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
                "http://arvados.org/cwl#gitPath": git_path.strip(),
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        # Fall back to the module-level logger if the caller didn't pass one.
        logger = logger or logging.getLogger('arvados.cwl-runner')
        self.debug = runtimeContext.debug

        self.runtime_status_update("activity", "initialization")

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        runtimeContext.git_info = git_info

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating workflow record, by default
            # always copy dependencies and ensure Docker images are up
            # to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.print_keep_deps:
            runtimeContext.copy_deps = False
            runtimeContext.match_local_docker = False

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that
            # gets uploaded goes into the same parent project, unless
            # an alternate --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        self.runtime_status_update("activity", "data transfer")

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                         updated_tool, job_order, runtimeContext)

        # Determine if we are submitting or directly executing the workflow.
        #
        # The last clause means: if it is a command line tool, and we
        # are going to wait for the result, and always_submit_runner
        # is false, then we don't submit a runner process.

        submitting = (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            # In the fast submit case, we are running a workflow that
            # has already been uploaded to Arvados, so we assume all
            # the dependencies have been pinned to keep references and
            # there is nothing to do.
            merged_map = {}

        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.skip_resolve_all = True

        workflow_wrapper = None
        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
            # Upload the workflow and get back the workflow wrapper.

            workflow_wrapper = upload_workflow(self, tool, job_order,
                                               runtimeContext.project_uuid,
                                               runtimeContext,
                                               uuid=runtimeContext.update_workflow,
                                               submit_runner_ram=runtimeContext.submit_runner_ram,
                                               name=runtimeContext.name,
                                               merged_map=merged_map,
                                               submit_runner_image=runtimeContext.submit_runner_image,
                                               git_info=git_info,
                                               set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
                                               jobmapper=jobmapper)

            if runtimeContext.update_workflow or runtimeContext.create_workflow:
                # We're registering the workflow, so create or update
                # the workflow record and then exit.
                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                self.stdout.write(uuid + "\n")
                return (None, "success")

            if runtimeContext.print_keep_deps:
                # Just find and print out all the collection dependencies and exit
                print_keep_deps(self, runtimeContext, merged_map, tool)
                return (None, "success")

            # We did not register a workflow, so we're going to
            # submit it instead.
            loadingContext.loader.idx.clear()
            loadingContext.loader.idx["_:main"] = workflow_wrapper
            workflow_wrapper["id"] = "_:main"

            # Reload the minimal wrapper workflow.
            self.fast_submit = True
            tool = load_tool(workflow_wrapper, loadingContext)
            loadingContext.loader.idx["_:main"] = workflow_wrapper

        if not submitting:
            # If we are going to run the workflow now (rather than
            # submit it), we need to update the workflow document
            # replacing file references with keep references.  If we
            # are just going to construct a run submission, we don't
            # need to do this.
            update_from_merged_map(tool, merged_map)

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
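            # Cache cap heuristic: scale the total size encoded in the
            # inputs' portable data hashes by 192 (which appears to be an
            # empirical overhead factor), convert to MiB, floor at 256 MiB.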
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # We are submitting instead of running immediately.
            #
            # Create a "Runner job" that, when run() is invoked,
            # creates the container request to run the workflow.
            if self.work_api == "containers":
                if submitting:
                    loadingContext.metadata = updated_tool.metadata.copy()
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            # User provided --no-wait so submit the container request,
            # get the container request uuid, print it out, and exit.
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid+"\n")
            return (None, "success")

        # We are either running the workflow directly, or submitting
        # it and waiting for the final result.

        self.runtime_status_update("activity", "workflow execution")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Hold the lock while this code runs; it is released inside
            # self.workflow_eval_lock.wait(), at which point on_message
            # can update job state and process output callbacks.


            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        if self.final_status is None:
                            logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.usage_report_notes:
            logger.info("Steps with low resource utilization (possible optimization opportunities):")
            for x in runtimeContext.usage_report_notes:
                logger.info("  %s", x)

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)