14723: Adds comments for the changes to StagingPathMapper
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index a548719741e3b02ab2fd6d9f9acd0f8a2d137885..eeb44dbd7f232193b6e7102f85e3ae905b71dfc9 100644 (file)
@@ -2,6 +2,12 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from __future__ import division
+from builtins import next
+from builtins import object
+from builtins import str
+from future.utils import viewvalues
+
 import argparse
 import logging
 import os
@@ -23,26 +29,28 @@ import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
 
+import arvados_cwl.util
 from .arvcontainer import RunnerContainer
 from .arvjob import RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
 from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
 from ._version import __version__
 
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
 from cwltool.command_line_tool import compute_checksums
 
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
+DEFAULT_PRIORITY = 500
+
 class RuntimeStatusLoggingHandler(logging.Handler):
     """
     Intercepts logging calls and report them as runtime statuses on runner
@@ -51,6 +59,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
     def __init__(self, runtime_status_update_func):
         super(RuntimeStatusLoggingHandler, self).__init__()
         self.runtime_status_update = runtime_status_update_func
+        self.updatingRuntimeStatus = False
 
     def emit(self, record):
         kind = None
@@ -58,22 +67,27 @@ class RuntimeStatusLoggingHandler(logging.Handler):
             kind = 'error'
         elif record.levelno >= logging.WARNING:
             kind = 'warning'
-        if kind is not None:
-            log_msg = record.getMessage()
-            if '\n' in log_msg:
-                # If the logged message is multi-line, use its first line as status
-                # and the rest as detail.
-                status, detail = log_msg.split('\n', 1)
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, status),
-                    detail
-                )
-            else:
-                self.runtime_status_update(
-                    kind,
-                    "%s: %s" % (record.name, record.getMessage())
-                )
+        if kind is not None and self.updatingRuntimeStatus is not True:
+            self.updatingRuntimeStatus = True
+            try:
+                log_msg = record.getMessage()
+                if '\n' in log_msg:
+                    # If the logged message is multi-line, use its first line as status
+                    # and the rest as detail.
+                    status, detail = log_msg.split('\n', 1)
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, status),
+                        detail
+                    )
+                else:
+                    self.runtime_status_update(
+                        kind,
+                        "%s: %s" % (record.name, record.getMessage())
+                    )
+            finally:
+                self.updatingRuntimeStatus = False
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -93,6 +107,7 @@ class ArvCwlExecutor(object):
             arvargs.output_name = None
             arvargs.output_tags = None
             arvargs.thread_count = 1
+            arvargs.collection_cache_size = None
 
         self.api = api_client
         self.processes = {}
@@ -114,13 +129,21 @@ class ArvCwlExecutor(object):
         self.thread_count = arvargs.thread_count
         self.poll_interval = 12
         self.loadingContext = None
+        self.should_estimate_cache_size = True
 
         if keep_client is not None:
             self.keep_client = keep_client
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+        if arvargs.collection_cache_size:
+            collection_cache_size = arvargs.collection_cache_size*1024*1024
+            self.should_estimate_cache_size = False
+        else:
+            collection_cache_size = 256*1024*1024
+
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+                                                cap=collection_cache_size)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -146,7 +169,7 @@ class ArvCwlExecutor(object):
                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
 
         if self.work_api == "jobs":
-            logger.warn("""
+            logger.warning("""
 *******************************
 Using the deprecated 'jobs' API.
 
@@ -167,8 +190,13 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         # Add a custom logging handler to the root logger for runtime status reporting
         # if running inside a container
-        if get_current_container(self.api, self.num_retries, logger):
+        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
             root_logger = logging.getLogger('')
+
+            # Remove existing RuntimeStatusLoggingHandlers if they exist
+            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
+            root_logger.handlers = handlers
+
             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
             root_logger.addHandler(handler)
 
@@ -176,14 +204,18 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                      collection_cache=self.collection_cache)
 
+        validate_cluster_target(self, self.runtimeContext)
+
 
     def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, loadingContext)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
             return ArvadosWorkflow(self, toolpath_object, loadingContext)
+        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
+            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
         else:
-            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+            raise Exception("Unknown tool %s" % toolpath_object.get("class"))
 
     def output_callback(self, out, processStatus):
         with self.workflow_eval_lock:
@@ -202,7 +234,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
 
     def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
+        self.task_queue.add(partial(runnable.run, runtimeContext),
+                            self.workflow_eval_lock, self.stop_polling)
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -212,7 +245,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         with self.workflow_eval_lock:
             j = self.processes[uuid]
             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
+            self.task_queue.add(partial(j.done, record),
+                                self.workflow_eval_lock, self.stop_polling)
             del self.processes[uuid]
 
     def runtime_status_update(self, kind, message, detail=None):
@@ -222,7 +256,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         activity statuses, for example in the RuntimeStatusLoggingHandler.
         """
         with self.workflow_eval_lock:
-            current = get_current_container(self.api, self.num_retries, logger)
+            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
             if current is None:
                 return
             runtime_status = current.get('runtime_status', {})
@@ -315,7 +349,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 if self.stop_polling.is_set():
                     break
                 with self.workflow_eval_lock:
-                    keys = list(self.processes.keys())
+                    keys = list(self.processes)
                 if not keys:
                     remain_wait = self.poll_interval
                     continue
@@ -326,21 +360,26 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 elif self.work_api == "jobs":
                     table = self.poll_api.jobs()
 
-                try:
-                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-                except Exception as e:
-                    logger.warn("Error checking states on API server: %s", e)
-                    remain_wait = self.poll_interval
-                    continue
+                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
 
-                for p in proc_states["items"]:
-                    self.on_message({
-                        "object_uuid": p["uuid"],
-                        "event_type": "update",
-                        "properties": {
-                            "new_attributes": p
-                        }
-                    })
+                while keys:
+                    page = keys[:pageSize]
+                    keys = keys[pageSize:]
+                    try:
+                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
+                    except Exception:
+                        logger.exception("Error checking states on API server: %s")
+                        remain_wait = self.poll_interval
+                        continue
+
+                    for p in proc_states["items"]:
+                        self.on_message({
+                            "object_uuid": p["uuid"],
+                            "event_type": "update",
+                            "properties": {
+                                "new_attributes": p
+                            }
+                        })
                 finish_poll = time.time()
                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
@@ -360,9 +399,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         for i in self.intermediate_output_collections:
             try:
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
-            except:
-                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+            except Exception:
+                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            except (KeyboardInterrupt, SystemExit):
                 break
 
     def check_features(self, obj):
@@ -379,7 +418,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                             "Option 'dockerOutputDirectory' must be an absolute path.")
             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
