Merge branch '21026-sanitize-html-doc'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import subprocess
21 import time
22 import urllib
23
24 from cwltool.errors import WorkflowException
25 import cwltool.workflow
26 from schema_salad.sourceline import SourceLine, cmap
27 import schema_salad.validate as validate
28 from schema_salad.ref_resolver import file_uri, uri_file_path
29
30 import arvados
31 import arvados.config
32 from arvados.keep import KeepClient
33 from arvados.errors import ApiError
34
35 import arvados_cwl.util
36 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
37 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
38 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
39 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
40 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
41 from .perf import Perf
42 from .pathmapper import NoFollowPathMapper
43 from cwltool.task_queue import TaskQueue
44 from .context import ArvLoadingContext, ArvRuntimeContext
45 from ._version import __version__
46
47 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
48 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
49 from cwltool.command_line_tool import compute_checksums
50 from cwltool.load_tool import load_tool
51
# Main logger for this module's informational, warning and error messages.
logger = logging.getLogger('arvados.cwl-runner')
# Separate logger handed to Perf() for timing measurements.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# NOTE(review): presumably the default priority for submitted work; it is
# not referenced in the visible part of this file -- confirm against users.
DEFAULT_PRIORITY = 500
56
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards warning and error records to a callback
    so they can be reported as runtime statuses on runner containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        # Callback invoked as func(kind, message[, detail]).
        self.runtime_status_update = runtime_status_update_func
        # Guard flag: set while a status update is in flight so that any
        # record logged by the callback itself is not forwarded again.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        """Forward *record* to the status callback if it is severe enough."""
        if record.levelno >= logging.ERROR:
            severity = 'error'
        elif record.levelno >= logging.WARNING:
            severity = 'warning'
        else:
            # Below WARNING: nothing to report.
            return

        if severity == 'warning' and record.name == "salad":
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return

        if self.updatingRuntimeStatus is True:
            return

        self.updatingRuntimeStatus = True
        try:
            # For a multi-line message, the first line becomes the status
            # and everything after the first newline becomes the detail.
            headline, newline, remainder = record.getMessage().partition('\n')
            summary = "%s: %s" % (record.name, headline)
            if newline:
                self.runtime_status_update(severity, summary, remainder)
            else:
                self.runtime_status_update(severity, summary)
        finally:
            self.updatingRuntimeStatus = False
97
98
99 class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow: submit work (using the containers API),
    wait for it to complete, and report output.
