closes #8508
author radhika <radhika@curoverse.com>
Wed, 9 Mar 2016 16:43:18 +0000 (11:43 -0500)
committer radhika <radhika@curoverse.com>
Wed, 9 Mar 2016 16:43:18 +0000 (11:43 -0500)
Merge branch 'wtsi-hgi-8508-datamanager-test-badpaths'

27 files changed:
README [deleted file]
README.md [new file with mode: 0644]
crunch_scripts/crunchrunner [new file with mode: 0755]
doc/install/arvbox.html.textile.liquid
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh [new file with mode: 0755]
sdk/go/crunchrunner/crunchrunner.go
services/api/app/controllers/arvados/v1/repositories_controller.rb
services/api/test/functional/arvados/v1/repositories_controller_test.rb
services/crunch-run/crunchrun_test.go
services/crunch-run/logging_test.go
services/datamanager/datamanager.go
services/nodemanager/arvnodeman/baseactor.py [new file with mode: 0644]
services/nodemanager/arvnodeman/clientactor.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/fullstopactor.py [deleted file]
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/timedcallback.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/testutil.py
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/service/ready/run-service
tools/arvbox/lib/arvbox/docker/service/sdk/run-service

diff --git a/README b/README
deleted file mode 100644 (file)
index cbb6fdd..0000000
--- a/README
+++ /dev/null
@@ -1,39 +0,0 @@
-Welcome to the Arvados Project!
-
-Arvados is an open source distributed computing platform for bioinformatics,
-data science, and high throughput analysis of massive data sets.  Arvados
-supports a variety of cloud, cluster and HPC environments.  Arvados consists of
-a content-addressable storage system "Keep" for managing and storing large
-collections of files; a containerized workflow engine "Crunch" designed for
-versioning, reproducibilty, and provenance of computations; and a variety
-related services and components including APIs, SDKs, and visual tools.
-
-The main Arvados web site is
-  https://arvados.org
-
-The Arvados public wiki is located at
-  https://dev.arvados.org/projects/arvados/wiki
-
-The Arvados public bug tracker is located at
-  https://dev.arvados.org/projects/arvados/issues
-
-For support see
-  http://doc.arvados.org/user/getting_started/community.html
-
-Installation documentation is located at
-  http://doc.arvados.org/install
-
-To try out Arvados quickly, you can use Arvbox, which provides Arvados
-components pre-installed in a Docker container (requires Docker 1.9+).  After
-cloning Arvados:
-
-  $ cd arvados/tools/arvbox/bin
-  $ ./arvbox start localdemo
-
-See http://doc.arvados.org/install/arvbox.html for details and documentation.
-
-If you wish to build the Arvados documentation yourself, follow the
-instructions in doc/README to build the documentation, then consult the
-"Install Guide".
-
-See COPYING for information about Arvados Free Software licenses.
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..629c2f0
--- /dev/null
+++ b/README.md
@@ -0,0 +1,74 @@
+[Arvados](https://arvados.org) is a free software distributed computing platform
+for bioinformatics, data science, and high throughput analysis of massive data
+sets.  Arvados supports a variety of cloud, cluster and HPC environments.
+
+Arvados consists of:
+
+* *Keep*: a petabyte-scale content-addressed distributed storage system for managing and
+  storing collections of files, accessible via HTTP and FUSE mount.
+
+* *Crunch*: a Docker-based cluster and HPC workflow engine designed to provide
+  strong versioning, reproducibility, and provenance of computations.
+
+* Related services and components including a web workbench for managing files
+  and compute jobs, REST APIs, SDKs, and other tools.
+
+## Quick start
+
+Curoverse maintains an Arvados public cloud demo at
+[https://cloud.curoverse.com](https://cloud.curoverse.com).  A Google account
+is required to log in.
+
+To try out Arvados on your local workstation, you can use Arvbox, which
+provides Arvados components pre-installed in a Docker container (requires
+Docker 1.9+).  After cloning the Arvados git repository:
+
+```
+$ cd arvados/tools/arvbox/bin
+$ ./arvbox start localdemo
+```
+
+In this mode you will only be able to connect to Arvbox from the same host.  To
+configure Arvbox to be accessible over a network, and for other options, see
+http://doc.arvados.org/install/arvbox.html.
+
+## Documentation
+
+Complete documentation, including a User Guide, Installation documentation, and
+API documentation, is available at http://doc.arvados.org/
+
+If you wish to build the Arvados documentation from a local git clone, see
+doc/README.textile for instructions.
+
+## Community
+
+The [#arvados](irc://irc.oftc.net:6667/#arvados) IRC (Internet Relay Chat)
+channel at the
+[Open and Free Technology Community (irc.oftc.net)](http://www.oftc.net/oftc/)
+is available for live discussion and support.  You can use a traditional IRC
+client or [join OFTC over the web](https://webchat.oftc.net/?channels=arvados).
+
+The
+[Arvados user mailing list](http://lists.arvados.org/mailman/listinfo/arvados)
+is a forum for general discussion, questions, and news about Arvados
+development.  The
+[Arvados developer mailing list](http://lists.arvados.org/mailman/listinfo/arvados-dev)
+is a forum for more technical discussion, intended for developers and
+contributors to Arvados.
+
+## Development
+
+[![Build Status](https://ci.curoverse.com/buildStatus/icon?job=arvados-api-server)](https://ci.curoverse.com/job/arvados-api-server/)
+
+The Arvados public bug tracker is located at https://dev.arvados.org/projects/arvados/issues
+
+Continuous integration is hosted at https://ci.curoverse.com/
+
+Instructions for setting up a development environment and working on specific
+components can be found on the
+["Hacking Arvados" page of the Arvados wiki](https://dev.arvados.org/projects/arvados/wiki/Hacking).
+
+## Licensing
+
+Arvados is Free Software.  See COPYING for information about Arvados Free
+Software licenses.
diff --git a/crunch_scripts/crunchrunner b/crunch_scripts/crunchrunner
new file mode 100755 (executable)
index 0000000..71c10c9
--- /dev/null
+++ b/crunch_scripts/crunchrunner
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec "$TASK_KEEPMOUNT/$JOB_PARAMETER_CRUNCHRUNNER"
diff --git a/doc/install/arvbox.html.textile.liquid b/doc/install/arvbox.html.textile.liquid
index b24e5b5f00e0128e5faded02ccc9e949f7807b22..3ddc7c825819e2799d75ea8e9b94eb7f2bb6a41d 100644 (file)
@@ -23,23 +23,28 @@ h2. Requirements
 h2. Usage
 
 <pre>
-Arvados-in-a-box
+$ arvbox
+Arvados-in-a-box                      http://arvados.org
 
-arvbox (build|start|run|open|shell|ip|stop|reboot|reset|destroy|log|svrestart)
+arvbox (build|start|run|open|shell|ip|stop|rebuild|reset|destroy|log|sv|restart)
 
 build <config>      build arvbox Docker image
 start|run <config>  start arvbox container
 open       open arvbox workbench in a web browser
 shell      enter arvbox shell
-ip         print arvbox ip address
+ip         print arvbox docker container ip address
+host       print arvbox published host
 status     print some information about current arvbox
 stop       stop arvbox container
 restart <config>  stop, then run again
-reboot  <config>  stop, build arvbox Docker image, run
+rebuild  <config>  stop, build arvbox Docker image, run
 reset      delete arvbox arvados data (be careful!)
 destroy    delete all arvbox code and data (be careful!)
-log       <service> tail log of specified service
-sv        <start|stop|restart> <service> change state of service inside arvbox
+log <service> tail log of specified service
+ls <options>  list directories inside arvbox
+cat <files>   get contents of files inside arvbox
+pipe       run a bash script piped in from stdin
+sv <start|stop|restart> <service> change state of service inside arvbox
 clone <from> <to>   clone an arvbox
 </pre>
 
@@ -61,11 +66,11 @@ Run the test suite.
 
 h3. publicdev
 
-Publicly accessible development configuration.  Similar to 'dev' except that service ports are published to the host's IP address and can accessed by anyone who can connect to the host system.  WARNING! The public arvbox configuration is NOT SECURE and must not be placed on a public IP address or used for production work.
+Publicly accessible development configuration.  Similar to 'dev' except that service ports are published to the host's IP address and can be accessed by anyone who can connect to the host system.  See below for more information.  WARNING! The public arvbox configuration is NOT SECURE and must not be placed on a public IP address or used for production work.
 
 h3. publicdemo
 
-Publicly accessible development configuration.  Similar to 'localdemo' except that service ports are published to the host's IP address and can accessed by anyone who can connect to the host system.  WARNING! The public arvbox configuration is NOT SECURE and must not be placed on a public IP address or used for production work.
+Publicly accessible demo configuration.  Similar to 'localdemo' except that service ports are published to the host's IP address and can be accessed by anyone who can connect to the host system.  See below for more information.  WARNING! The public arvbox configuration is NOT SECURE and must not be placed on a public IP address or used for production work.
 
 h2. Environment variables
 
@@ -108,6 +113,27 @@ h3. ARVBOX_PUBLISH_IP
 
 The IP address on which to publish services when running in public configuration.  Overrides default detection of the host's IP address.
 
+h2. Using Arvbox for Arvados development
+
+The "Arvbox section of Hacking Arvados":https://dev.arvados.org/projects/arvados/wiki/Arvbox has information about using Arvbox for Arvados development.
+
+h2. Making Arvbox accessible from other hosts
+
+In "dev" and "localdemo" mode, Arvbox can only be accessed on the same host it is running.  To publish Arvbox service ports to the host's service ports and advertise the host's IP address for services, use @publicdev@ or @publicdemo@:
+
+<pre>
+$ arvbox rebuild publicdemo
+</pre>
+
+This attempts to auto-detect the correct IP address to use by taking the IP address of the default route device.  If the auto-detection is wrong, if you want to publish a hostname instead of a raw address, or if you need to access it through a different device (such as a router or firewall), set @ARVBOX_PUBLISH_IP@ to the desired hostname or IP address.
+
+<pre>
+$ export ARVBOX_PUBLISH_IP=example.com
+$ arvbox rebuild publicdemo
+</pre>
+
+Note: this expects to bind the host's port 80 (http) for workbench, so you cannot have a conflicting web server already running on the host.  It does not attempt to bind the host's port 22 (ssh); as a result, the arvbox ssh port is not published.
+
 h2. Notes
 
 Services are designed to install and auto-configure on start or restart.  For example, the service script for keepstore always compiles keepstore from source and registers the daemon with the API server.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 8370e3d5e75a42e68fd73ee770c281b0388dd198..885497171e8828852c27f0ece6c455cc75c4ee49 100644 (file)
@@ -5,6 +5,8 @@ import arvados
 import arvados.events
 import arvados.commands.keepdocker
 import arvados.commands.run
+import arvados.collection
+import arvados.util
 import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
@@ -15,12 +17,23 @@ import fnmatch
 import logging
 import re
 import os
+import sys
 
 from cwltool.process import get_feature
+from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
+crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
+crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
+certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
+
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+
 def arv_docker_get_image(api_client, dockerRequirement, pull_image):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -142,9 +155,9 @@ class ArvadosJob(object):
         try:
             response = self.arvrunner.api.jobs().create(body={
                 "script": "crunchrunner",
-                "repository": kwargs["repository"],
-                "script_version": "master",
-                "script_parameters": {"tasks": [script_parameters]},
+                "repository": "arvados",
+                "script_version": "8488-cwl-crunchrunner-collection",
+                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                 "runtime_constraints": runtime_constraints
             }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
 
@@ -186,6 +199,26 @@ class ArvadosJob(object):
             try:
                 outputs = {}
                 if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log.readlines():
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+                        if tmpdir and outdir and keepdir:
+                            break
+
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
                     outputs = self.collect_outputs("keep:" + record["output"])
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
@@ -229,11 +262,20 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
             arvrunner.add_uploaded(src, (ab, st.fn))
             self._pathmap[src] = (ab, st.fn)
 
+        self.keepdir = None
+
+    def reversemap(self, target):
+        if target.startswith("keep:"):
+            return target
+        elif self.keepdir and target.startswith(self.keepdir):
+            return "keep:" + target[len(self.keepdir)+1:]
+        else:
+            return super(ArvPathMapper, self).reversemap(target)
 
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
@@ -302,56 +344,84 @@ class ArvCwlRunner(object):
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                   "components": {},
-                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+        try:
+            self.api.collections().get(uuid=crunchrunner_pdh).execute()
+        except arvados.errors.ApiError as e:
+            import httplib2
+            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
+            resp, content = h.request(crunchrunner_download, "GET")
+            resp2, content2 = h.request(certs_download, "GET")
+            with arvados.collection.Collection() as col:
+                with col.open("crunchrunner", "w") as f:
+                    f.write(content)
+                with col.open("ca-certificates.crt", "w") as f:
+                    f.write(content2)
+
+                col.save_new("crunchrunner binary", ensure_unique_name=True)
 
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = args.enable_reuse
-        kwargs["repository"] = args.repository
+
+        kwargs["outdir"] = "$(task.outdir)"
+        kwargs["tmpdir"] = "$(task.tmpdir)"
 
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            jobiter = tool.job(job_order,
-                            input_basedir,
-                            self.output_callback,
-                            **kwargs)
-
-            for runnable in jobiter:
-                if runnable:
-                    with self.lock:
-                        runnable.run(**kwargs)
-                else:
-                    if self.jobs:
-                        try:
-                            self.cond.acquire()
-                            self.cond.wait()
-                        finally:
-                            self.cond.release()
-                    else:
-                        logger.error("Workflow cannot make any more progress.")
-                        break
-
-            while self.jobs:
-                try:
-                    self.cond.acquire()
-                    self.cond.wait()
-                finally:
-                    self.cond.release()
+            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+                                                                   "components": {},
+                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            events.close()
+            jobiter = tool.job(job_order,
+                               input_basedir,
+                               self.output_callback,
+                               docker_outdir="$(task.outdir)",
+                               **kwargs)
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+            try:
+                for runnable in jobiter:
+                    if runnable:
+                        with self.lock:
+                            runnable.run(**kwargs)
+                    else:
+                        if self.jobs:
+                            try:
+                                self.cond.acquire()
+                                self.cond.wait(1)
+                            except RuntimeError:
+                                pass
+                            finally:
+                                self.cond.release()
+                        else:
+                            logger.error("Workflow cannot make any more progress.")
+                            break
+
+                while self.jobs:
+                    try:
+                        self.cond.acquire()
+                        self.cond.wait(1)
+                    except RuntimeError:
+                        pass
+                    finally:
+                        self.cond.release()
+
+                events.close()
+
+                if self.final_output is None:
+                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+            except:
+                if sys.exc_info()[0] is not KeyboardInterrupt:
+                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
 
             return self.final_output
 
 
 def main(args, stdout, stderr, api_client=None):
-    runner = ArvCwlRunner(api_client=arvados.api('v1'))
     args.insert(0, "--leave-outputs")
     parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
@@ -362,6 +432,10 @@ def main(args, stdout, stderr, api_client=None):
                         default=False, dest="enable_reuse",
                         help="")
 
-    parser.add_argument('--repository', type=str, default="peter/crunchrunner", help="Repository containing the 'crunchrunner' program.")
+    try:
+        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+    except Exception as e:
+        logger.error(e)
+        return 1
 
     return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
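For illustration, a minimal sketch of how the new `tmpdirre` pattern above is meant to pull `$(task.tmpdir)` out of a job log; the sample log line is hypothetical, shaped only to match the `<timestamp> <uuid> <pid> <seq> stderr <token> <token>` prefix the pattern expects (crunchrunner itself emits the `crunchrunner: $(task.tmpdir)=...` part, per the crunchrunner.go change below):

```python
import re

# Same pattern as tmpdirre in the hunk above.
tmpdirre = re.compile(
    r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")

# Hypothetical log line, constructed only to satisfy the pattern's layout.
line = ("2016-03-09_16:43:18 qr1hi-8i9sb-xxxxxxxxxxxxxxx 1234 0 stderr "
        "run test crunchrunner: $(task.tmpdir)=/tmp/crunch-job-work")

m = tmpdirre.match(line)
print(m.group(1) if m else None)  # -> /tmp/crunch-job-work
```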
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 65ae16b5158aebe388afc7f42e2e247f4e13733f..cdbb41be17160861e672eb94995ad315f7b6e461 100644 (file)
@@ -30,8 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20160129152024',
-          'arvados-python-client>=0.1.20160122132348'
+          'cwltool>=1.0.20160302134341',
+          'arvados-python-client>=0.1.20160219154918'
       ],
       zip_safe=True,
       cmdclass={'egg_info': tagger},
diff --git a/sdk/cwl/test_with_arvbox.sh b/sdk/cwl/test_with_arvbox.sh
new file mode 100755 (executable)
index 0000000..aef2700
--- /dev/null
+++ b/sdk/cwl/test_with_arvbox.sh
@@ -0,0 +1,71 @@
+#!/bin/sh
+
+if ! which arvbox >/dev/null ; then
+    export PATH=$PATH:$(readlink -f $(dirname $0)/../../tools/arvbox/bin)
+fi
+
+reset_container=1
+leave_running=0
+config=dev
+
+while test -n "$1" ; do
+    arg="$1"
+    case "$arg" in
+        --no-reset-container)
+            reset_container=0
+            shift
+            ;;
+        --leave-running)
+            leave_running=1
+            shift
+            ;;
+        --config)
+            config=$2
+            shift ; shift
+            ;;
+        *)
+            break
+            ;;
+    esac
+done
+
+if test -z "$ARVBOX_CONTAINER" ; then
+   export ARVBOX_CONTAINER=cwltest
+fi
+
+if test $reset_container = 1 ; then
+    arvbox reset -f
+fi
+
+arvbox start $config
+
+arvbox pipe <<EOF
+set -eu -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/arvados/sdk/cwl
+python setup.py sdist
+pip_install \$(ls dist/arvados-cwl-runner-*.tar.gz | tail -n1)
+
+mkdir -p /tmp/cwltest
+cd /tmp/cwltest
+if ! test -d common-workflow-language ; then
+  git clone https://github.com/common-workflow-language/common-workflow-language.git
+fi
+cd common-workflow-language
+git pull
+export ARVADOS_API_HOST=localhost:8000
+export ARVADOS_API_HOST_INSECURE=1
+export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
+env
+exec ./run_test.sh "$@"
+EOF
+
+CODE=$?
+
+if test $leave_running = 0 ; then
+    arvbox stop
+fi
+
+exit $CODE
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 8e24e18fda845866909aff7f6bba1bd02234d53c..226cf9122be430d8c08c03c595447a3448a19a22 100644 (file)
@@ -1,13 +1,17 @@
 package main
 
 import (
+       "crypto/x509"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "io/ioutil"
        "log"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
+       "path"
        "strings"
        "syscall"
 )
@@ -209,6 +213,10 @@ func runner(api IArvadosClient,
                "$(task.outdir)": outdir,
                "$(task.keep)":   keepmount}
 
+       log.Printf("crunchrunner: $(task.tmpdir)=%v", tmpdir)
+       log.Printf("crunchrunner: $(task.outdir)=%v", outdir)
+       log.Printf("crunchrunner: $(task.keep)=%v", keepmount)
+
        // Set up subprocess
        for k, v := range taskp.Command {
                taskp.Command[k] = substitute(v, replacements)
@@ -317,6 +325,15 @@ func main() {
                log.Fatal(err)
        }
 
+       certpath := path.Join(path.Dir(os.Args[0]), "ca-certificates.crt")
+       certdata, err := ioutil.ReadFile(certpath)
+       if err == nil {
+               log.Printf("Using TLS certificates at %v", certpath)
+               certs := x509.NewCertPool()
+               certs.AppendCertsFromPEM(certdata)
+               api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
+       }
+
        jobUuid := os.Getenv("JOB_UUID")
        taskUuid := os.Getenv("TASK_UUID")
        tmpdir := os.Getenv("TASK_WORK")
diff --git a/services/api/app/controllers/arvados/v1/repositories_controller.rb b/services/api/app/controllers/arvados/v1/repositories_controller.rb
index 4bf9a6a0945462e2bf74596d620ec21575541844..183ed4d8a80e269356c82cdf19b08d5dc0120a80 100644 (file)
@@ -91,11 +91,11 @@ class Arvados::V1::RepositoriesController < ApplicationController
     @repo_info.values.each do |repo|
       repo[:user_permissions].each do |user_uuid, user_perms|
         if user_perms['can_manage']
-          user_perms['gitolite_permissions'] = 'RW'
+          user_perms['gitolite_permissions'] = 'RW+'
           user_perms['can_write'] = true
           user_perms['can_read'] = true
         elsif user_perms['can_write']
-          user_perms['gitolite_permissions'] = 'RW'
+          user_perms['gitolite_permissions'] = 'RW+'
           user_perms['can_read'] = true
         elsif user_perms['can_read']
           user_perms['gitolite_permissions'] = 'R'
diff --git a/services/api/test/functional/arvados/v1/repositories_controller_test.rb b/services/api/test/functional/arvados/v1/repositories_controller_test.rb
index 514bb66bb2b55eaabfffd9e2494c59500c1a58bc..241a34eb1079aaa51715def3fea19769915285b6 100644 (file)
@@ -176,13 +176,13 @@ class Arvados::V1::RepositoriesControllerTest < ActionController::TestCase
         end
         if perms['can_write']
           assert u.can? write: repo['uuid']
-          assert_match /RW/, perms['gitolite_permissions']
+          assert_match /RW\+/, perms['gitolite_permissions']
         else
           refute_match /W/, perms['gitolite_permissions']
         end
         if perms['can_manage']
           assert u.can? manage: repo['uuid']
-          assert_match /RW/, perms['gitolite_permissions']
+          assert_match /RW\+/, perms['gitolite_permissions']
         end
       end
     end
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 53cdbbc54e956538c084ce68e3aba89a29973825..659b3c0ede524a31af3ada93369fcc6cab808e2a 100644 (file)
@@ -36,7 +36,7 @@ var _ = Suite(&TestSuite{})
 type ArvTestClient struct {
        Total   int64
        Calls   int
-       Content arvadosclient.Dict
+       Content []arvadosclient.Dict
        ContainerRecord
        Logs          map[string]*bytes.Buffer
        WasSetRunning bool
@@ -131,7 +131,7 @@ func (this *ArvTestClient) Create(resourceType string,
        output interface{}) error {
 
        this.Calls += 1
-       this.Content = parameters
+       this.Content = append(this.Content, parameters)
 
        if resourceType == "logs" {
                et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
@@ -168,8 +168,8 @@ func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arva
 }
 
 func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
-
-       this.Content = parameters
+       this.Calls += 1
+       this.Content = append(this.Content, parameters)
        if resourceType == "containers" {
                if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
                        this.WasSetRunning = true
@@ -399,8 +399,9 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        err := cr.CommitLogs()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Check(api.Content["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+       c.Check(api.Calls, Equals, 2)
+       c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
        c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
 }
 
@@ -412,7 +413,7 @@ func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
        err := cr.UpdateContainerRecordRunning()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Running")
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
 }
 
 func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
@@ -430,9 +431,9 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
        err := cr.UpdateContainerRecordComplete()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, *cr.ExitCode)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["exit_code"], Equals, *cr.ExitCode)
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 }
 
 func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
@@ -445,9 +446,9 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
        err := cr.UpdateContainerRecordComplete()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], IsNil)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], IsNil)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["exit_code"], IsNil)
+       c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
 }
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
@@ -470,7 +471,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
        c.Check(err, IsNil)
        c.Check(api.WasSetRunning, Equals, true)
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
+       c.Check(api.Content[api.Calls-1]["container"].(arvadosclient.Dict)["log"], NotNil)
 
        if err != nil {
                for k, v := range api.Logs {
@@ -498,8 +499,9 @@ func (s *TestSuite) TestFullRunHello(c *C) {
                t.finish <- dockerclient.WaitResult{}
        })
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Calls, Equals, 7)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
 
@@ -522,9 +524,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 1}
        })
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Calls, Equals, 8)
+       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["log"], NotNil)
+       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
        c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
@@ -546,8 +549,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Calls, Equals, 7)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        log.Print(api.Logs["stdout"].String())
 
@@ -570,8 +574,9 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Calls, Equals, 7)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
 }
@@ -617,7 +622,8 @@ func (s *TestSuite) TestCancel(c *C) {
 
        c.Check(err, IsNil)
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
+       c.Check(api.Calls, Equals, 6)
+       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], NotNil)
 
        if err != nil {
                for k, v := range api.Logs {
@@ -626,7 +632,7 @@ func (s *TestSuite) TestCancel(c *C) {
                }
        }
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
+       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
 
@@ -648,8 +654,9 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       c.Check(api.Calls, Equals, 7)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
 }
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index bce324d478571aefe0dddf0a0647199a3f0fc1e4..79214fca7dc4d3f5e0ac0b77475c6a0a5b27138d 100644 (file)
@@ -40,8 +40,8 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
                "2015-12-29T15:51:45.000000002Z Goodbye\n"
 
-       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
+       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
        c.Check(string(kc.Content), Equals, logtext)
 }
 
@@ -83,14 +83,14 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
        cr.CrunchLog.Close()
        logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
                "2015-12-29T15:51:45.000000003Z Goodbye\n"
-       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext1)
+       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext1)
 
        stdout.Close()
        logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
                "2015-12-29T15:51:45.000000004Z Blurb\n"
-       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "stdout")
-       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext2)
+       c.Check(api.Content[1]["log"].(arvadosclient.Dict)["event_type"], Equals, "stdout")
+       c.Check(api.Content[1]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext2)
 
        mt, err := cr.LogCollection.ManifestText()
        c.Check(err, IsNil)
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 312bea07fd4bcceacdf8e1dd6a0ab9a6eec7bfa6..8e128358422a560a42cbdaa03e20feb8067fa6ba 100644 (file)
@@ -22,6 +22,7 @@ var (
        logEventTypePrefix  string
        logFrequencySeconds int
        minutesBetweenRuns  int
+       collectionBatchSize int
        dryRun              bool
 )
 
@@ -38,6 +39,10 @@ func init() {
                "minutes-between-runs",
                0,
                "How many minutes we wait between data manager runs. 0 means run once and exit.")
+       flag.IntVar(&collectionBatchSize,
+               "collection-batch-size",
+               1000,
+               "How many collections to request in each batch.")
        flag.BoolVar(&dryRun,
                "dry-run",
                false,
@@ -193,7 +198,7 @@ func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
                                collection.GetCollectionsParams{
                                        Client:    arv,
                                        Logger:    arvLogger,
-                                       BatchSize: 50})
+                                       BatchSize: collectionBatchSize})
                        collDone <- struct{}{}
                }()
 
diff --git a/services/nodemanager/arvnodeman/baseactor.py b/services/nodemanager/arvnodeman/baseactor.py
new file mode 100644 (file)
index 0000000..9591b42
--- /dev/null
+++ b/services/nodemanager/arvnodeman/baseactor.py
@@ -0,0 +1,85 @@
+from __future__ import absolute_import, print_function
+
+import errno
+import logging
+import os
+import threading
+import traceback
+
+import pykka
+
+class _TellCallableProxy(object):
+    """Internal helper class for proxying callables."""
+
+    def __init__(self, ref, attr_path):
+        self.actor_ref = ref
+        self._attr_path = attr_path
+
+    def __call__(self, *args, **kwargs):
+        message = {
+            'command': 'pykka_call',
+            'attr_path': self._attr_path,
+            'args': args,
+            'kwargs': kwargs,
+        }
+        self.actor_ref.tell(message)
+
+
+class TellActorProxy(pykka.ActorProxy):
+    """ActorProxy in which all calls are implemented as using tell().
+
+    The standard pykka.ActorProxy always uses ask() and returns a Future.  If
+    the target method raises an exception, it is placed in the Future object
+    and re-raised when get() is called on the Future.  Unfortunately, most
+    messaging in Node Manager is asynchronous and the caller does not store the
+    Future object returned by the call to ActorProxy.  As a result, exceptions
+    resulting from these calls end up in limbo, neither reported in the logs
+    nor handled by on_failure().
+
+    The TellActorProxy uses tell() instead of ask() and does not return a
+    Future object.  As a result, if the target method raises an exception, it
+    will be logged and on_failure() will be called as intended.
+
+    """
+
+    def __repr__(self):
+        return '<ActorProxy for %s, attr_path=%s>' % (
+            self.actor_ref, self._attr_path)
+
+    def __getattr__(self, name):
+        """Get a callable from the actor."""
+        attr_path = self._attr_path + (name,)
+        if attr_path not in self._known_attrs:
+            self._known_attrs = self._get_attributes()
+        attr_info = self._known_attrs.get(attr_path)
+        if attr_info is None:
+            raise AttributeError('%s has no attribute "%s"' % (self, name))
+        if attr_info['callable']:
+            if attr_path not in self._callable_proxies:
+                self._callable_proxies[attr_path] = _TellCallableProxy(
+                    self.actor_ref, attr_path)
+            return self._callable_proxies[attr_path]
+        else:
+            raise AttributeError('attribute "%s" is not a callable on %s' % (name, self))
+
+class TellableActorRef(pykka.ActorRef):
+    """ActorRef adding the tell_proxy() method to get TellActorProxy."""
+
+    def tell_proxy(self):
+        return TellActorProxy(self)
+
+class BaseNodeManagerActor(pykka.ThreadingActor):
+    """Base class for actors in node manager, redefining actor_ref as a
+    TellableActorRef and providing a default on_failure handler.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(pykka.ThreadingActor, self).__init__(*args, **kwargs)
+        self.actor_ref = TellableActorRef(self)
+
+    def on_failure(self, exception_type, exception_value, tb):
+        lg = getattr(self, "_logger", logging)
+        if (exception_type in (threading.ThreadError, MemoryError) or
+            exception_type is OSError and exception_value.errno == errno.ENOMEM):
+            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
+            os.killpg(os.getpgid(0), 9)
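A minimal sketch of the difference described in the `TellActorProxy` docstring above; `MyActor` and `do_work` are hypothetical names, and this snippet is not part of the patch:

```python
from arvnodeman.baseactor import BaseNodeManagerActor

class MyActor(BaseNodeManagerActor):
    def do_work(self):
        raise RuntimeError("boom")

ref = MyActor.start()

# Standard pykka proxy: ask() captures the exception in a Future, so the
# error is lost unless some caller eventually calls future.get().
future = ref.proxy().do_work()

# tell_proxy(): fire-and-forget.  No Future is returned; the exception
# escapes the message handler, gets logged, and on_failure() runs (for a
# fatal error, BaseNodeManagerActor then kills the whole process).
ref.tell_proxy().do_work()
```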
diff --git a/services/nodemanager/arvnodeman/clientactor.py b/services/nodemanager/arvnodeman/clientactor.py
index 9a9ce588d382f54b9e399b5b280b8ed979557927..e1307494ec6644b10d8d1ca6954035223d94f5ee 100644 (file)
@@ -38,7 +38,7 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._polling_started = False
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 2ae4fc8923612d474b833fcf9f345b255148ee3d..e11dcc77badbe58de0b56cec9d04e1316d79d2a6 100644 (file)
@@ -26,7 +26,7 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         super(ComputeNodeStateChangeBase, self).__init__()
         RetryMixin.__init__(self, retry_wait, max_retry_wait,
                             None, cloud_client, timer_actor)
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._arvados = arvados_client
         self.subscribers = set()
 
@@ -37,6 +37,8 @@ class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
         self._set_logger()
 
     def _finished(self):
+        if self.subscribers is None:
+            raise Exception("Actor tried to finish twice")
         _notify_subscribers(self._later, self.subscribers)
         self.subscribers = None
         self._logger.info("finished")
@@ -225,6 +227,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
+                return
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
@@ -304,7 +307,7 @@ class ComputeNodeMonitorActor(config.actor_class):
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index dcfe1ceb133e671527e70967dd25a783d64210a6..15891a92bcf8a7fdd5365595489e5efe96a5e0cd 100644 (file)
@@ -12,7 +12,7 @@ import httplib2
 import pykka
 from apiclient import errors as apierror
 
-from .fullstopactor import FullStopActor
+from .baseactor import BaseNodeManagerActor
 
 # IOError is the base class for socket.error, ssl.SSLError, and friends.
 # It seems like it hits the sweet spot for operations we want to retry:
@@ -20,7 +20,7 @@ from .fullstopactor import FullStopActor
 NETWORK_ERRORS = (IOError,)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
 
-actor_class = FullStopActor
+actor_class = BaseNodeManagerActor
 
 class NodeManagerConfig(ConfigParser.SafeConfigParser):
     """Node Manager Configuration class.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 0993c479625f23a209c90412fa4426ff2c406d23..33b6cd58f6aff2897cef4c89d0c4b60a149b0ee4 100644 (file)
@@ -121,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._later = self.actor_ref.proxy()
+        self._later = self.actor_ref.tell_proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
         self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -174,11 +174,12 @@ class NodeManagerDaemonActor(actor_class):
             poll_stale_after=self.poll_stale_after,
             node_stale_after=self.node_stale_after,
             cloud_client=self._cloud_driver,
-            boot_fail_after=self.boot_fail_after).proxy()
-        actor.subscribe(self._later.node_can_shutdown)
+            boot_fail_after=self.boot_fail_after)
+        actorTell = actor.tell_proxy()
+        actorTell.subscribe(self._later.node_can_shutdown)
         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
-                                             actor.update_cloud_node)
-        record = _ComputeNodeRecord(actor, cloud_node)
+                                             actorTell.update_cloud_node)
+        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
         return record
 
     def update_cloud_nodes(self, nodelist):
@@ -360,7 +361,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             arvados_node=arvados_node,
             cloud_client=self._new_cloud(),
-            cloud_size=cloud_size).proxy()
+            cloud_size=cloud_size).tell_proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
 
@@ -413,7 +414,7 @@ class NodeManagerDaemonActor(actor_class):
             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
-        shutdown.subscribe(self._later.node_finished_shutdown)
+        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
@@ -438,12 +439,10 @@ class NodeManagerDaemonActor(actor_class):
         if not success:
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
-            del self.shutdowns[cloud_node_id]
-            del self.sizes_booting_shutdown[cloud_node_id]
+        del self.shutdowns[cloud_node_id]
+        del self.sizes_booting_shutdown[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
diff --git a/services/nodemanager/arvnodeman/fullstopactor.py b/services/nodemanager/arvnodeman/fullstopactor.py
deleted file mode 100644 (file)
index 07e0625..0000000
--- a/services/nodemanager/arvnodeman/fullstopactor.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from __future__ import absolute_import, print_function
-
-import errno
-import logging
-import os
-import threading
-import traceback
-
-import pykka
-
-class FullStopActor(pykka.ThreadingActor):
-    def on_failure(self, exception_type, exception_value, tb):
-        lg = getattr(self, "_logger", logging)
-        if (exception_type in (threading.ThreadError, MemoryError) or
-            exception_type is OSError and exception_value.errno == errno.ENOMEM):
-            lg.critical("Unhandled exception is a fatal error, killing Node Manager")
-            os.killpg(os.getpgid(0), 9)
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index c8b3d19485b2c9cb8d6ee6e4353ddeb2c0b9c560..78bd2db5cc05fe9516c10e718506ef11734055db 100644 (file)
@@ -69,14 +69,14 @@ def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
     max_poll_time = config.getint('Daemon', 'max_poll_time')
 
-    timer = TimedCallBackActor.start(poll_time / 10.0).proxy()
+    timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
     cloud_node_poller = CloudNodeListMonitorActor.start(
-        config.new_cloud_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_cloud_client(), timer, poll_time, max_poll_time).tell_proxy()
     arvados_node_poller = ArvadosNodeListMonitorActor.start(
-        config.new_arvados_client(), timer, poll_time, max_poll_time).proxy()
+        config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).proxy()
+        poll_time, max_poll_time).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
@@ -110,7 +110,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
@@ -123,7 +123,7 @@ def main(args=None):
             config.getint('Daemon', 'boot_fail_after'),
             config.getint('Daemon', 'node_stale_after'),
             node_setup, node_shutdown, node_monitor,
-            max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
         signal.pause()
         daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
diff --git a/services/nodemanager/arvnodeman/timedcallback.py b/services/nodemanager/arvnodeman/timedcallback.py
index 615f798f5b4ff045abb423e6b04f1339164ac44d..12d6280873e8fe23669bbf6f1dce08a952bfcda2 100644 (file)
@@ -18,7 +18,7 @@ class TimedCallBackActor(actor_class):
     """
     def __init__(self, max_sleep=1):
         super(TimedCallBackActor, self).__init__()
-        self._proxy = self.actor_ref.proxy()
+        self._proxy = self.actor_ref.tell_proxy()
         self.messages = []
         self.max_sleep = max_sleep
 
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index f41fa6cb1af57b6b1cb005a09bec95207ace0b14..554fb88b4723463884f408bbb4454b279a208387 100644 (file)
@@ -26,6 +26,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                           cloud_size=get_cloud_size,
                                           actor_ref=mock_actor)
         mock_actor.proxy.return_value = mock_proxy
+        mock_actor.tell_proxy.return_value = mock_proxy
 
         self.last_setup = mock_proxy
         return mock_actor
diff --git a/services/nodemanager/tests/test_failure.py b/services/nodemanager/tests/test_failure.py
index afebb9ca32e0b5c167a96260a566feb81df3a45d..35605fcd8c564ef910bfcd352d90e36d0680e064 100644 (file)
@@ -12,9 +12,9 @@ import pykka
 
 from . import testutil
 
-import arvnodeman.fullstopactor
+import arvnodeman.baseactor
 
-class BogusActor(arvnodeman.fullstopactor.FullStopActor):
+class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def __init__(self, e):
         super(BogusActor, self).__init__()
         self.exp = e
@@ -23,26 +23,17 @@ class BogusActor(arvnodeman.fullstopactor.FullStopActor):
         raise self.exp
 
 class ActorUnhandledExceptionTest(unittest.TestCase):
-    def test1(self):
+    def test_fatal_error(self):
         for e in (MemoryError(), threading.ThreadError(), OSError(errno.ENOMEM, "")):
             with mock.patch('os.killpg') as killpg_mock:
-                act = BogusActor.start(e)
-                act.tell({
-                    'command': 'pykka_call',
-                    'attr_path': ("doStuff",),
-                    'args': [],
-                    'kwargs': {}
-                })
-                act.stop(block=True)
+                act = BogusActor.start(e).tell_proxy()
+                act.doStuff()
+                act.actor_ref.stop(block=True)
                 self.assertTrue(killpg_mock.called)
 
+    def test_nonfatal_error(self):
         with mock.patch('os.killpg') as killpg_mock:
-            act = BogusActor.start(OSError(errno.ENOENT, ""))
-            act.tell({
-                'command': 'pykka_call',
-                'attr_path': ("doStuff",),
-                'args': [],
-                'kwargs': {}
-            })
-            act.stop(block=True)
+            act = BogusActor.start(OSError(errno.ENOENT, "")).tell_proxy()
+            act.doStuff()
+            act.actor_ref.stop(block=True)
             self.assertFalse(killpg_mock.called)
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6cde766fa312f5b0e07ba53148a93844d26dbf47..5803b05318e795fc9ea7cf3c195d96a00ea9818b 100644 (file)
@@ -85,7 +85,10 @@ class MockTimer(object):
             to_deliver = self.messages
             self.messages = []
         for callback, args, kwargs in to_deliver:
-            callback(*args, **kwargs)
+            try:
+                callback(*args, **kwargs)
+            except pykka.ActorDeadError:
+                pass
 
     def schedule(self, want_time, callback, *args, **kwargs):
         with self.lock:
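The new `try`/`except` in `MockTimer.deliver()` guards against callbacks aimed at actors a test has already stopped. A minimal sketch of the error it swallows, with a hypothetical `DummyActor` (not part of the patch):

```python
import pykka

class DummyActor(pykka.ThreadingActor):
    def ping(self):
        pass

proxy = DummyActor.start().proxy()
proxy.actor_ref.stop(block=True)

try:
    proxy.ping()              # messaging a stopped actor raises immediately
except pykka.ActorDeadError:
    pass                      # the same error MockTimer.deliver() now ignores
```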
diff --git a/tools/arvbox/bin/arvbox b/tools/arvbox/bin/arvbox
index d790cb6b9f44346011ed41240a039a4d494d6b9f..88726a46836e636dd895288d9244e15be34b8515 100755 (executable)
@@ -56,14 +56,26 @@ getip() {
     docker inspect $ARVBOX_CONTAINER | grep \"IPAddress\" | head -n1 | tr -d ' ":,\n' | cut -c10-
 }
 
+gethost() {
+    set +e
+    OVERRIDE=$(docker exec -i $ARVBOX_CONTAINER cat /var/run/localip_override 2>/dev/null)
+    CODE=$?
+    set -e
+    if test "$CODE" = 0 ; then
+       echo $OVERRIDE
+    else
+        getip
+    fi
+}
+
 updateconf() {
     if test -f ~/.config/arvados/$ARVBOX_CONTAINER.conf ; then
-        sed "s/ARVADOS_API_HOST=.*/ARVADOS_API_HOST=$(getip):8000/" <$HOME/.config/arvados/$ARVBOX_CONTAINER.conf >$HOME/.config/arvados/$ARVBOX_CONTAINER.conf.tmp
+        sed "s/ARVADOS_API_HOST=.*/ARVADOS_API_HOST=$(gethost):8000/" <$HOME/.config/arvados/$ARVBOX_CONTAINER.conf >$HOME/.config/arvados/$ARVBOX_CONTAINER.conf.tmp
         mv ~/.config/arvados/$ARVBOX_CONTAINER.conf.tmp ~/.config/arvados/$ARVBOX_CONTAINER.conf
     else
         mkdir -p $HOME/.config/arvados
         cat >$HOME/.config/arvados/$ARVBOX_CONTAINER.conf <<EOF
-ARVADOS_API_HOST=$(getip):8000
+ARVADOS_API_HOST=$(gethost):8000
 ARVADOS_API_TOKEN=
 ARVADOS_API_HOST_INSECURE=true
 EOF
@@ -86,14 +98,14 @@ wait_for_arvbox() {
     if test -n "$localip" ; then
         echo "export ARVADOS_API_HOST=$localip:8000"
     else
-        echo "export ARVADOS_API_HOST=$(getip):8000"
+        echo "export ARVADOS_API_HOST=$(gethost):8000"
     fi
 }
 
 run() {
     if docker ps -a | grep -E "$ARVBOX_CONTAINER$" -q ; then
-        echo "Container $ARVBOX_CONTAINER is already running, use stop, restart or reboot"
-        exit 0
+        echo "Container $ARVBOX_CONTAINER is already running, use stop, restart or rebuild"
+        exit 1
     fi
 
     if echo "$1" | grep '^public' ; then
@@ -234,7 +246,7 @@ stop() {
 
 build() {
     if ! test -f "$ARVBOX_DOCKER/Dockerfile.base" ;  then
-        echo "Could not find Dockerfile ($ARVBOX_DOCKER/Dockerfile.base)"
+        echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
         exit 1
     fi
     docker build -t arvados/arvbox-base -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
@@ -273,7 +285,11 @@ case "$subcmd" in
         ;;
 
     sh*)
-        docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM GEM_HOME=/var/lib/gems /bin/bash
+        exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM GEM_HOME=/var/lib/gems /bin/bash
+        ;;
+
+    pipe)
+        exec docker exec -i $ARVBOX_CONTAINER /usr/bin/env GEM_HOME=/var/lib/gems /bin/bash -
         ;;
 
     stop)
@@ -286,26 +302,31 @@ case "$subcmd" in
         run $@
         ;;
 
-    reboot)
+    rebuild)
         check $@
         stop
         build $@
         run $@
         ;;
 
-    ip|open)
-        if test "$subcmd" = 'ip' ; then
-            echo $(getip)
-        else
-            xdg-open http://$(getip)
-        fi
+    ip)
+        getip
+        ;;
+
+    host)
+        gethost
+        ;;
+
+    open)
+        exec xdg-open http://$(gethost)
         ;;
 
     status)
         echo "Selected: $ARVBOX_CONTAINER"
         if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
             echo "Status: running"
-            echo "IP: $(getip)"
+            echo "Container IP: $(getip)"
+            echo "Published host: $(gethost)"
         else
             echo "Status: not running"
         fi
@@ -352,19 +373,31 @@ case "$subcmd" in
 
     log)
         if test -n "$1" ; then
-            docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM less --follow-name +GF "/etc/service/$1/log/main/current"
+            exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM less --follow-name +GF "/etc/service/$1/log/main/current"
         else
-            docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM tail $(docker exec -ti $ARVBOX_CONTAINER find -L /etc -path '/etc/service/*/log/main/current' -printf " %p")
+            exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM tail $(docker exec -ti $ARVBOX_CONTAINER find -L /etc -path '/etc/service/*/log/main/current' -printf " %p")
         fi
         ;;
 
-    sv)
+    cat)
         if test -n "$1" ; then
-            docker exec -ti $ARVBOX_CONTAINER sv "$1" "$2"
+            exec docker exec -ti $ARVBOX_CONTAINER cat "$@"
+        else
+            echo "Usage: $0 $subcmd <files>"
+        fi
+        ;;
+
+    ls)
+        exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM ls "$@"
+        ;;
+
+    sv)
+        if test -n "$1" -a -n "$2" ; then
+            exec docker exec -ti $ARVBOX_CONTAINER sv "$@"
         else
-            echo "Usage: $0 $subcmd <service>"
+            echo "Usage: $0 $subcmd <start|stop|restart> <service>"
             echo "Available services:"
-            docker exec -ti $ARVBOX_CONTAINER ls /etc/service
+            exec docker exec -ti $ARVBOX_CONTAINER ls /etc/service
         fi
         ;;
 
@@ -380,23 +413,27 @@ case "$subcmd" in
         ;;
 
     *)
-        echo "Arvados-in-a-box"
+        echo "Arvados-in-a-box                      http://arvados.org"
         echo
-        echo "$(basename $0) (build|start|run|open|shell|ip|stop|reboot|reset|destroy|log|svrestart)"
+        echo "$(basename $0) (build|start|run|open|shell|ip|stop|rebuild|reset|destroy|log|svrestart)"
         echo
         echo "build <config>      build arvbox Docker image"
         echo "start|run <config>  start $ARVBOX_CONTAINER container"
         echo "open       open arvbox workbench in a web browser"
         echo "shell      enter arvbox shell"
-        echo "ip         print arvbox ip address"
+        echo "ip         print arvbox docker container ip address"
+        echo "host       print arvbox published host"
         echo "status     print some information about current arvbox"
         echo "stop       stop arvbox container"
         echo "restart <config>  stop, then run again"
-        echo "reboot  <config>  stop, build arvbox Docker image, run"
+        echo "rebuild <config>  stop, build arvbox Docker image, run"
         echo "reset      delete arvbox arvados data (be careful!)"
         echo "destroy    delete all arvbox code and data (be careful!)"
-        echo "log       <service> tail log of specified service"
-        echo "sv        <start|stop|restart> <service> change state of service inside arvbox"
+        echo "log <service> tail log of specified service"
+        echo "ls <options>  list directories inside arvbox"
+        echo "cat <files>   get contents of files inside arvbox"
+        echo "pipe       run a bash script piped in from stdin"
+        echo "sv <start|stop|restart> <service> change state of service inside arvbox"
         echo "clone <from> <to>   clone an arvbox"
         ;;
 esac
diff --git a/tools/arvbox/lib/arvbox/docker/service/ready/run-service b/tools/arvbox/lib/arvbox/docker/service/ready/run-service
index f560de0325a38e15e7ce2e4eceb41bff01bb9757..977f61298ff7c475f69c7097e6b15d1650096a1b 100755 (executable)
@@ -34,11 +34,12 @@ if ! docker version >/dev/null 2>/dev/null ; then
   waiting="$waiting docker"
 fi
 
-if ! which arv >/dev/null ; then
-  waiting="$waiting sdk"
-elif ! which arv-get >/dev/null ; then
-  waiting="$waiting sdk"
-fi
+for sdk_app in arv arv-get cwl-runner arv-mount ; do
+    if ! which $sdk_app >/dev/null ; then
+        waiting="$waiting sdk"
+        break
+    fi
+done
 
 if ! (ps x | grep -v grep | grep "crunch-dispatch") > /dev/null ; then
     waiting="$waiting crunch-dispatch"
diff --git a/tools/arvbox/lib/arvbox/docker/service/sdk/run-service b/tools/arvbox/lib/arvbox/docker/service/sdk/run-service
index b51f0fcae8f53bd15de8f4a3f7f5b76d88baabe1..3ee6f2a04265103f4dcf14fcc33742d26e636b22 100755 (executable)
@@ -19,6 +19,10 @@ cd /usr/src/arvados/sdk/python
 python setup.py sdist
 pip_install $(ls dist/arvados-python-client-*.tar.gz | tail -n1)
 
+cd /usr/src/arvados/sdk/cwl
+python setup.py sdist
+pip_install $(ls dist/arvados-cwl-runner-*.tar.gz | tail -n1)
+
 cd /usr/src/arvados/services/fuse
 python setup.py sdist
 pip_install $(ls dist/arvados_fuse-*.tar.gz | tail -n1)