-            for v in obj.itervalues():
+            for v in viewvalues(obj):
                 self.check_features(v)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
@@ -403,17 +442,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               num_retries=self.num_retries)
 
         for k,v in generatemapper.items():
-            if k.startswith("_:"):
-                if v.type == "Directory":
+            if v.type == "Directory" and v.resolved.startswith("_:"):
                     continue
-                if v.type == "CreateFile":
-                    with final.open(v.target, "wb") as f:
-                        f.write(v.resolved.encode("utf-8"))
+            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
+                with final.open(v.target, "wb") as f:
+                    f.write(v.resolved.encode("utf-8"))
                     continue
 
-            if not k.startswith("keep:"):
+            if not v.resolved.startswith("keep:"):
                 raise Exception("Output source is not in keep or a literal")
-            sp = k.split("/")
+            sp = v.resolved.split("/")
             srccollection = sp[0][5:]
             try:
                 reader = self.collection_cache.get(srccollection)
@@ -423,7 +461,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                 raise
             except IOError as e:
-                logger.warn("While preparing output collection: %s", e)
+                logger.error("While preparing output collection: %s", e)
+                raise
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
@@ -435,7 +474,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         adjustFileObjs(outputObj, rewrite)
 
         with final.open("cwl.output.json", "w") as f:
