Merge branch '20257-http-import' refs #20257
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import subprocess
21 import time
22 import urllib
23
24 from cwltool.errors import WorkflowException
25 import cwltool.workflow
26 from schema_salad.sourceline import SourceLine, cmap
27 import schema_salad.validate as validate
28 from schema_salad.ref_resolver import file_uri, uri_file_path
29
30 import arvados
31 import arvados.config
32 from arvados.keep import KeepClient
33 from arvados.errors import ApiError
34
35 import arvados_cwl.util
36 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
37 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map
38 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
39 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
40 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
41 from .perf import Perf
42 from .pathmapper import NoFollowPathMapper
43 from cwltool.task_queue import TaskQueue
44 from .context import ArvLoadingContext, ArvRuntimeContext
45 from ._version import __version__
46
47 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
48 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
49 from cwltool.command_line_tool import compute_checksums
50 from cwltool.load_tool import load_tool
51
52 logger = logging.getLogger('arvados.cwl-runner')
53 metrics = logging.getLogger('arvados.cwl-runner.metrics')
54
55 DEFAULT_PRIORITY = 500
56
57 class RuntimeStatusLoggingHandler(logging.Handler):
58     """
59     Intercepts logging calls and report them as runtime statuses on runner
60     containers.
61     """
62     def __init__(self, runtime_status_update_func):
63         super(RuntimeStatusLoggingHandler, self).__init__()
64         self.runtime_status_update = runtime_status_update_func
65         self.updatingRuntimeStatus = False
66
67     def emit(self, record):
68         kind = None
69         if record.levelno >= logging.ERROR:
70             kind = 'error'
71         elif record.levelno >= logging.WARNING:
72             kind = 'warning'
73         if kind == 'warning' and record.name == "salad":
74             # Don't send validation warnings to runtime status,
75             # they're noisy and unhelpful.
76             return
77         if kind is not None and self.updatingRuntimeStatus is not True:
78             self.updatingRuntimeStatus = True
79             try:
80                 log_msg = record.getMessage()
81                 if '\n' in log_msg:
82                     # If the logged message is multi-line, use its first line as status
83                     # and the rest as detail.
84                     status, detail = log_msg.split('\n', 1)
85                     self.runtime_status_update(
86                         kind,
87                         "%s: %s" % (record.name, status),
88                         detail
89                     )
90                 else:
91                     self.runtime_status_update(
92                         kind,
93                         "%s: %s" % (record.name, record.getMessage())
94                     )
95             finally:
96                 self.updatingRuntimeStatus = False
97
98
99 class ArvCwlExecutor(object):
100     """Execute a CWL tool or workflow, submit work (using containers API),
101     wait for them to complete, and report output.
102
103     """
104
105     def __init__(self, api_client,
106                  arvargs=None,
107                  keep_client=None,
108                  num_retries=4,
109                  thread_count=4,
110                  stdout=sys.stdout):
111
112         if arvargs is None:
113             arvargs = argparse.Namespace()
114             arvargs.work_api = None
115             arvargs.output_name = None
116             arvargs.output_tags = None
117             arvargs.thread_count = 1
118             arvargs.collection_cache_size = None
119             arvargs.git_info = True
120             arvargs.submit = False
121             arvargs.defer_downloads = False
122
123         self.api = api_client
124         self.processes = {}
125         self.workflow_eval_lock = threading.Condition(threading.RLock())
126         self.final_output = None
127         self.final_status = None
128         self.num_retries = num_retries
129         self.uuid = None
130         self.stop_polling = threading.Event()
131         self.poll_api = None
132         self.pipeline = None
133         self.final_output_collection = None
134         self.output_name = arvargs.output_name
135         self.output_tags = arvargs.output_tags
136         self.project_uuid = None
137         self.intermediate_output_ttl = 0
138         self.intermediate_output_collections = []
139         self.trash_intermediate = False
140         self.thread_count = arvargs.thread_count
141         self.poll_interval = 12
142         self.loadingContext = None
143         self.should_estimate_cache_size = True
144         self.fs_access = None
145         self.secret_store = None
146         self.stdout = stdout
147         self.fast_submit = False
148         self.git_info = arvargs.git_info
149
150         if keep_client is not None:
151             self.keep_client = keep_client
152         else:
153             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
154
155         if arvargs.collection_cache_size:
156             collection_cache_size = arvargs.collection_cache_size*1024*1024
157             self.should_estimate_cache_size = False
158         else:
159             collection_cache_size = 256*1024*1024
160
161         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
162                                                 cap=collection_cache_size)
163
164         self.fetcher_constructor = partial(CollectionFetcher,
165                                            api_client=self.api,
166                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
167                                            num_retries=self.num_retries)
168
169         self.work_api = None
170         expected_api = ["containers"]
171         for api in expected_api:
172             try:
173                 methods = self.api._rootDesc.get('resources')[api]['methods']
174                 if ('httpMethod' in methods['create'] and
175                     (arvargs.work_api == api or arvargs.work_api is None)):
176                     self.work_api = api
177                     break
178             except KeyError:
179                 pass
180
181         if not self.work_api:
182             if arvargs.work_api is None:
183                 raise Exception("No supported APIs")
184             else:
185                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
186
187         if self.work_api == "jobs":
188             logger.error("""
189 *******************************
190 The 'jobs' API is no longer supported.
191 *******************************""")
192             exit(1)
193
194         self.loadingContext = ArvLoadingContext(vars(arvargs))
195         self.loadingContext.fetcher_constructor = self.fetcher_constructor
196         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
197         self.loadingContext.construct_tool_object = self.arv_make_tool
198
199         # Add a custom logging handler to the root logger for runtime status reporting
200         # if running inside a container
201         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
202             root_logger = logging.getLogger('')
203
204             # Remove existing RuntimeStatusLoggingHandlers if they exist
205             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
206             root_logger.handlers = handlers
207
208             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
209             root_logger.addHandler(handler)
210
211         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
212         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
213                                                      collection_cache=self.collection_cache)
214
215         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
216
217         validate_cluster_target(self, self.toplevel_runtimeContext)
218
219
220     def arv_make_tool(self, toolpath_object, loadingContext):
221         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
222             return ArvadosCommandTool(self, toolpath_object, loadingContext)
223         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
224             return ArvadosWorkflow(self, toolpath_object, loadingContext)
225         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
226             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
227         else:
228             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
229
230     def output_callback(self, out, processStatus):
231         with self.workflow_eval_lock:
232             if processStatus == "success":
233                 logger.info("Overall process status is %s", processStatus)
234                 state = "Complete"
235             else:
236                 logger.error("Overall process status is %s", processStatus)
237                 state = "Failed"
238             if self.pipeline:
239                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
240                                                         body={"state": state}).execute(num_retries=self.num_retries)
241             self.final_status = processStatus
242             self.final_output = out
243             self.workflow_eval_lock.notifyAll()
244
245
246     def start_run(self, runnable, runtimeContext):
247         self.task_queue.add(partial(runnable.run, runtimeContext),
248                             self.workflow_eval_lock, self.stop_polling)
249
250     def process_submitted(self, container):
251         with self.workflow_eval_lock:
252             self.processes[container.uuid] = container
253
254     def process_done(self, uuid, record):
255         with self.workflow_eval_lock:
256             j = self.processes[uuid]
257             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
258             self.task_queue.add(partial(j.done, record),
259                                 self.workflow_eval_lock, self.stop_polling)
260             del self.processes[uuid]
261
262     def runtime_status_update(self, kind, message, detail=None):
263         """
264         Updates the runtime_status field on the runner container.
265         Called when there's a need to report errors, warnings or just
266         activity statuses, for example in the RuntimeStatusLoggingHandler.
267         """
268
269         if kind not in ('error', 'warning', 'activity'):
270             # Ignore any other status kind
271             return
272
273         with self.workflow_eval_lock:
274             current = None
275             try:
276                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
277             except Exception as e:
278                 logger.info("Couldn't get current container: %s", e)
279             if current is None:
280                 return
281             runtime_status = current.get('runtime_status', {})
282
283             original_updatemessage = updatemessage = runtime_status.get(kind, "")
284             if kind == "activity" or not updatemessage:
285                 updatemessage = message
286
287             # Subsequent messages tacked on in detail
288             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
289             maxlines = 40
290             if updatedetail.count("\n") < maxlines:
291                 if updatedetail:
292                     updatedetail += "\n"
293                 updatedetail += message + "\n"
294
295                 if detail:
296                     updatedetail += detail + "\n"
297
298                 if updatedetail.count("\n") >= maxlines:
299                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
300
301             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
302                 # don't waste time doing an update if nothing changed
303                 # (usually because we exceeded the max lines)
304                 return
305
306             runtime_status.update({
307                 kind: updatemessage,
308                 kind+'Detail': updatedetail,
309             })
310
311             try:
312                 self.api.containers().update(uuid=current['uuid'],
313                                             body={
314                                                 'runtime_status': runtime_status,
315                                             }).execute(num_retries=self.num_retries)
316             except Exception as e:
317                 logger.info("Couldn't update runtime_status: %s", e)
318
319     def wrapped_callback(self, cb, obj, st):
320         with self.workflow_eval_lock:
321             cb(obj, st)
322             self.workflow_eval_lock.notifyAll()
323
324     def get_wrapped_callback(self, cb):
325         return partial(self.wrapped_callback, cb)
326
327     def on_message(self, event):
328         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
329             uuid = event["object_uuid"]
330             if event["properties"]["new_attributes"]["state"] == "Running":
331                 with self.workflow_eval_lock:
332                     j = self.processes[uuid]
333                     if j.running is False:
334                         j.running = True
335                         j.update_pipeline_component(event["properties"]["new_attributes"])
336                         logger.info("%s %s is Running", self.label(j), uuid)
337             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
338                 self.process_done(uuid, event["properties"]["new_attributes"])
339
340     def label(self, obj):
341         return "[%s %s]" % (self.work_api[0:-1], obj.name)
342
343     def poll_states(self):
344         """Poll status of containers listed in the processes dict.
345
346         Runs in a separate thread.
347         """
348
349         try:
350             remain_wait = self.poll_interval
351             while True:
352                 if remain_wait > 0:
353                     self.stop_polling.wait(remain_wait)
354                 if self.stop_polling.is_set():
355                     break
356                 with self.workflow_eval_lock:
357                     keys = list(self.processes)
358                 if not keys:
359                     remain_wait = self.poll_interval
360                     continue
361
362                 begin_poll = time.time()
363                 if self.work_api == "containers":
364                     table = self.poll_api.container_requests()
365
366                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
367
368                 while keys:
369                     page = keys[:pageSize]
370                     try:
371                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
372                     except Exception as e:
373                         logger.exception("Error checking states on API server: %s", e)
374                         remain_wait = self.poll_interval
375                         continue
376
377                     for p in proc_states["items"]:
378                         self.on_message({
379                             "object_uuid": p["uuid"],
380                             "event_type": "update",
381                             "properties": {
382                                 "new_attributes": p
383                             }
384                         })
385                     keys = keys[pageSize:]
386
387                 finish_poll = time.time()
388                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
389         except:
390             logger.exception("Fatal error in state polling thread.")
391             with self.workflow_eval_lock:
392                 self.processes.clear()
393                 self.workflow_eval_lock.notifyAll()
394         finally:
395             self.stop_polling.set()
396
397     def add_intermediate_output(self, uuid):
398         if uuid:
399             self.intermediate_output_collections.append(uuid)
400
401     def trash_intermediate_output(self):
402         logger.info("Cleaning up intermediate output collections")
403         for i in self.intermediate_output_collections:
404             try:
405                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
406             except Exception:
407                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
408             except (KeyboardInterrupt, SystemExit):
409                 break
410
411     def check_features(self, obj, parentfield=""):
412         if isinstance(obj, dict):
413             if obj.get("class") == "DockerRequirement":
414                 if obj.get("dockerOutputDirectory"):
415                     if not obj.get("dockerOutputDirectory").startswith('/'):
416                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
417                             "Option 'dockerOutputDirectory' must be an absolute path.")
418             if obj.get("class") == "InplaceUpdateRequirement":
419                 if obj["inplaceUpdate"] and parentfield == "requirements":
420                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
421             for k,v in viewitems(obj):
422                 self.check_features(v, parentfield=k)
423         elif isinstance(obj, list):
424             for i,v in enumerate(obj):
425                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
426                     self.check_features(v, parentfield=parentfield)
427
428     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
429         outputObj = copy.deepcopy(outputObj)
430
431         files = []
432         def capture(fileobj):
433             files.append(fileobj)
434
435         adjustDirObjs(outputObj, capture)
436         adjustFileObjs(outputObj, capture)
437
438         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
439
440         final = arvados.collection.Collection(api_client=self.api,
441                                               keep_client=self.keep_client,
442                                               num_retries=self.num_retries)
443
444         for k,v in generatemapper.items():
445             if v.type == "Directory" and v.resolved.startswith("_:"):
446                     continue
447             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
448                 with final.open(v.target, "wb") as f:
449                     f.write(v.resolved.encode("utf-8"))
450                     continue
451
452             if not v.resolved.startswith("keep:"):
453                 raise Exception("Output source is not in keep or a literal")
454             sp = v.resolved.split("/")
455             srccollection = sp[0][5:]
456             try:
457                 reader = self.collection_cache.get(srccollection)
458                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
459                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
460             except arvados.errors.ArgumentError as e:
461                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
462                 raise
463             except IOError as e:
464                 logger.error("While preparing output collection: %s", e)
465                 raise
466
467         def rewrite(fileobj):
468             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
469             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
470                 if k in fileobj:
471                     del fileobj[k]
472
473         adjustDirObjs(outputObj, rewrite)
474         adjustFileObjs(outputObj, rewrite)
475
476         with final.open("cwl.output.json", "w") as f:
477             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
478             f.write(res)
479
480
481         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
482                        ensure_unique_name=True, properties=output_properties)
483
484         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
485                     final.api_response()["name"],
486                     final.manifest_locator())
487
488         final_uuid = final.manifest_locator()
489         tags = tagsString.split(',')
490         for tag in tags:
491              self.api.links().create(body={
492                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
493                 }).execute(num_retries=self.num_retries)
494
495         def finalcollection(fileobj):
496             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
497
498         adjustDirObjs(outputObj, finalcollection)
499         adjustFileObjs(outputObj, finalcollection)
500
501         return (outputObj, final)
502
503     def set_crunch_output(self):
504         if self.work_api == "containers":
505             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
506             if current is None:
507                 return
508             try:
509                 self.api.containers().update(uuid=current['uuid'],
510                                              body={
511                                                  'output': self.final_output_collection.portable_data_hash(),
512                                                  'output_properties': self.final_output_collection.get_properties(),
513                                              }).execute(num_retries=self.num_retries)
514                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
515                                               body={
516                                                   'is_trashed': True
517                                               }).execute(num_retries=self.num_retries)
518             except Exception:
519                 logger.exception("Setting container output")
520                 raise
521
522     def apply_reqs(self, job_order_object, tool):
523         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
524             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
525                 raise WorkflowException(
526                     "`cwl:requirements` in the input object is not part of CWL "
527                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
528                     "can set the cwlVersion to v1.1 or greater and re-run with "
529                     "--enable-dev.")
530             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
531             for req in job_reqs:
532                 tool.requirements.append(req)
533
534     @staticmethod
535     def get_git_info(tool):
536         in_a_git_repo = False
537         cwd = None
538         filepath = None
539
540         if tool.tool["id"].startswith("file://"):
541             # check if git is installed
542             try:
543                 filepath = uri_file_path(tool.tool["id"])
544                 cwd = os.path.dirname(filepath)
545                 subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
546                 in_a_git_repo = True
547             except Exception as e:
548                 pass
549
550         gitproperties = {}
551
552         if in_a_git_repo:
553             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
554             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
555             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
556             git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
557             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
558             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
559             git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
560             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
561             git_path = filepath[len(git_toplevel):]
562
563             gitproperties = {
564                 "http://arvados.org/cwl#gitCommit": git_commit.strip(),
565                 "http://arvados.org/cwl#gitDate": git_date.strip(),
566                 "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
567                 "http://arvados.org/cwl#gitBranch": git_branch.strip(),
568                 "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
569                 "http://arvados.org/cwl#gitStatus": git_status.strip(),
570                 "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
571                 "http://arvados.org/cwl#gitPath": git_path.strip(),
572             }
573         else:
574             for g in ("http://arvados.org/cwl#gitCommit",
575                       "http://arvados.org/cwl#gitDate",
576                       "http://arvados.org/cwl#gitCommitter",
577                       "http://arvados.org/cwl#gitBranch",
578                       "http://arvados.org/cwl#gitOrigin",
579                       "http://arvados.org/cwl#gitStatus",
580                       "http://arvados.org/cwl#gitDescribe",
581                       "http://arvados.org/cwl#gitPath"):
582                 if g in tool.metadata:
583                     gitproperties[g] = tool.metadata[g]
584
585         return gitproperties
586
587     def set_container_request_properties(self, container, properties):
588         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
589         for cr in resp["items"]:
590             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
591             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
592
593     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
594         self.debug = runtimeContext.debug
595
596         self.runtime_status_update("activity", "initialization")
597
598         git_info = self.get_git_info(updated_tool) if self.git_info else {}
599         if git_info:
600             logger.info("Git provenance")
601             for g in git_info:
602                 if git_info[g]:
603                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
604
605         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
606         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
607         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
608         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
609
610         if not self.fast_submit:
611             updated_tool.visit(self.check_features)
612
613         self.pipeline = None
614         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
615         self.secret_store = runtimeContext.secret_store
616
617         self.trash_intermediate = runtimeContext.trash_intermediate
618         if self.trash_intermediate and self.work_api != "containers":
619             raise Exception("--trash-intermediate is only supported with --api=containers.")
620
621         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
622         if self.intermediate_output_ttl and self.work_api != "containers":
623             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
624         if self.intermediate_output_ttl < 0:
625             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
626
627         if runtimeContext.submit_request_uuid and self.work_api != "containers":
628             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
629
630         runtimeContext = runtimeContext.copy()
631
632         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
633         if runtimeContext.storage_classes == "default":
634             runtimeContext.storage_classes = default_storage_classes
635         if runtimeContext.intermediate_storage_classes == "default":
636             runtimeContext.intermediate_storage_classes = default_storage_classes
637
638         if not runtimeContext.name:
639             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
640             if git_info.get("http://arvados.org/cwl#gitDescribe"):
641                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
642             runtimeContext.name = self.name
643
644         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
645             # When creating or updating workflow record, by default
646             # always copy dependencies and ensure Docker images are up
647             # to date.
648             runtimeContext.copy_deps = True
649             runtimeContext.match_local_docker = True
650
651         if runtimeContext.update_workflow and self.project_uuid is None:
652             # If we are updating a workflow, make sure anything that
653             # gets uploaded goes into the same parent project, unless
654             # an alternate --project-uuid was provided.
655             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
656             runtimeContext.project_uuid = existing_wf["owner_uuid"]
657
658         self.project_uuid = runtimeContext.project_uuid
659
660         self.runtime_status_update("activity", "data transfer")
661
662         # Upload local file references in the job order.
663         with Perf(metrics, "upload_job_order"):
664             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
665                                          updated_tool, job_order, runtimeContext)
666
667         # determine if we are submitting or directly executing the workflow.
668         #
669         # the last clause means: if it is a command line tool, and we
670         # are going to wait for the result, and always_submit_runner
671         # is false, then we don't submit a runner process.
672
673         submitting = (runtimeContext.update_workflow or
674                       runtimeContext.create_workflow or
675                       (runtimeContext.submit and not
676                        (updated_tool.tool["class"] == "CommandLineTool" and
677                         runtimeContext.wait and
678                         not runtimeContext.always_submit_runner)))
679
680         loadingContext = self.loadingContext.copy()
681         loadingContext.do_validate = False
682         loadingContext.disable_js_validation = True
683         tool = updated_tool
684
685         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
686         # Also uploads docker images.
687         if not self.fast_submit:
688             logger.info("Uploading workflow dependencies")
689             with Perf(metrics, "upload_workflow_deps"):
690                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
691         else:
692             # in the fast submit case, we are running a workflow that
693             # has already been uploaded to Arvados, so we assume all
694             # the dependencies have been pinned to keep references and
695             # there is nothing to do.
696             merged_map = {}
697
698         loadingContext.loader = tool.doc_loader
699         loadingContext.avsc_names = tool.doc_schema
700         loadingContext.metadata = tool.metadata
701         loadingContext.skip_resolve_all = True
702
703         workflow_wrapper = None
704         if submitting and not self.fast_submit:
705             # upload workflow and get back the workflow wrapper
706
707             workflow_wrapper = upload_workflow(self, tool, job_order,
708                                                runtimeContext.project_uuid,
709                                                runtimeContext,
710                                                uuid=runtimeContext.update_workflow,
711                                                submit_runner_ram=runtimeContext.submit_runner_ram,
712                                                name=runtimeContext.name,
713                                                merged_map=merged_map,
714                                                submit_runner_image=runtimeContext.submit_runner_image,
715                                                git_info=git_info,
716                                                set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
717                                                jobmapper=jobmapper)
718
719             if runtimeContext.update_workflow or runtimeContext.create_workflow:
720                 # We're registering the workflow, so create or update
721                 # the workflow record and then exit.
722                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
723                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
724                 self.stdout.write(uuid + "\n")
725                 return (None, "success")
726
727             # Did not register a workflow, we're going to submit
728             # it instead.
729             loadingContext.loader.idx.clear()
730             loadingContext.loader.idx["_:main"] = workflow_wrapper
731             workflow_wrapper["id"] = "_:main"
732
733             # Reload the minimal wrapper workflow.
734             self.fast_submit = True
735             tool = load_tool(workflow_wrapper, loadingContext)
736             loadingContext.loader.idx["_:main"] = workflow_wrapper
737
738         if not submitting:
739             # If we are going to run the workflow now (rather than
740             # submit it), we need to update the workflow document
741             # replacing file references with keep references.  If we
742             # are just going to construct a run submission, we don't
743             # need to do this.
744             update_from_merged_map(tool, merged_map)
745
746         self.apply_reqs(job_order, tool)
747
748         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
749         self.eval_timeout = runtimeContext.eval_timeout
750
751         runtimeContext.use_container = True
752         runtimeContext.tmpdir_prefix = "tmp"
753         runtimeContext.work_api = self.work_api
754
755         if not self.output_name:
756              self.output_name = "Output from workflow %s" % runtimeContext.name
757
758         self.output_name  = cleanup_name_for_collection(self.output_name)
759
760         if self.work_api == "containers":
761             if self.ignore_docker_for_reuse:
762                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
763             runtimeContext.outdir = "/var/spool/cwl"
764             runtimeContext.docker_outdir = "/var/spool/cwl"
765             runtimeContext.tmpdir = "/tmp"
766             runtimeContext.docker_tmpdir = "/tmp"
767
768         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
769             raise Exception("--priority must be in the range 1..1000.")
770
771         if self.should_estimate_cache_size:
772             visited = set()
773             estimated_size = [0]
774             def estimate_collection_cache(obj):
775                 if obj.get("location", "").startswith("keep:"):
776                     m = pdh_size.match(obj["location"][5:])
777                     if m and m.group(1) not in visited:
778                         visited.add(m.group(1))
779                         estimated_size[0] += int(m.group(2))
780             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
781             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
782             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
783
784         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
785
786         runnerjob = None
787         if runtimeContext.submit:
788             # We are submitting instead of running immediately.
789             #
790             # Create a "Runner job" that when run() is invoked,
791             # creates the container request to run the workflow.
792             if self.work_api == "containers":
793                 if submitting:
794                     loadingContext.metadata = updated_tool.metadata.copy()
795                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
796                                            self.output_name,
797                                            self.output_tags,
798                                            submit_runner_ram=runtimeContext.submit_runner_ram,
799                                            name=runtimeContext.name,
800                                            on_error=runtimeContext.on_error,
801                                            submit_runner_image=runtimeContext.submit_runner_image,
802                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
803                                            merged_map=merged_map,
804                                            priority=runtimeContext.priority,
805                                            secret_store=self.secret_store,
806                                            collection_cache_size=runtimeContext.collection_cache_size,
807                                            collection_cache_is_default=self.should_estimate_cache_size,
808                                            git_info=git_info)
809                 else:
810                     runtimeContext.runnerjob = tool.tool["id"]
811
812         if runtimeContext.cwl_runner_job is not None:
813             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
814
815         jobiter = tool.job(job_order,
816                            self.output_callback,
817                            runtimeContext)
818
819         if runtimeContext.submit and not runtimeContext.wait:
820             # User provided --no-wait so submit the container request,
821             # get the container request uuid, print it out, and exit.
822             runnerjob = next(jobiter)
823             runnerjob.run(runtimeContext)
824             self.stdout.write(runnerjob.uuid+"\n")
825             return (None, "success")
826
827         # We either running the workflow directly, or submitting it
828         # and will wait for a final result.
829
830         self.runtime_status_update("activity", "workflow execution")
831
832         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
833         if current_container:
834             logger.info("Running inside container %s", current_container.get("uuid"))
835             self.set_container_request_properties(current_container, git_info)
836
837         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
838         self.polling_thread = threading.Thread(target=self.poll_states)
839         self.polling_thread.start()
840
841         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
842
843         try:
844             self.workflow_eval_lock.acquire()
845
846             # Holds the lock while this code runs and releases it when
847             # it is safe to do so in self.workflow_eval_lock.wait(),
848             # at which point on_message can update job state and
849             # process output callbacks.
850
851             loopperf = Perf(metrics, "jobiter")
852             loopperf.__enter__()
853             for runnable in jobiter:
854                 loopperf.__exit__()
855
856                 if self.stop_polling.is_set():
857                     break
858
859                 if self.task_queue.error is not None:
860                     raise self.task_queue.error
861
862                 if runnable:
863                     with Perf(metrics, "run"):
864                         self.start_run(runnable, runtimeContext)
865                 else:
866                     if (self.task_queue.in_flight + len(self.processes)) > 0:
867                         self.workflow_eval_lock.wait(3)
868                     else:
869                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
870                         break
871
872                 if self.stop_polling.is_set():
873                     break
874
875                 loopperf.__enter__()
876             loopperf.__exit__()
877
878             while (self.task_queue.in_flight + len(self.processes)) > 0:
879                 if self.task_queue.error is not None:
880                     raise self.task_queue.error
881                 self.workflow_eval_lock.wait(3)
882
883         except UnsupportedRequirement:
884             raise
885         except:
886             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
887                 logger.error("Interrupted, workflow will be cancelled")
888             elif isinstance(sys.exc_info()[1], WorkflowException):
889                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
890             else:
891                 logger.exception("Workflow execution failed")
892
893             if self.pipeline:
894                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
895                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
896
897             if self.work_api == "containers" and not current_container:
898                 # Not running in a crunch container, so cancel any outstanding processes.
899                 for p in self.processes:
900                     try:
901                         self.api.container_requests().update(uuid=p,
902                                                              body={"priority": "0"}
903                         ).execute(num_retries=self.num_retries)
904                     except Exception:
905                         pass
906         finally:
907             self.workflow_eval_lock.release()
908             self.task_queue.drain()
909             self.stop_polling.set()
910             self.polling_thread.join()
911             self.task_queue.join()
912
913         if self.final_status == "UnsupportedRequirement":
914             raise UnsupportedRequirement("Check log for details.")
915
916         if self.final_output is None:
917             raise WorkflowException("Workflow did not return a result.")
918
919         if runtimeContext.submit and isinstance(tool, Runner):
920             logger.info("Final output collection %s", tool.final_output)
921             if workbench2 or workbench1:
922                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
923         else:
924             if self.output_tags is None:
925                 self.output_tags = ""
926
927             storage_classes = ""
928             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
929             if storage_class_req and storage_class_req.get("finalStorageClass"):
930                 storage_classes = aslist(storage_class_req["finalStorageClass"])
931             else:
932                 storage_classes = runtimeContext.storage_classes.strip().split(",")
933
934             output_properties = {}
935             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
936             if output_properties_req:
937                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
938                 for pr in output_properties_req["outputProperties"]:
939                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
940
941             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
942                                                                                           self.output_tags, output_properties,
943                                                                                           self.final_output)
944             self.set_crunch_output()
945
946         if runtimeContext.compute_checksum:
947             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
948             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
949
950         if self.trash_intermediate and self.final_status == "success":
951             self.trash_intermediate_output()
952
953         return (self.final_output, self.final_status)