102
103     """
104
105     def __init__(self, api_client,
106                  arvargs=None,
107                  keep_client=None,
108                  num_retries=4,
109                  thread_count=4,
110                  stdout=sys.stdout):
111
112         if arvargs is None:
113             arvargs = argparse.Namespace()
114             arvargs.work_api = None
115             arvargs.output_name = None
116             arvargs.output_tags = None
117             arvargs.thread_count = 1
118             arvargs.collection_cache_size = None
119             arvargs.git_info = True
120             arvargs.submit = False
121             arvargs.defer_downloads = False
122
123         self.api = api_client
124         self.processes = {}
125         self.workflow_eval_lock = threading.Condition(threading.RLock())
126         self.final_output = None
127         self.final_status = None
128         self.num_retries = num_retries
129         self.uuid = None
130         self.stop_polling = threading.Event()
131         self.poll_api = None
132         self.pipeline = None
133         self.final_output_collection = None
134         self.output_name = arvargs.output_name
135         self.output_tags = arvargs.output_tags
136         self.project_uuid = None
137         self.intermediate_output_ttl = 0
138         self.intermediate_output_collections = []
139         self.trash_intermediate = False
140         self.thread_count = arvargs.thread_count
141         self.poll_interval = 12
142         self.loadingContext = None
143         self.should_estimate_cache_size = True
144         self.fs_access = None
145         self.secret_store = None
146         self.stdout = stdout
147         self.fast_submit = False
148         self.git_info = arvargs.git_info
149
150         if keep_client is not None:
151             self.keep_client = keep_client
152         else:
153             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
154
155         if arvargs.collection_cache_size:
156             collection_cache_size = arvargs.collection_cache_size*1024*1024
157             self.should_estimate_cache_size = False
158         else:
159             collection_cache_size = 256*1024*1024
160
161         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
162                                                 cap=collection_cache_size)
163
164         self.fetcher_constructor = partial(CollectionFetcher,
165                                            api_client=self.api,
166                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
167                                            num_retries=self.num_retries)
168
169         self.work_api = None
170         expected_api = ["containers"]
171         for api in expected_api:
172             try:
173                 methods = self.api._rootDesc.get('resources')[api]['methods']
174                 if ('httpMethod' in methods['create'] and
175                     (arvargs.work_api == api or arvargs.work_api is None)):
176                     self.work_api = api
177                     break
178             except KeyError:
179                 pass
180
181         if not self.work_api:
182             if arvargs.work_api is None:
183                 raise Exception("No supported APIs")
184             else:
185                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
186
187         if self.work_api == "jobs":
188             logger.error("""
189 *******************************
190 The 'jobs' API is no longer supported.
191 *******************************""")
192             exit(1)
193
194         self.loadingContext = ArvLoadingContext(vars(arvargs))
195         self.loadingContext.fetcher_constructor = self.fetcher_constructor
196         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
197         self.loadingContext.construct_tool_object = self.arv_make_tool
198
199         # Add a custom logging handler to the root logger for runtime status reporting
200         # if running inside a container
201         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
202             root_logger = logging.getLogger('')
203
204             # Remove existing RuntimeStatusLoggingHandlers if they exist
205             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
206             root_logger.handlers = handlers
207
208             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
209             root_logger.addHandler(handler)
210
211         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
212         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
213                                                      collection_cache=self.collection_cache)
214
215         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
216
217         validate_cluster_target(self, self.toplevel_runtimeContext)
218
219
220     def arv_make_tool(self, toolpath_object, loadingContext):
221         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
222             return ArvadosCommandTool(self, toolpath_object, loadingContext)
223         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
224             return ArvadosWorkflow(self, toolpath_object, loadingContext)
225         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
226             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
227         else:
228             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
229
230     def output_callback(self, out, processStatus):
231         with self.workflow_eval_lock:
232             if processStatus == "success":
233                 logger.info("Overall process status is %s", processStatus)
234                 state = "Complete"
235             else:
236                 logger.error("Overall process status is %s", processStatus)
237                 state = "Failed"
238             if self.pipeline:
239                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
240                                                         body={"state": state}).execute(num_retries=self.num_retries)
241             self.final_status = processStatus
242             self.final_output = out
243             self.workflow_eval_lock.notifyAll()
244
245
246     def start_run(self, runnable, runtimeContext):
247         self.task_queue.add(partial(runnable.run, runtimeContext),
248                             self.workflow_eval_lock, self.stop_polling)
249
250     def process_submitted(self, container):
251         with self.workflow_eval_lock:
252             self.processes[container.uuid] = container
253
254     def process_done(self, uuid, record):
255         with self.workflow_eval_lock:
256             j = self.processes[uuid]
257             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
258             self.task_queue.add(partial(j.done, record),
259                                 self.workflow_eval_lock, self.stop_polling)
260             del self.processes[uuid]
261
262     def runtime_status_update(self, kind, message, detail=None):
263         """
264         Updates the runtime_status field on the runner container.
265         Called when there's a need to report errors, warnings or just
266         activity statuses, for example in the RuntimeStatusLoggingHandler.
267         """
268
269         if kind not in ('error', 'warning', 'activity'):
270             # Ignore any other status kind
271             return
272
273         with self.workflow_eval_lock:
274             current = None
275             try:
276                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
277             except Exception as e:
278                 logger.info("Couldn't get current container: %s", e)
279             if current is None:
280                 return
281             runtime_status = current.get('runtime_status', {})
282
283             original_updatemessage = updatemessage = runtime_status.get(kind, "")
284             if kind == "activity" or not updatemessage:
285                 updatemessage = message
286
287             # Subsequent messages tacked on in detail
288             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
289             maxlines = 40
290             if updatedetail.count("\n") < maxlines:
291                 if updatedetail:
292                     updatedetail += "\n"
293                 updatedetail += message + "\n"
294
295                 if detail:
296                     updatedetail += detail + "\n"
297
298                 if updatedetail.count("\n") >= maxlines:
299                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
300
301             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
302                 # don't waste time doing an update if nothing changed
303                 # (usually because we exceeded the max lines)
304                 return
305
306             runtime_status.update({
307                 kind: updatemessage,
308                 kind+'Detail': updatedetail,
309             })
310
311             try:
312                 self.api.containers().update(uuid=current['uuid'],
313                                             body={
314                                                 'runtime_status': runtime_status,
315                                             }).execute(num_retries=self.num_retries)
316             except Exception as e:
317                 logger.info("Couldn't update runtime_status: %s", e)
318
319     def wrapped_callback(self, cb, obj, st):
320         with self.workflow_eval_lock:
321             cb(obj, st)
322             self.workflow_eval_lock.notifyAll()
323
324     def get_wrapped_callback(self, cb):
325         return partial(self.wrapped_callback, cb)
326
327     def on_message(self, event):
328         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
329             uuid = event["object_uuid"]
330             if event["properties"]["new_attributes"]["state"] == "Running":
331                 with self.workflow_eval_lock:
332                     j = self.processes[uuid]
333                     if j.running is False:
334                         j.running = True
335                         j.update_pipeline_component(event["properties"]["new_attributes"])
336                         logger.info("%s %s is Running", self.label(j), uuid)
337             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
338                 self.process_done(uuid, event["properties"]["new_attributes"])
339
340     def label(self, obj):
341         return "[%s %s]" % (self.work_api[0:-1], obj.name)
342
343     def poll_states(self):
344         """Poll status of containers listed in the processes dict.
345
346         Runs in a separate thread.
347         """
348
349         try:
350             remain_wait = self.poll_interval
351             while True:
352                 if remain_wait > 0:
353                     self.stop_polling.wait(remain_wait)
354                 if self.stop_polling.is_set():
355                     break
356                 with self.workflow_eval_lock:
357                     keys = list(self.processes)
358                 if not keys:
359                     remain_wait = self.poll_interval
360                     continue
361
362                 begin_poll = time.time()
363                 if self.work_api == "containers":
364                     table = self.poll_api.container_requests()
365
366                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
367
368                 while keys:
369                     page = keys[:pageSize]
370                     try:
371                         proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
372                                                                                          "output_uuid", "modified_at", "properties"]).execute(num_retries=self.num_retries)
373                     except Exception as e:
374                         logger.warning("Temporary error checking states on API server: %s", e)
375                         remain_wait = self.poll_interval
376                         continue
377
378                     for p in proc_states["items"]:
379                         self.on_message({
380                             "object_uuid": p["uuid"],
381                             "event_type": "update",
382                             "properties": {
383                                 "new_attributes": p
384                             }
385                         })
386                     keys = keys[pageSize:]
387
388                 finish_poll = time.time()
389                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
390         except:
391             logger.exception("Fatal error in state polling thread.")
392             with self.workflow_eval_lock:
393                 self.processes.clear()
394                 self.workflow_eval_lock.notifyAll()
395         finally:
396             self.stop_polling.set()
397
398     def add_intermediate_output(self, uuid):
399         if uuid:
400             self.intermediate_output_collections.append(uuid)
401
402     def trash_intermediate_output(self):
403         logger.info("Cleaning up intermediate output collections")
404         for i in self.intermediate_output_collections:
405             try:
406                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
407             except Exception:
408                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
409             except (KeyboardInterrupt, SystemExit):
410                 break
411
412     def check_features(self, obj, parentfield=""):
413         if isinstance(obj, dict):
414             if obj.get("class") == "DockerRequirement":
415                 if obj.get("dockerOutputDirectory"):
416                     if not obj.get("dockerOutputDirectory").startswith('/'):
417                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
418                             "Option 'dockerOutputDirectory' must be an absolute path.")
419             if obj.get("class") == "InplaceUpdateRequirement":
420                 if obj["inplaceUpdate"] and parentfield == "requirements":
421                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
422             for k,v in viewitems(obj):
423                 self.check_features(v, parentfield=k)
424         elif isinstance(obj, list):
425             for i,v in enumerate(obj):
426                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
427                     self.check_features(v, parentfield=parentfield)
428
429     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
430         outputObj = copy.deepcopy(outputObj)
431
432         files = []
433         def capture(fileobj):
434             files.append(fileobj)
435
436         adjustDirObjs(outputObj, capture)
437         adjustFileObjs(outputObj, capture)
438
439         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
440
441         final = arvados.collection.Collection(api_client=self.api,
442                                               keep_client=self.keep_client,
443                                               num_retries=self.num_retries)
444
445         for k,v in generatemapper.items():
446             if v.type == "Directory" and v.resolved.startswith("_:"):
447                     continue
448             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
449                 with final.open(v.target, "wb") as f:
450                     f.write(v.resolved.encode("utf-8"))
451                     continue
452
453             if not v.resolved.startswith("keep:"):
454                 raise Exception("Output source is not in keep or a literal")
455             sp = v.resolved.split("/")
456             srccollection = sp[0][5:]
457             try:
458                 reader = self.collection_cache.get(srccollection)
459                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
460                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
461             except arvados.errors.ArgumentError as e:
462                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
463                 raise
464             except IOError as e:
465                 logger.error("While preparing output collection: %s", e)
466                 raise
467
468         def rewrite(fileobj):
469             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
470             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
471                 if k in fileobj:
472                     del fileobj[k]
473
474         adjustDirObjs(outputObj, rewrite)
475         adjustFileObjs(outputObj, rewrite)
476
477         with final.open("cwl.output.json", "w") as f:
478             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
479             f.write(res)
480
481
482         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
483                        ensure_unique_name=True, properties=output_properties)
484
485         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
486                     final.api_response()["name"],
487                     final.manifest_locator())
488
489         final_uuid = final.manifest_locator()
490         tags = tagsString.split(',')
491         for tag in tags:
492              self.api.links().create(body={
493                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
494                 }).execute(num_retries=self.num_retries)
495
496         def finalcollection(fileobj):
497             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
498
499         adjustDirObjs(outputObj, finalcollection)
500         adjustFileObjs(outputObj, finalcollection)
501
502         return (outputObj, final)
503
504     def set_crunch_output(self):
505         if self.work_api == "containers":
506             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
507             if current is None:
508                 return
509             try:
510                 self.api.containers().update(uuid=current['uuid'],
511                                              body={
512                                                  'output': self.final_output_collection.portable_data_hash(),
513                                                  'output_properties': self.final_output_collection.get_properties(),
514                                              }).execute(num_retries=self.num_retries)
515                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
516                                               body={
517                                                   'is_trashed': True
518                                               }).execute(num_retries=self.num_retries)
519             except Exception:
520                 logger.exception("Setting container output")
521                 raise
522
523     def apply_reqs(self, job_order_object, tool):
524         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
525             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
526                 raise WorkflowException(
527                     "`cwl:requirements` in the input object is not part of CWL "
528                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
529                     "can set the cwlVersion to v1.1 or greater and re-run with "
530                     "--enable-dev.")
531             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
532             for req in job_reqs:
533                 tool.requirements.append(req)
534
535     @staticmethod
536     def get_git_info(tool):
537         in_a_git_repo = False
538         cwd = None
539         filepath = None
540
541         if tool.tool["id"].startswith("file://"):
542             # check if git is installed
543             try:
544                 filepath = uri_file_path(tool.tool["id"])
545                 cwd = os.path.dirname(filepath)
546                 subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
547                 in_a_git_repo = True
548             except Exception as e:
549                 pass
550
551         gitproperties = {}
552
553         if in_a_git_repo:
554             git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
555             git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
556             git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
557             git_branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
558             git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
559             git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
560             git_describe = subprocess.run(["git", "describe", "--always", "--tags"], cwd=cwd, capture_output=True, text=True).stdout
561             git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
562             git_path = filepath[len(git_toplevel):]
563
564             gitproperties = {
565                 "http://arvados.org/cwl#gitCommit": git_commit.strip(),
566                 "http://arvados.org/cwl#gitDate": git_date.strip(),
567                 "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
568                 "http://arvados.org/cwl#gitBranch": git_branch.strip(),
569                 "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
570                 "http://arvados.org/cwl#gitStatus": git_status.strip(),
571                 "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
572                 "http://arvados.org/cwl#gitPath": git_path.strip(),
573             }
574         else:
575             for g in ("http://arvados.org/cwl#gitCommit",
576                       "http://arvados.org/cwl#gitDate",
577                       "http://arvados.org/cwl#gitCommitter",
578                       "http://arvados.org/cwl#gitBranch",
579                       "http://arvados.org/cwl#gitOrigin",
580                       "http://arvados.org/cwl#gitStatus",
581                       "http://arvados.org/cwl#gitDescribe",
582                       "http://arvados.org/cwl#gitPath"):
583                 if g in tool.metadata:
584                     gitproperties[g] = tool.metadata[g]
585
586         return gitproperties
587
588     def set_container_request_properties(self, container, properties):
589         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
590         for cr in resp["items"]:
591             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
592             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
593
594     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
595         self.debug = runtimeContext.debug
596
597         self.runtime_status_update("activity", "initialization")
598
599         git_info = self.get_git_info(updated_tool) if self.git_info else {}
600         if git_info:
601             logger.info("Git provenance")
602             for g in git_info:
603                 if git_info[g]:
604                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
605
606         runtimeContext.git_info = git_info
607
608         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
609         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
610         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
611         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
612
613         if not self.fast_submit:
614             updated_tool.visit(self.check_features)
615
616         self.pipeline = None
617         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
618         self.secret_store = runtimeContext.secret_store
619
620         self.trash_intermediate = runtimeContext.trash_intermediate
621         if self.trash_intermediate and self.work_api != "containers":
622             raise Exception("--trash-intermediate is only supported with --api=containers.")
623
624         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
625         if self.intermediate_output_ttl and self.work_api != "containers":
626             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
627         if self.intermediate_output_ttl < 0:
628             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
629
630         if runtimeContext.submit_request_uuid and self.work_api != "containers":
631             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
632
633         runtimeContext = runtimeContext.copy()
634
635         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
636         if runtimeContext.storage_classes == "default":
637             runtimeContext.storage_classes = default_storage_classes
638         if runtimeContext.intermediate_storage_classes == "default":
639             runtimeContext.intermediate_storage_classes = default_storage_classes
640
641         if not runtimeContext.name:
642             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
643             if git_info.get("http://arvados.org/cwl#gitDescribe"):
644                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
645             runtimeContext.name = self.name
646
647         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
648             # When creating or updating workflow record, by default
649             # always copy dependencies and ensure Docker images are up
650             # to date.
651             runtimeContext.copy_deps = True
652             runtimeContext.match_local_docker = True
653
654         if runtimeContext.print_keep_deps:
655             runtimeContext.copy_deps = False
656             runtimeContext.match_local_docker = False
657
658         if runtimeContext.update_workflow and self.project_uuid is None:
659             # If we are updating a workflow, make sure anything that
660             # gets uploaded goes into the same parent project, unless
661             # an alternate --project-uuid was provided.
662             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
663             runtimeContext.project_uuid = existing_wf["owner_uuid"]
664
665         self.project_uuid = runtimeContext.project_uuid
666
667         self.runtime_status_update("activity", "data transfer")
668
669         # Upload local file references in the job order.
670         with Perf(metrics, "upload_job_order"):
671             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
672                                          updated_tool, job_order, runtimeContext)
673
674         # determine if we are submitting or directly executing the workflow.
675         #
676         # the last clause means: if it is a command line tool, and we
677         # are going to wait for the result, and always_submit_runner
678         # is false, then we don't submit a runner process.
679
680         submitting = (runtimeContext.submit and not
681                        (updated_tool.tool["class"] == "CommandLineTool" and
682                         runtimeContext.wait and
683                         not runtimeContext.always_submit_runner))
684
685         loadingContext = self.loadingContext.copy()
686         loadingContext.do_validate = False
687         loadingContext.disable_js_validation = True
688         tool = updated_tool
689
690         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
691         # Also uploads docker images.
692         if not self.fast_submit:
693             logger.info("Uploading workflow dependencies")
694             with Perf(metrics, "upload_workflow_deps"):
695                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
696         else:
697             # in the fast submit case, we are running a workflow that
698             # has already been uploaded to Arvados, so we assume all
699             # the dependencies have been pinned to keep references and
700             # there is nothing to do.
701             merged_map = {}
702
703         loadingContext.loader = tool.doc_loader
704         loadingContext.avsc_names = tool.doc_schema
705         loadingContext.metadata = tool.metadata
706         loadingContext.skip_resolve_all = True
707
708         workflow_wrapper = None
709         if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
710             # upload workflow and get back the workflow wrapper
711
712             workflow_wrapper = upload_workflow(self, tool, job_order,
713                                                runtimeContext.project_uuid,
714                                                runtimeContext,
715                                                uuid=runtimeContext.update_workflow,
716                                                submit_runner_ram=runtimeContext.submit_runner_ram,
717                                                name=runtimeContext.name,
718                                                merged_map=merged_map,
719                                                submit_runner_image=runtimeContext.submit_runner_image,
720                                                git_info=git_info,
721                                                set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
722                                                jobmapper=jobmapper)
723
724             if runtimeContext.update_workflow or runtimeContext.create_workflow:
725                 # We're registering the workflow, so create or update
726                 # the workflow record and then exit.
727                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
728                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
729                 self.stdout.write(uuid + "\n")
730                 return (None, "success")
731
732             if runtimeContext.print_keep_deps:
733                 # Just find and print out all the collection dependencies and exit
734                 print_keep_deps(self, runtimeContext, merged_map, tool)
735                 return (None, "success")
736
737             # Did not register a workflow, we're going to submit
738             # it instead.
739             loadingContext.loader.idx.clear()
740             loadingContext.loader.idx["_:main"] = workflow_wrapper
741             workflow_wrapper["id"] = "_:main"
742
743             # Reload the minimal wrapper workflow.
744             self.fast_submit = True
745             tool = load_tool(workflow_wrapper, loadingContext)
746             loadingContext.loader.idx["_:main"] = workflow_wrapper
747
748         if not submitting:
749             # If we are going to run the workflow now (rather than
750             # submit it), we need to update the workflow document
751             # replacing file references with keep references.  If we
752             # are just going to construct a run submission, we don't
753             # need to do this.
754             update_from_merged_map(tool, merged_map)
755
756         self.apply_reqs(job_order, tool)
757
758         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
759         self.eval_timeout = runtimeContext.eval_timeout
760
761         runtimeContext.use_container = True
762         runtimeContext.tmpdir_prefix = "tmp"
763         runtimeContext.work_api = self.work_api
764
765         if not self.output_name:
766              self.output_name = "Output from workflow %s" % runtimeContext.name
767
768         self.output_name  = cleanup_name_for_collection(self.output_name)
769
770         if self.work_api == "containers":
771             if self.ignore_docker_for_reuse:
772                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
773             runtimeContext.outdir = "/var/spool/cwl"
774             runtimeContext.docker_outdir = "/var/spool/cwl"
775             runtimeContext.tmpdir = "/tmp"
776             runtimeContext.docker_tmpdir = "/tmp"
777
778         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
779             raise Exception("--priority must be in the range 1..1000.")
780
781         if self.should_estimate_cache_size:
782             visited = set()
783             estimated_size = [0]
784             def estimate_collection_cache(obj):
785                 if obj.get("location", "").startswith("keep:"):
786                     m = pdh_size.match(obj["location"][5:])
787                     if m and m.group(1) not in visited:
788                         visited.add(m.group(1))
789                         estimated_size[0] += int(m.group(2))
790             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
791             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
792             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
793
794         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
795
796         runnerjob = None
797         if runtimeContext.submit:
798             # We are submitting instead of running immediately.
799             #
800             # Create a "Runner job" that when run() is invoked,
801             # creates the container request to run the workflow.
802             if self.work_api == "containers":
803                 if submitting:
804                     loadingContext.metadata = updated_tool.metadata.copy()
805                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
806                                            self.output_name,
807                                            self.output_tags,
808                                            submit_runner_ram=runtimeContext.submit_runner_ram,
809                                            name=runtimeContext.name,
810                                            on_error=runtimeContext.on_error,
811                                            submit_runner_image=runtimeContext.submit_runner_image,
812                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
813                                            merged_map=merged_map,
814                                            priority=runtimeContext.priority,
815                                            secret_store=self.secret_store,
816                                            collection_cache_size=runtimeContext.collection_cache_size,
817                                            collection_cache_is_default=self.should_estimate_cache_size,
818                                            git_info=git_info)
819                 else:
820                     runtimeContext.runnerjob = tool.tool["id"]
821
822         if runtimeContext.cwl_runner_job is not None:
823             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
824
825         jobiter = tool.job(job_order,
826                            self.output_callback,
827                            runtimeContext)
828
829         if runtimeContext.submit and not runtimeContext.wait:
830             # User provided --no-wait so submit the container request,
831             # get the container request uuid, print it out, and exit.
832             runnerjob = next(jobiter)
833             runnerjob.run(runtimeContext)
834             self.stdout.write(runnerjob.uuid+"\n")
835             return (None, "success")
836
837         # We are either running the workflow directly, or submitting it
838         # and waiting for a final result.
839
840         self.runtime_status_update("activity", "workflow execution")
841
842         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
843         if current_container:
844             logger.info("Running inside container %s", current_container.get("uuid"))
845             self.set_container_request_properties(current_container, git_info)
846
847         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
848         self.polling_thread = threading.Thread(target=self.poll_states)
849         self.polling_thread.start()
850
851         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
852
853         try:
854             self.workflow_eval_lock.acquire()
855
856             # Holds the lock while this code runs and releases it when
857             # it is safe to do so in self.workflow_eval_lock.wait(),
858             # at which point on_message can update job state and
859             # process output callbacks.
860
861             loopperf = Perf(metrics, "jobiter")
862             loopperf.__enter__()
863             for runnable in jobiter:
864                 loopperf.__exit__()
865
866                 if self.stop_polling.is_set():
867                     break
868
869                 if self.task_queue.error is not None:
870                     raise self.task_queue.error
871
872                 if runnable:
873                     with Perf(metrics, "run"):
874                         self.start_run(runnable, runtimeContext)
875                 else:
876                     if (self.task_queue.in_flight + len(self.processes)) > 0:
877                         self.workflow_eval_lock.wait(3)
878                     else:
879                         if self.final_status is None:
880                             logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
881                         break
882
883                 if self.stop_polling.is_set():
884                     break
885
886                 loopperf.__enter__()
887             loopperf.__exit__()
888
889             while (self.task_queue.in_flight + len(self.processes)) > 0:
890                 if self.task_queue.error is not None:
891                     raise self.task_queue.error
892                 self.workflow_eval_lock.wait(3)
893
894         except UnsupportedRequirement:
895             raise
896         except:
897             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
898                 logger.error("Interrupted, workflow will be cancelled")
899             elif isinstance(sys.exc_info()[1], WorkflowException):
900                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
901             else:
902                 logger.exception("Workflow execution failed")
903
904             if self.pipeline:
905                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
906                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
907
908             if self.work_api == "containers" and not current_container:
909                 # Not running in a crunch container, so cancel any outstanding processes.
910                 for p in self.processes:
911                     try:
912                         self.api.container_requests().update(uuid=p,
913                                                              body={"priority": "0"}
914                         ).execute(num_retries=self.num_retries)
915                     except Exception:
916                         pass
917         finally:
918             self.workflow_eval_lock.release()
919             self.task_queue.drain()
920             self.stop_polling.set()
921             self.polling_thread.join()
922             self.task_queue.join()
923
924         if self.final_status == "UnsupportedRequirement":
925             raise UnsupportedRequirement("Check log for details.")
926
927         if self.final_output is None:
928             raise WorkflowException("Workflow did not return a result.")
929
930         if runtimeContext.submit and isinstance(tool, Runner):
931             logger.info("Final output collection %s", tool.final_output)
932             if workbench2 or workbench1:
933                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
934         else:
935             if self.output_tags is None:
936                 self.output_tags = ""
937
938             storage_classes = ""
939             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
940             if storage_class_req and storage_class_req.get("finalStorageClass"):
941                 storage_classes = aslist(storage_class_req["finalStorageClass"])
942             else:
943                 storage_classes = runtimeContext.storage_classes.strip().split(",")
944
945             output_properties = {}
946             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
947             if output_properties_req:
948                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
949                 for pr in output_properties_req["outputProperties"]:
950                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
951
952             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
953                                                                                           self.output_tags, output_properties,
954                                                                                           self.final_output)
955             self.set_crunch_output()
956
957         if runtimeContext.compute_checksum:
958             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
959             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
960
961         if self.trash_intermediate and self.final_status == "success":
962             self.trash_intermediate_output()
963
964         return (self.final_output, self.final_status)