-            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
@@ -460,7 +500,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
     def set_crunch_output(self):
         if self.work_api == "containers":
-            current = get_current_container(self.api, self.num_retries, logger)
+            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
             if current is None:
                 return
             try:
@@ -472,8 +512,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                               body={
                                                   'is_trashed': True
                                               }).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.info("Setting container output: %s", e)
+            except Exception:
+                logger.exception("Setting container output")
+                return
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                    body={
@@ -537,7 +578,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                       uuid=existing_uuid,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
-                                      merged_map=merged_map)
+                                      merged_map=merged_map,
+                                      loadingContext=loadingContext)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
@@ -575,17 +617,29 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
+        if self.should_estimate_cache_size:
+            visited = set()
+            estimated_size = [0]
+            def estimate_collection_cache(obj):
+                if obj.get("location", "").startswith("keep:"):
+                    m = pdh_size.match(obj["location"][5:])
+                    if m and m.group(1) not in visited:
+                        visited.add(m.group(1))
+                        estimated_size[0] += int(m.group(2))
+            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
+            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
+
+        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
+
         runnerjob = None
         if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                     runtimeContext.runnerjob = tool.tool["id"]
-                    runnerjob = tool.job(job_order,
-                                         self.output_callback,
-                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -595,9 +649,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
                                                 priority=runtimeContext.priority,
-                                                secret_store=self.secret_store)
+                                                secret_store=self.secret_store,
+                                                collection_cache_size=runtimeContext.collection_cache_size,
+                                                collection_cache_is_default=self.should_estimate_cache_size)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -615,14 +671,20 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not runtimeContext.wait:
-            submitargs = runtimeContext.copy()
-            submitargs.submit = False
-            runnerjob.run(submitargs)
+        if runtimeContext.cwl_runner_job is not None:
+            self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+        jobiter = tool.job(job_order,
+                           self.output_callback,
+                           runtimeContext)
+
+        if runtimeContext.submit and not runtimeContext.wait:
+            runnerjob = next(jobiter)
+            runnerjob.run(runtimeContext)
             return (runnerjob.uuid, "success")
 
-        current_container = get_current_container(self.api, self.num_retries, logger)
-        if current:
+        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
+        if current_container:
             logger.info("Running inside container %s", current_container.get("uuid"))
 
         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
@@ -631,17 +693,9 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
 
-        if runnerjob:
-            jobiter = iter((runnerjob,))
-        else:
-            if runtimeContext.cwl_runner_job is not None:
-                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
-            jobiter = tool.job(job_order,
-                               self.output_callback,
-                               runtimeContext)
-
         try:
             self.workflow_eval_lock.acquire()
+
             # Holds the lock while this code runs and releases it when
             # it is safe to do so in self.workflow_eval_lock.wait(),
             # at which point on_message can update job state and
@@ -667,6 +721,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     else:
                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
+
+                if self.stop_polling.is_set():
+                    break
+
                 loopperf.__enter__()
             loopperf.__exit__()
 
@@ -680,13 +738,18 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         except:
             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 logger.error("Interrupted, workflow will be cancelled")
+            elif isinstance(sys.exc_info()[1], WorkflowException):
+                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             else:
-                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.exception("Workflow execution failed")
+
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and runnerjob.uuid and self.work_api == "containers":
-                self.api.container_requests().update(uuid=runnerjob.uuid,
+            if runtimeContext.submit and isinstance(tool, Runner):
+                runnerjob = tool
+                if runnerjob.uuid and self.work_api == "containers":
+                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.workflow_eval_lock.release()
@@ -701,8 +764,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if runtimeContext.submit and isinstance(runnerjob, Runner):
-            logger.info("Final output collection %s", runnerjob.final_output)
+        if runtimeContext.submit and isinstance(tool, Runner):
+            logger.info("Final output collection %s", tool.final_output)
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))