Merge branch 'master' into 8724-keep-block-check-script
author: radhika <radhika@curoverse.com>
Tue, 12 Apr 2016 03:37:22 +0000 (23:37 -0400)
committer: radhika <radhika@curoverse.com>
Tue, 12 Apr 2016 03:37:22 +0000 (23:37 -0400)
32 files changed:
build/run-build-packages.sh
build/run-tests.sh
sdk/cli/bin/crunch-job
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/test_submit.py
sdk/python/arvados/events.py
sdk/python/tests/test_websockets.py
services/dockercleaner/setup.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_cache.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/azure.py
services/nodemanager/arvnodeman/computenode/driver/ec2.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/testutil.py
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/crunch-setup.sh
tools/arvbox/lib/arvbox/docker/keep-setup.sh
tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service
tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service
tools/arvbox/lib/arvbox/docker/service/keep-web/run-service
tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service
tools/arvbox/lib/arvbox/docker/service/sdk/run-service

index f2c77bf29e03e3b097c37375546c33cb6613efd8..3178ea2ad1ec2a9ccb2c80f51ead93ca1defbfda 100755 (executable)
@@ -458,6 +458,11 @@ cd $WORKSPACE/packages/$TARGET
 rm -rf "$WORKSPACE/services/dockercleaner/build"
 fpm_build $WORKSPACE/services/dockercleaner arvados-docker-cleaner 'Curoverse, Inc.' 'python3' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/dockercleaner/arvados_docker_cleaner.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados Docker image cleaner"
 
+# The Arvados crunchstat-summary tool
+cd $WORKSPACE/packages/$TARGET
+rm -rf "$WORKSPACE/tools/crunchstat-summary/build"
+fpm_build $WORKSPACE/tools/crunchstat-summary ${PYTHON2_PKG_PREFIX}-crunchstat-summary 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/tools/crunchstat-summary/crunchstat_summary.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=Crunchstat-summary reads Arvados Crunch log files and summarizes resource usage"
+
 # Forked libcloud
 LIBCLOUD_DIR=$(mktemp -d)
 (
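
The version string passed to fpm_build above is scraped from the package's
egg-info PKG-INFO file with awk. A minimal Python sketch of the same
extraction (the PKG-INFO path is illustrative):

    def pkg_info_version(path):
        # Mirrors: awk '($1 == "Version:"){print $2}' PKG-INFO
        with open(path) as f:
            for line in f:
                fields = line.split()
                if fields and fields[0] == "Version:":
                    return fields[1]
        return None

    print(pkg_info_version("crunchstat_summary.egg-info/PKG-INFO"))
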
index 98c8ac55a8f0f5193dfc7e68d2b7e30bd1907b5e..884eda3da13a8f8c25b13bfdad1d19d6ed1bbed9 100755 (executable)
@@ -385,7 +385,12 @@ setup_virtualenv() {
     if ! [[ -e "$venvdest/bin/activate" ]] || ! [[ -e "$venvdest/bin/pip" ]]; then
         virtualenv --setuptools "$@" "$venvdest" || fatal "virtualenv $venvdest failed"
     fi
-    "$venvdest/bin/pip" install 'setuptools>=18' 'pip>=7'
+    if [[ $("$venvdest/bin/python" --version 2>&1) =~ \ 3\.[012]\. ]]; then
+        # pip 8.0.0 dropped support for python 3.2, e.g., debian wheezy
+        "$venvdest/bin/pip" install 'setuptools>=18' 'pip>=7,<8'
+    else
+        "$venvdest/bin/pip" install 'setuptools>=18' 'pip>=7'
+    fi
     # ubuntu1404 can't seem to install mock via tests_require, but it can do this.
     "$venvdest/bin/pip" install 'mock>=1.0' 'pbr<1.7.0'
 }
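
The hunk above gates the pip upgrade on the virtualenv's interpreter:
pip 8.0.0 dropped support for Python 3.0-3.2 (e.g. Debian wheezy), so those
interpreters are pinned to 'pip>=7,<8'. A hedged sketch of the same decision
in Python:

    import sys

    def pip_requirement():
        # Cap pip below 8 on Python 3.0-3.2; newer interpreters take pip>=7.
        if sys.version_info[0] == 3 and sys.version_info[1] <= 2:
            return 'pip>=7,<8'
        return 'pip>=7'
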
index cc0b60c475fd0926765f3778cfc602d1f35f3b4f..b4cb21405f829f0f431700b0fe1ade8786e1ee95 100755 (executable)
@@ -416,8 +416,17 @@ if ($docker_locator = $Job->{docker_image_locator}) {
   Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
-    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
+    exit 0
+fi
+declare -a exit_codes=("\${PIPESTATUS[@]}")
+if [ 0 != "\${exit_codes[0]}" ]; then
+   exit "\${exit_codes[0]}"  # `docker images` failed
+elif [ 1 != "\${exit_codes[1]}" ]; then
+   exit "\${exit_codes[1]}"  # `grep` encountered an error
+else
+   # Everything worked fine, but grep didn't find the image on this host.
+   arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
 
@@ -853,9 +862,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
-        ."&& if which crunchrunner >/dev/null ; then VOLUME_CRUNCHRUNNER=\"--volume=\$(which crunchrunner):/usr/local/bin/crunchrunner\" ; fi "
-        ."&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUME_CERTS=\"--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt\" ; fi "
-        ."&& if test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUME_CERTS=\"--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt\" ; fi ";
+        # $VOLUME_CRUNCHRUNNER and $VOLUME_CERTS will be passed unquoted as
+        # arguments to `docker run`.  They must contain their own quoting.
+        .q{&& VOLUME_CRUNCHRUNNER="" VOLUME_CERTS="" }
+        .q{&& if which crunchrunner >/dev/null ; then VOLUME_CRUNCHRUNNER=--volume=$(which crunchrunner):/usr/local/bin/crunchrunner ; fi }
+        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUME_CERTS=--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt ; }
+        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUME_CERTS=--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt ; fi };
 
     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
@@ -922,7 +934,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
       # Bind mount the crunchrunner binary and host TLS certificates file into
       # the container.
-      $command .= "\"\$VOLUME_CRUNCHRUNNER\" \"\$VOLUME_CERTS\" ";
+      $command .= "\$VOLUME_CRUNCHRUNNER \$VOLUME_CERTS ";
 
       while (my ($env_key, $env_val) = each %ENV)
       {
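
The rewritten docker install script distinguishes three outcomes of
`docker images | grep`: grep found the image (exit early), `docker images`
itself failed (propagate its exit code from PIPESTATUS), or grep found no
match (exit status 1), in which case the image is fetched with arv-get and
loaded. A rough Python equivalent of that decision, assuming illustrative
docker_bin/docker_hash values and a load_image callback:

    import subprocess

    def ensure_image_loaded(docker_bin, docker_hash, load_image):
        # A non-zero exit from `docker images` means docker itself failed,
        # which must not be confused with "image not present on this host".
        proc = subprocess.Popen(
            [docker_bin, 'images', '-q', '--no-trunc', '--all'],
            stdout=subprocess.PIPE)
        out, _ = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError('docker images failed (%d)' % proc.returncode)
        if docker_hash not in out.decode().split():
            load_image()  # image absent: fetch and `docker load` it
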
index ab8d725bd775d34d3588d17ba8004327231cea80..8f2102c6c32aa6d21f4532f246b1524946878372 100644 (file)
@@ -366,6 +366,7 @@ class RunnerJob(object):
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
 
         response = self.arvrunner.api.jobs().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
             "script": "cwl-runner",
             "script_version": "master",
             "repository": "arvados",
index bc0289e20415a88f9379504707c88a4befee1248..bee193858410581801ca2308f8b4045f8dea0179 100755 (executable)
@@ -23,6 +23,10 @@ while test -n "$1" ; do
             config=$2
             shift ; shift
             ;;
+        -h|--help)
+            echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo]"
+            exit
+            ;;
         -*)
             break
             ;;
index 4e343b6265cb02cdd3393fd32b884129fc153795..19745a0c7c3fd33981087e029c4ceb8b51fa8f80 100644 (file)
@@ -17,7 +17,8 @@ class TestSubmit(unittest.TestCase):
             return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
         keep().put.side_effect = putstub
         keepdocker.return_value = True
-        api.users().current().execute.return_value = {"uuid": "zzzzz-tpzed-zzzzzzzzzzzzzzz"}
+        user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+        api.users().current().execute.return_value = {"uuid": user_uuid}
         api.collections().list().execute.return_value = {"items": []}
         api.collections().create().execute.side_effect = ({"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
                                                            "portable_data_hash": "99999999999999999999999999999991+99"},
@@ -43,17 +44,59 @@ class TestSubmit(unittest.TestCase):
 
         api.jobs().create.assert_called_with(
             body={
+                'owner_uuid': user_uuid,
                 'runtime_constraints': {
                     'docker_image': 'arvados/jobs'
                 },
-            'script_parameters': {
-                'x': {
-                    'path': '99999999999999999999999999999992+99/blorp.txt',
-                    'class': 'File'
+                'script_parameters': {
+                    'x': {
+                        'path': '99999999999999999999999999999992+99/blorp.txt',
+                        'class': 'File'
+                    },
+                    'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
                 },
-                'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'cwl-runner'
             },
-            'repository': 'arvados',
+            find_or_create=True)
+
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    @mock.patch("arvados.collection.KeepClient")
+    @mock.patch("arvados.events.subscribe")
+    def test_submit_with_project_uuid(self, events, keep, keepdocker):
+        api = mock.MagicMock()
+        def putstub(p, **kwargs):
+            return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
+        keep().put.side_effect = putstub
+        keepdocker.return_value = True
+        api.users().current().execute.return_value = {"uuid": "zzzzz-tpzed-zzzzzzzzzzzzzzz"}
+        api.collections().list().execute.return_value = {"items": []}
+        api.collections().create().execute.side_effect = ({"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
+                                                           "portable_data_hash": "99999999999999999999999999999991+99"},
+                                                          {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
+                                                           "portable_data_hash": "99999999999999999999999999999992+99"})
+        api.jobs().create().execute.return_value = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz", "state": "Queued"}
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        arvados_cwl.main(["--debug", "--submit", "--project-uuid", project_uuid,
+                          "--no-wait", "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                         sys.stdout, sys.stderr, api_client=api)
+
+        api.jobs().create.assert_called_with(
+            body={
+                'owner_uuid': project_uuid,
+                'runtime_constraints': {
+                    'docker_image': 'arvados/jobs'
+                },
+                'script_parameters': {
+                    'x': {
+                        'path': '99999999999999999999999999999992+99/blorp.txt',
+                        'class': 'File'
+                    },
+                    'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                },
+                'repository': 'arvados',
                 'script_version': 'master',
                 'script': 'cwl-runner'
             },
index df824a331ea41a2fd702587be9c5d2828884ffb5..79960c43bf559161038eb4006662ecbc6314c113 100644 (file)
@@ -14,8 +14,8 @@ from ws4py.client.threadedclient import WebSocketClient
 _logger = logging.getLogger('arvados.events')
 
 
-class EventClient(WebSocketClient):
-    def __init__(self, url, filters, on_event, last_log_id):
+class _EventClient(WebSocketClient):
+    def __init__(self, url, filters, on_event, last_log_id, on_closed):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
             ssl_options['cert_reqs'] = ssl.CERT_NONE
@@ -26,19 +26,23 @@ class EventClient(WebSocketClient):
         # IPv4 addresses (common with "localhost"), only one of them
         # will be attempted -- and it might not be the right one. See
         # ws4py's WebSocketBaseClient.__init__.
-        super(EventClient, self).__init__(url, ssl_options=ssl_options)
+        super(_EventClient, self).__init__(url, ssl_options=ssl_options)
+
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
         self._closing_lock = threading.RLock()
         self._closing = False
         self._closed = threading.Event()
+        self.on_closed = on_closed
 
     def opened(self):
-        self.subscribe(self.filters, self.last_log_id)
+        for f in self.filters:
+            self.subscribe(f, self.last_log_id)
 
     def closed(self, code, reason=None):
         self._closed.set()
+        self.on_closed()
 
     def received_message(self, m):
         with self._closing_lock:
@@ -51,21 +55,69 @@ class EventClient(WebSocketClient):
         :timeout: is the number of seconds to wait for ws4py to
         indicate that the connection has closed.
         """
-        super(EventClient, self).close(code, reason)
+        super(_EventClient, self).close(code, reason)
         with self._closing_lock:
             # make sure we don't process any more messages.
             self._closing = True
         # wait for ws4py to tell us the connection is closed.
         self._closed.wait(timeout=timeout)
 
-    def subscribe(self, filters, last_log_id=None):
-        m = {"method": "subscribe", "filters": filters}
+    def subscribe(self, f, last_log_id=None):
+        m = {"method": "subscribe", "filters": f}
         if last_log_id is not None:
             m["last_log_id"] = last_log_id
         self.send(json.dumps(m))
 
-    def unsubscribe(self, filters):
-        self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+    def unsubscribe(self, f):
+        self.send(json.dumps({"method": "unsubscribe", "filters": f}))
+
+
+class EventClient(object):
+    def __init__(self, url, filters, on_event_cb, last_log_id):
+        self.url = url
+        if filters:
+            self.filters = [filters]
+        else:
+            self.filters = [[]]
+        self.on_event_cb = on_event_cb
+        self.last_log_id = last_log_id
+        self.is_closed = False
+        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+
+    def connect(self):
+        self.ec.connect()
+
+    def close_connection(self):
+        self.ec.close_connection()
+
+    def subscribe(self, f, last_log_id=None):
+        self.filters.append(f)
+        self.ec.subscribe(f, last_log_id)
+
+    def unsubscribe(self, f):
+        del self.filters[self.filters.index(f)]
+        self.ec.unsubscribe(f)
+
+    def close(self, code=1000, reason='', timeout=0):
+        self.is_closed = True
+        self.ec.close(code, reason, timeout)
+
+    def on_event(self, m):
+        if m.get('id') is not None:
+            self.last_log_id = m.get('id')
+        self.on_event_cb(m)
+
+    def on_closed(self):
+        if not self.is_closed:
+            _logger.warn("Unexpected close. Reconnecting.")
+            self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
+            while True:
+                try:
+                    self.ec.connect()
+                    break
+                except Exception as e:
+                    _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=e)
+                    time.sleep(5)
 
 
 class PollClient(threading.Thread):
@@ -143,12 +195,12 @@ class PollClient(threading.Thread):
             # to do so raises the same exception."
             pass
 
-    def subscribe(self, filters):
+    def subscribe(self, f):
         self.on_event({'status': 200})
-        self.filters.append(filters)
+        self.filters.append(f)
 
-    def unsubscribe(self, filters):
-        del self.filters[self.filters.index(filters)]
+    def unsubscribe(self, f):
+        del self.filters[self.filters.index(f)]
 
 
 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
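
After this change, EventClient is a thin supervisor around the ws4py-based
_EventClient: it tracks the subscribed filters and the last seen log id, and
on_closed() rebuilds the underlying client (retrying every 5 seconds) and
resubscribes after an unexpected close, while close() marks the shutdown as
intentional. Callers keep using arvados.events.subscribe() as before; a usage
sketch mirroring the tests below:

    import arvados
    import arvados.events

    def on_event(event):
        print(event)

    ws = arvados.events.subscribe(
        arvados.api('v1'),
        [['object_uuid', 'is_a', 'arvados#human']],
        on_event,
        poll_fallback=False,
        last_log_id=None)
    # ... later: a clean shutdown, which does not trigger reconnection.
    ws.close()
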
index 37b644aaf1eed4e4b01e4e8e90f3a8606a2e7ed5..907dd93100e645082b06a55841fffcdc0ff350f4 100644 (file)
@@ -1,9 +1,13 @@
 import arvados
 import arvados.events
 from datetime import datetime, timedelta, tzinfo
+import logging
+import logging.handlers
 import mock
 import Queue
 import run_test_server
+import StringIO
+import tempfile
 import threading
 import time
 import unittest
@@ -18,8 +22,11 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.ws = None
 
     def tearDown(self):
-        if self.ws:
-            self.ws.close()
+        try:
+            if self.ws:
+                self.ws.close()
+        except Exception as e:
+            print("Error in teardown: ", e)
         super(WebsocketTest, self).tearDown()
         run_test_server.reset()
 
@@ -120,3 +127,101 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def isotz(self, offset):
         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
         return '{:+03d}{:02d}'.format(offset/60, offset%60)
+
+    # Test websocket reconnection on (un)expected close
+    def _test_websocket_reconnect(self, close_unexpected):
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        logstream = StringIO.StringIO()
+        rootLogger = logging.getLogger()
+        streamHandler = logging.StreamHandler(logstream)
+        rootLogger.addHandler(streamHandler)
+
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+        self.assertEqual(200, events.get(True, 5)['status'])
+
+        # create obj
+        human = arvados.api('v1').humans().create(body={}).execute()
+
+        # expect an event
+        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
+        with self.assertRaises(Queue.Empty):
+            self.assertEqual(events.get(True, 2), None)
+
+        # close (im)properly
+        if close_unexpected:
+            self.ws.close_connection()
+        else:
+            self.ws.close()
+
+        # create one more obj
+        human2 = arvados.api('v1').humans().create(body={}).execute()
+
+        # (un)expect the object creation event
+        if close_unexpected:
+            log_object_uuids = []
+            for i in range(0, 2):
+                event = events.get(True, 5)
+                if event.get('object_uuid') != None:
+                    log_object_uuids.append(event['object_uuid'])
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+            self.assertNotIn(human['uuid'], log_object_uuids)
+            self.assertIn(human2['uuid'], log_object_uuids)
+        else:
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+
+        # verify log messages to confirm that an (un)expected close was handled
+        log_messages = logstream.getvalue()
+        closeLogFound = log_messages.find("Unexpected close. Reconnecting.")
+        retryLogFound = log_messages.find("Error during websocket reconnect. Will retry")
+        if close_unexpected:
+            self.assertNotEqual(closeLogFound, -1)
+        else:
+            self.assertEqual(closeLogFound, -1)
+        rootLogger.removeHandler(streamHandler)
+
+    def test_websocket_reconnect_on_unexpected_close(self):
+        self._test_websocket_reconnect(True)
+
+    def test_websocket_no_reconnect_on_close_by_user(self):
+        self._test_websocket_reconnect(False)
+
+    # Test websocket reconnection retry
+    @mock.patch('arvados.events._EventClient.connect')
+    def test_websocket_reconnect_retry(self, event_client_connect):
+        event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
+
+        logstream = StringIO.StringIO()
+        rootLogger = logging.getLogger()
+        streamHandler = logging.StreamHandler(logstream)
+        rootLogger.addHandler(streamHandler)
+
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+
+        # simulate improper close
+        self.ws.on_closed()
+
+        # verify log messages to ensure retry happened
+        log_messages = logstream.getvalue()
+        found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
+        self.assertNotEqual(found, -1)
+        rootLogger.removeHandler(streamHandler)
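
These tests check reconnection behavior by attaching a StreamHandler backed
by a StringIO buffer to the root logger and searching the captured text for
the reconnect messages. The capture technique on its own:

    import logging
    import StringIO

    logstream = StringIO.StringIO()
    handler = logging.StreamHandler(logstream)
    root = logging.getLogger()
    root.addHandler(handler)

    logging.getLogger('arvados.events').warn("Unexpected close. Reconnecting.")

    assert "Unexpected close. Reconnecting." in logstream.getvalue()
    root.removeHandler(handler)
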
index 1a9d0fd62eb70d667bc82b7e5a85a6f720ef1bb5..3ca9714066e23a1e3ad2c58d40691686da2b36e8 100644 (file)
@@ -25,7 +25,7 @@ setup(name="arvados-docker-cleaner",
           ('share/doc/arvados-docker-cleaner', ['agpl-3.0.txt']),
       ],
       install_requires=[
-        'docker-py',
+        'docker-py==1.7.2',
         ],
       tests_require=[
         'pbr<1.7.0',
index 196bb221e901e132d10db4f2bdbd7ed060f794e3..3f2bcd5ec2464fb5351a42be205b17f1fe93e49c 100644 (file)
@@ -320,6 +320,11 @@ class CollectionDirectoryBase(Directory):
         self.flush()
         src.flush()
 
+    def clear(self, force=False):
+        r = super(CollectionDirectoryBase, self).clear(force)
+        self.collection = None
+        return r
+
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
diff --git a/services/fuse/tests/test_cache.py b/services/fuse/tests/test_cache.py
new file mode 100644 (file)
index 0000000..7aa0009
--- /dev/null
@@ -0,0 +1,45 @@
+import arvados
+import arvados.collection
+import arvados_fuse
+import arvados_fuse.command
+import json
+import logging
+import os
+import tempfile
+import unittest
+
+from .integration_test import IntegrationTest
+from .mount_test_base import MountTestBase
+
+class TmpCollectionTest(IntegrationTest):
+    mnt_args = ["--directory-cache=0"]
+
+    @IntegrationTest.mount(argv=mnt_args)
+    def test_cache_spill(self):
+        pdh = []
+        for i in range(0, 8):
+            cw = arvados.collection.Collection()
+            f = cw.open("blurg%i" % i, "w")
+            f.write("bloop%i" % i)
+
+            cw.mkdirs("dir%i" % i)
+            f = cw.open("dir%i/blurg" % i, "w")
+            f.write("dirbloop%i" % i)
+
+            cw.save_new()
+            pdh.append(cw.portable_data_hash())
+        self.pool_test(self.mnt, pdh)
+
+    @staticmethod
+    def _test_cache_spill(self, mnt, pdh):
+        for i,v in enumerate(pdh):
+            j = os.path.join(mnt, "by_id", v, "blurg%i" % i)
+            self.assertTrue(os.path.exists(j))
+            j = os.path.join(mnt, "by_id", v, "dir%i/blurg" % i)
+            self.assertTrue(os.path.exists(j))
+
+        for i,v in enumerate(pdh):
+            j = os.path.join(mnt, "by_id", v, "blurg%i" % i)
+            self.assertTrue(os.path.exists(j))
+            j = os.path.join(mnt, "by_id", v, "dir%i/blurg" % i)
+            self.assertTrue(os.path.exists(j))
index 2ddfb0a00704036d8ae8f7528a977147006c5151..552ed01b728d9e1e7bc11df7940eee2970ab01ff 100644 (file)
@@ -219,7 +219,17 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
                 return orig_func(self, *args, **kwargs)
         return stop_wrapper
 
-    @ComputeNodeStateChangeBase._finish_on_exception
+    def _cancel_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._later.cancel_shutdown("Unhandled exception %s" % error)
+        return finish_wrapper
+
+    @_cancel_on_exception
     @_stop_if_window_closed
     @RetryMixin._retry()
     def shutdown_node(self):
@@ -387,6 +397,9 @@ class ComputeNodeMonitorActor(config.actor_class):
         else:
             return "node is not idle."
 
+    def resume_node(self):
+        pass
+
     def consider_shutdown(self):
         try:
             next_opening = self._shutdowns.next_opening()
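
_cancel_on_exception replaces _finish_on_exception on shutdown_node, so an
unhandled error cancels the shutdown (letting the daemon retry later) instead
of finishing the actor. A toy demonstration of the wrapping behavior, with
stand-in classes rather than the real node manager actors:

    import functools

    def cancel_on_exception(orig_func):
        # Trap any exception from the wrapped method and divert it into a
        # cancellation callback instead of letting it propagate.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except Exception as error:
                self.cancel_shutdown("Unhandled exception %s" % error)
        return wrapper

    class DemoActor(object):
        def cancel_shutdown(self, reason):
            print("cancelled: %s" % reason)

        @cancel_on_exception
        def shutdown_node(self):
            raise RuntimeError("cloud API hiccup")

    DemoActor().shutdown_node()
    # -> cancelled: Unhandled exception cloud API hiccup
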
index 255e50a53018351ecf3f972d12002229f16954a2..41919db07e12efe7a262c4635e9a8432febdec2f 100644 (file)
@@ -6,16 +6,28 @@ import subprocess
 import time
 
 from . import \
-    ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
+    ComputeNodeSetupActor, ComputeNodeUpdateActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
+from . import ComputeNodeMonitorActor as MonitorActorBase
 from .. import RetryMixin
 
-class ComputeNodeShutdownActor(ShutdownActorBase):
+class SlurmMixin(object):
     SLURM_END_STATES = frozenset(['down\n', 'down*\n',
                                   'drain\n', 'drain*\n',
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
+    def _set_node_state(self, nodename, state, *args):
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
+               'State=' + state]
+        cmd.extend(args)
+        subprocess.check_output(cmd)
+
+    def _get_slurm_state(self, nodename):
+        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+
+
+class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
         if arv_node is None:
@@ -27,21 +39,12 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
 
-    def _set_node_state(self, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + self._nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
-
-    def _get_slurm_state(self):
-        return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
-
     @RetryMixin._retry((subprocess.CalledProcessError,))
     def cancel_shutdown(self, reason):
         if self._nodename:
-            if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
+            if self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state('RESUME')
+                self._set_node_state(self._nodename, 'RESUME')
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
@@ -51,16 +54,41 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     @RetryMixin._retry((subprocess.CalledProcessError,))
     @ShutdownActorBase._stop_if_window_closed
     def issue_slurm_drain(self):
-        self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
+        self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
     @RetryMixin._retry((subprocess.CalledProcessError,))
     @ShutdownActorBase._stop_if_window_closed
     def await_slurm_drain(self):
-        output = self._get_slurm_state()
+        output = self._get_slurm_state(self._nodename)
         if output in self.SLURM_END_STATES:
             self._later.shutdown_node()
         else:
             self._timer.schedule(time.time() + 10,
                                  self._later.await_slurm_drain)
+
+
+class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
+
+    def shutdown_eligible(self):
+        if self.arvados_node is not None:
+            state = self._get_slurm_state(self.arvados_node['hostname'])
+            # Automatically eligible for shutdown if it's down or failed, but
+            # not drain to avoid a race condition with resume_node().
+            if state in self.SLURM_END_STATES:
+                if state in self.SLURM_DRAIN_STATES:
+                    return "node is draining"
+                else:
+                    return True
+        return super(ComputeNodeMonitorActor, self).shutdown_eligible()
+
+    def resume_node(self):
+        try:
+            if (self.arvados_node is not None and
+                self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
+                # Resume from "drng" or "drain"
+                self._set_node_state(self.arvados_node['hostname'], 'RESUME')
+        except Exception as error:
+            self._logger.warn(
+                "Exception reenabling node: %s", error, exc_info=error)
index 0576999ea6fe7308ccfd66b6b05b2eb776845e4c..95b6fa8e0ce962d67b43b26f8106469d9d692faf 100644 (file)
@@ -79,7 +79,7 @@ class BaseComputeNodeDriver(RetryMixin):
             key = NodeAuthSSHKey(ssh_file.read())
         return 'auth', key
 
-    def search_for(self, term, list_method, key=attrgetter('id'), **kwargs):
+    def search_for_now(self, term, list_method, key=attrgetter('id'), **kwargs):
         """Return one matching item from a list of cloud objects.
 
         Raises ValueError if the number of matching objects is not exactly 1.
@@ -92,16 +92,25 @@ class BaseComputeNodeDriver(RetryMixin):
           value search for a `term` match on each item.  Returns the
           object's 'id' attribute by default.
         """
+        items = getattr(self.real, list_method)(**kwargs)
+        results = [item for item in items if key(item) == term]
+        count = len(results)
+        if count != 1:
+            raise ValueError("{} returned {} results for {!r}".format(
+                    list_method, count, term))
+        return results[0]
+
+    def search_for(self, term, list_method, key=attrgetter('id'), **kwargs):
+        """Return one cached matching item from a list of cloud objects.
+
+        See search_for_now() for details of arguments and exceptions.
+        This method caches results, so it's good to find static cloud objects
+        like node sizes, regions, etc.
+        """
         cache_key = (list_method, term)
         if cache_key not in self.SEARCH_CACHE:
-            items = getattr(self.real, list_method)(**kwargs)
-            results = [item for item in items
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
+            self.SEARCH_CACHE[cache_key] = self.search_for_now(
+                term, list_method, key, **kwargs)
         return self.SEARCH_CACHE[cache_key]
 
     def list_nodes(self, **kwargs):
@@ -109,6 +118,18 @@ class BaseComputeNodeDriver(RetryMixin):
         l.update(kwargs)
         return self.real.list_nodes(**l)
 
+    def create_cloud_name(self, arvados_node):
+        """Return a cloud node name for the given Arvados node record.
+
+        Subclasses must override this method.  It should return a string
+        that can be used as the name for a newly-created cloud node,
+        based on identifying information in the Arvados node record.
+
+        Arguments:
+        * arvados_node: This Arvados node record to seed the new cloud node.
+        """
+        raise NotImplementedError("BaseComputeNodeDriver.create_cloud_name")
+
     def arvados_create_kwargs(self, size, arvados_node):
         """Return dynamic keyword arguments for create_node.
 
@@ -143,19 +164,17 @@ class BaseComputeNodeDriver(RetryMixin):
             kwargs.update(self.arvados_create_kwargs(size, arvados_node))
             kwargs['size'] = size
             return self.real.create_node(**kwargs)
-        except self.CLOUD_ERRORS:
+        except self.CLOUD_ERRORS as create_error:
             # Workaround for bug #6702: sometimes the create node request
             # succeeds but times out and raises an exception instead of
             # returning a result.  If this happens, we get stuck in a retry
             # loop forever because subsequent create_node attempts will fail
             # due to node name collision.  So check if the node we intended to
             # create shows up in the cloud node list and return it if found.
-            node = self.search_for(kwargs['name'], 'list_nodes', self._name_key)
-            if node:
-                return node
-            else:
-                # something else went wrong, re-raise the exception
-                raise
+            try:
+                return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
+            except ValueError:
+                raise create_error
 
     def post_create_node(self, cloud_node):
         # ComputeNodeSetupActor calls this method after the cloud node is
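
search_for() now delegates to the uncached search_for_now() and memoizes the
single result keyed on (list_method, term); the bug #6702 workaround above
deliberately uses the uncached path so a just-created node is not masked by a
stale cache entry. The memoization shape in miniature, with a plain dict of
listings standing in for the real cloud driver:

    SEARCH_CACHE = {}

    def search_for_now(term, list_method, listings):
        # Uncached: scan the current listing and demand exactly one match.
        results = [item for item in listings[list_method] if item == term]
        if len(results) != 1:
            raise ValueError("{} returned {} results for {!r}".format(
                list_method, len(results), term))
        return results[0]

    def search_for(term, list_method, listings):
        # Cached: good for static objects (sizes, regions), wrong for nodes.
        key = (list_method, term)
        if key not in SEARCH_CACHE:
            SEARCH_CACHE[key] = search_for_now(term, list_method, listings)
        return SEARCH_CACHE[key]
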
index 167d8b3210937acc226eaa1b5d41e333225b4176..e293d1bebeb5a479b69ff3e22784b9a467b17dd2 100644 (file)
@@ -38,15 +38,18 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             auth_kwargs, list_kwargs, create_kwargs,
             driver_class)
 
+    def create_cloud_name(self, arvados_node):
+        uuid_parts = arvados_node['uuid'].split('-', 2)
+        return 'compute-{parts[2]}-{parts[0]}'.format(parts=uuid_parts)
+
     def arvados_create_kwargs(self, size, arvados_node):
-        cluster_id, _, node_id = arvados_node['uuid'].split('-')
-        name = 'compute-{}-{}'.format(node_id, cluster_id)
         tags = {
             'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
             'arv-ping-url': self._make_ping_url(arvados_node)
         }
         tags.update(self.tags)
 
+        name = self.create_cloud_name(arvados_node)
         customdata = """#!/bin/sh
 mkdir -p    /var/tmp/arv-node-data/meta-data
 echo %s > /var/tmp/arv-node-data/arv-ping-url
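
Factoring the name derivation into create_cloud_name() gives the driver tests
(see testutil.py below) a single hook for predicting a node's name after a
timed-out create call. For the Azure/GCE rule, a sample UUID (illustrative)
maps as follows:

    def create_cloud_name(arvados_node):
        # 'zzzzz-7ekkf-0123456789abcde' -> ['zzzzz', '7ekkf', '0123456789abcde']
        uuid_parts = arvados_node['uuid'].split('-', 2)
        return 'compute-{parts[2]}-{parts[0]}'.format(parts=uuid_parts)

    print(create_cloud_name({'uuid': 'zzzzz-7ekkf-0123456789abcde'}))
    # -> compute-0123456789abcde-zzzzz
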
index d314d38986e0df62a3c79624c28e77c7630cdbb6..8deabbd50a6163da537193d0df39ac720f1d04d0 100644 (file)
@@ -64,8 +64,10 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
     def _init_subnet_id(self, subnet_id):
         return 'ex_subnet', self.search_for(subnet_id, 'ex_list_subnets')
 
+    create_cloud_name = staticmethod(arvados_node_fqdn)
+
     def arvados_create_kwargs(self, size, arvados_node):
-        return {'name': arvados_node_fqdn(arvados_node),
+        return {'name': self.create_cloud_name(arvados_node),
                 'ex_userdata': self._make_ping_url(arvados_node)}
 
     def post_create_node(self, cloud_node):
index be9988333b60ae4a9bae2711fe8c50b4f65831d2..b853f00a6728693cce4b855021e18bb35c869087 100644 (file)
@@ -60,9 +60,12 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             self.create_kwargs['ex_metadata']['sshKeys'] = (
                 'root:' + ssh_file.read().strip())
 
+    def create_cloud_name(self, arvados_node):
+        uuid_parts = arvados_node['uuid'].split('-', 2)
+        return 'compute-{parts[2]}-{parts[0]}'.format(parts=uuid_parts)
+
     def arvados_create_kwargs(self, size, arvados_node):
-        cluster_id, _, node_id = arvados_node['uuid'].split('-')
-        name = 'compute-{}-{}'.format(node_id, cluster_id)
+        name = self.create_cloud_name(arvados_node)
         disks = [
             {'autoDelete': True,
              'boot': True,
index 7976f21f1a11b8083593a8e7ac9a68c494a47e16..a02da230d8a8b8765debcc18f6971bb1769aacb2 100644 (file)
@@ -151,6 +151,9 @@ class NodeManagerDaemonActor(actor_class):
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
+    def _resume_node(self, node_record):
+        node_record.actor.resume_node()
+
     def _pair_nodes(self, node_record, arvados_node):
         self._logger.info("Cloud node %s is now paired with Arvados node %s",
                           node_record.cloud_node.name, arvados_node['uuid'])
@@ -218,6 +221,15 @@ class NodeManagerDaemonActor(actor_class):
                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                     self._pair_nodes(cloud_rec, arv_node)
                     break
+        for rec in self.cloud_nodes.nodes.itervalues():
+            # crunch-dispatch turns all slurm states that are not either "idle"
+            # or "alloc" into "down", but in case that behavior changes, assume
+            # any state that is not "idle" or "alloc" could be a state we want
+            # to try to resume from.
+            if (rec.arvados_node is not None and
+                rec.arvados_node["info"].get("slurm_state") not in ("idle", "alloc") and
+                rec.cloud_node.id not in self.shutdowns):
+                self._resume_node(rec)
 
     def _nodes_booting(self, size):
         s = sum(1
@@ -238,8 +250,18 @@ class NodeManagerDaemonActor(actor_class):
                   for c in self.cloud_nodes.nodes.itervalues()
                   if size is None or c.cloud_node.size.id == size.id)
 
+    def _nodes_down(self, size):
+        # Make sure to iterate over self.cloud_nodes because what we're
+        # counting here are compute nodes that are reported by the cloud
+        # provider but are considered "down" by Arvados.
+        return sum(1 for down in
+                   pykka.get_all(rec.actor.in_state('down') for rec in
+                                 self.cloud_nodes.nodes.itervalues()
+                                 if size is None or rec.cloud_node.size.id == size.id)
+                   if down)
+
     def _nodes_up(self, size):
-        up = self._nodes_booting(size) + self._nodes_booted(size)
+        up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
         return up
 
     def _total_price(self):
index 66137181421ccc592cead81950fbb2b7be3f37bb..14cd85e414ec271f2d8d1a3a9c283ac3c724fe7c 100644 (file)
@@ -361,7 +361,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_shutdown_without_arvados_node(self):
         self.make_actor(start_time=0)
         self.shutdowns._set_state(True, 600)
-        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_missing(self):
         arv_node = testutil.arvados_node_mock(10, job_uuid=None,
@@ -386,7 +386,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.make_actor(11, arv_node)
         self.shutdowns._set_state(True, 600)
         self.cloud_client.broken.return_value = True
-        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
 
     def test_no_shutdown_when_window_closed(self):
         self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
index 8648783bac5889f11a328af3b277bd1f21da5665..135b817d91b725d26f712a8f2b1f5a5bb1f93144 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import absolute_import, print_function
 
 import subprocess
+import time
 import unittest
 
 import mock
@@ -87,3 +88,66 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         proc_mock.return_value = 'drain\n'
         super(SLURMComputeNodeShutdownActorTestCase,
               self).test_arvados_node_cleaned_after_shutdown()
+
+class SLURMComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
+                                           unittest.TestCase):
+
+    def make_mocks(self, node_num):
+        self.shutdowns = testutil.MockShutdownTimer()
+        self.shutdowns._set_state(False, 300)
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updates = mock.MagicMock(name='update_mock')
+        self.cloud_mock = testutil.cloud_node_mock(node_num)
+        self.subscriber = mock.Mock(name='subscriber_mock')
+        self.cloud_client = mock.MagicMock(name='cloud_client')
+        self.cloud_client.broken.return_value = False
+
+    def make_actor(self, node_num=1, arv_node=None, start_time=None):
+        if not hasattr(self, 'cloud_mock'):
+            self.make_mocks(node_num)
+        if start_time is None:
+            start_time = time.time()
+        self.node_actor = slurm_dispatch.ComputeNodeMonitorActor.start(
+            self.cloud_mock, start_time, self.shutdowns,
+            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            arv_node, boot_fail_after=300).proxy()
+        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
+
+    @mock.patch("subprocess.check_output")
+    def test_resume_node(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.return_value = "drain\n"
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
+        self.assertIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
+
+    @mock.patch("subprocess.check_output")
+    def test_no_resume_idle_node(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.return_value = "idle\n"
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
+        self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
+
+    @mock.patch("subprocess.check_output")
+    def test_resume_node_exception(self, check_output):
+        arv_node = testutil.arvados_node_mock()
+        self.make_actor(arv_node=arv_node)
+        check_output.side_effect = Exception()
+        self.node_actor.resume_node().get(self.TIMEOUT)
+        self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
+        self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
+
+    @mock.patch("subprocess.check_output")
+    def test_shutdown_down_node(self, check_output):
+        check_output.return_value = "down\n"
+        self.make_actor(arv_node=testutil.arvados_node_mock())
+        self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+
+    @mock.patch("subprocess.check_output")
+    def test_no_shutdown_drain_node(self, check_output):
+        check_output.return_value = "drain\n"
+        self.make_actor(arv_node=testutil.arvados_node_mock())
+        self.assertEquals('node is draining', self.node_actor.shutdown_eligible().get(self.TIMEOUT))
index 2daca08ecf7eb114173725bb88deff2969ad5bf3..00e05a147ffaf58ea46d345b4b3f4fd84a56a333 100644 (file)
@@ -279,7 +279,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.last_setup.arvados_node.get.return_value = arv_node
         return self.last_setup
 
-    def test_no_new_node_when_booted_node_not_usable(self):
+    def test_new_node_when_booted_node_not_usable(self):
         cloud_node = testutil.cloud_node_mock(4)
         arv_node = testutil.arvados_node_mock(4, crunch_worker_state='down')
         setup = self.start_node_boot(cloud_node, arv_node)
@@ -290,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_server_wishlist(
             [testutil.MockSize(1)]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(1, self.node_setup.start.call_count)
+        self.assertEqual(2, self.node_setup.start.call_count)
 
     def test_no_duplication_when_booting_node_listed_fast(self):
         # Test that we don't start two ComputeNodeMonitorActors when
@@ -718,3 +718,25 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         # test for that.
         self.assertEqual(2, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
+
+    @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+    def test_resume_drained_nodes(self, resume_node):
+        cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+        self.make_daemon([cloud_node], [arv_node])
+        resume_node.assert_called_with(self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values()[0])
+        self.stop_proxy(self.daemon)
+
+    @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
+    def test_no_resume_shutdown_nodes(self, resume_node):
+        cloud_node = testutil.cloud_node_mock(1)
+        arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
+
+        self.make_daemon([cloud_node], [])
+
+        self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+        self.daemon.shutdowns.get(self.TIMEOUT)[cloud_node.id] = self.node_shutdown
+
+        self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        resume_node.assert_not_called()
index b9e7beabb5ca1237cc1b64619c9a412872c2923b..b376ca792a01e9bd9a8aecf5ee782982168e6174 100644 (file)
@@ -6,6 +6,7 @@ import datetime
 import threading
 import time
 
+import libcloud.common.types as cloud_types
 import mock
 import pykka
 
@@ -142,6 +143,30 @@ class DriverTestMixin(object):
             self.assertTrue(self.driver_mock.called)
             self.assertIs(driver.real, driver_mock2)
 
+    def test_create_can_find_node_after_timeout(self):
+        driver = self.new_driver()
+        arv_node = arvados_node_mock()
+        cloud_node = cloud_node_mock()
+        cloud_node.name = driver.create_cloud_name(arv_node)
+        create_method = self.driver_mock().create_node
+        create_method.side_effect = cloud_types.LibcloudError("fake timeout")
+        list_method = self.driver_mock().list_nodes
+        list_method.return_value = [cloud_node]
+        actual = driver.create_node(MockSize(1), arv_node)
+        self.assertIs(cloud_node, actual)
+
+    def test_create_can_raise_exception_after_timeout(self):
+        driver = self.new_driver()
+        arv_node = arvados_node_mock()
+        create_method = self.driver_mock().create_node
+        create_method.side_effect = cloud_types.LibcloudError("fake timeout")
+        list_method = self.driver_mock().list_nodes
+        list_method.return_value = []
+        with self.assertRaises(cloud_types.LibcloudError) as exc_test:
+            driver.create_node(MockSize(1), arv_node)
+        self.assertIs(create_method.side_effect, exc_test.exception)
+
+
 class RemotePollLoopActorTestMixin(ActorTestMixin):
     def build_monitor(self, *args, **kwargs):
         self.timer = mock.MagicMock(name='timer_mock')
index e21db0dfea7332b462d9ad841e0faa6cf5f9bf38..928adf8f2856d0cabd65b0076f0db56096a0e188 100755 (executable)
@@ -47,6 +47,8 @@ PG_DATA="$ARVBOX_DATA/postgres"
 VAR_DATA="$ARVBOX_DATA/var"
 PASSENGER="$ARVBOX_DATA/passenger"
 GEMS="$ARVBOX_DATA/gems"
+PIPCACHE="$ARVBOX_DATA/pip"
+GOSTUFF="$ARVBOX_DATA/gopath"
 
 getip() {
     docker inspect $ARVBOX_CONTAINER | grep \"IPAddress\" | head -n1 | tr -d ' ":,\n' | cut -c10-
@@ -149,7 +151,8 @@ run() {
         updateconf
         wait_for_arvbox
     else
-        mkdir -p "$PG_DATA" "$VAR_DATA" "$PASSENGER" "$GEMS"
+        mkdir -p "$PG_DATA" "$VAR_DATA" "$PASSENGER" "$GEMS" "$PIPCACHE" "$GOSTUFF"
+
 
         if ! test -d "$ARVADOS_ROOT" ; then
             git clone https://github.com/curoverse/arvados.git "$ARVADOS_ROOT"
@@ -173,6 +176,8 @@ run() {
                    "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                    "--volume=$PASSENGER:/var/lib/passenger:rw" \
                    "--volume=$GEMS:/var/lib/gems:rw" \
+                   "--volume=$PIPCACHE:/var/lib/pip:rw" \
+                   "--volume=$GOSTUFF:/var/lib/gopath:rw" \
                    arvados/arvbox-dev \
                    /usr/local/bin/runsvinit -svdir=/etc/test-service
 
@@ -210,6 +215,8 @@ run() {
                    "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                    "--volume=$PASSENGER:/var/lib/passenger:rw" \
                    "--volume=$GEMS:/var/lib/gems:rw" \
+                   "--volume=$PIPCACHE:/var/lib/pip:rw" \
+                   "--volume=$GOSTUFF:/var/lib/gopath:rw" \
                    $PUBLIC \
                    arvados/arvbox-dev
             updateconf
index 1f134159f7236f4e7d3c5aaddab13b0a7146b0fe..a04c06da44292f7643edbeaa6973371b5d05def8 100644 (file)
@@ -14,4 +14,8 @@ RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/workbench/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/doc/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/vm/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/keepproxy/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/arv-git-httpd/run-service --only-deps
+RUN sudo -u arvbox /usr/local/lib/arvbox/keep-setup.sh --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sdk/run-service
index 4c2de4798c2d0731d92b1db64dc00753c243925e..0e50a06ef59b4de5c3e18516bd63b3ed8af27f32 100644 (file)
@@ -37,13 +37,13 @@ run_bundler() {
     else
         frozen=""
     fi
-    if ! flock /var/lib/arvados/gems.lock bundle install --path $GEM_HOME --local --no-deployment $frozen "$@" ; then
-        flock /var/lib/arvados/gems.lock bundle install --path $GEM_HOME --no-deployment $frozen "$@"
+    if ! flock /var/lib/gems/gems.lock bundle install --path $GEM_HOME --local --no-deployment $frozen "$@" ; then
+        flock /var/lib/gems/gems.lock bundle install --path $GEM_HOME --no-deployment $frozen "$@"
     fi
 }
 
 pip_install() {
-    pushd /var/lib/arvados/pip
+    pushd /var/lib/pip
     for p in $(ls http*.tar.gz) ; do
         if test -f $p ; then
             ln -sf $p $(echo $p | sed 's/.*%2F\(.*\)/\1/')
@@ -56,7 +56,7 @@ pip_install() {
     done
     popd
 
-    if ! pip install --no-index --find-links /var/lib/arvados/pip $1 ; then
+    if ! pip install --no-index --find-links /var/lib/pip $1 ; then
         pip install $1
     fi
 }
index a3397c109d41b7bd6af75c5a2a157debb30fa2fe..e447fe737b89b8218ba845bd5b26feedd202cf82 100755 (executable)
@@ -5,14 +5,14 @@ set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner"
 install bin/crunchstat bin/crunchrunner /usr/local/bin
 
 export ARVADOS_API_HOST=$localip:${services[api]}
index b66463f1c3e363c1f96913928ca00d851885e07a..f77de10ce97faa5838eb2d0348252852f247d947 100755 (executable)
@@ -6,15 +6,19 @@ set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/keepstore"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepstore"
 install bin/keepstore /usr/local/bin
 
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
 mkdir -p /var/lib/arvados/$1
 
 export ARVADOS_API_HOST=$localip:${services[api]}
@@ -47,5 +51,6 @@ exec /usr/local/bin/keepstore \
      -listen=:$2 \
      -enforce-permissions=true \
      -blob-signing-key-file=/var/lib/arvados/blob_signing_key \
+     -data-manager-token-file=/var/lib/arvados/superuser_token \
      -max-buffers=20 \
      -volume=/var/lib/arvados/$1
index 854464efd0fc20f7e611f7c8f11576f0d761310c..518fe33d049a753cd9cece8b416c46dc4045e483 100755 (executable)
@@ -1,19 +1,23 @@
 #!/bin/bash
 
 exec 2>&1
-set -eux -o pipefail
+set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd"
 install bin/arv-git-httpd /usr/local/bin
 
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
 export ARVADOS_API_HOST=$localip:${services[api]}
 export ARVADOS_API_HOST_INSECURE=1
 export GITOLITE_HTTP_HOME=/var/lib/arvados/git
index 211b43885d6e49c5585ee57c359f1d29b5a55b90..c2d2cb88ebd1e269ff0c96603e2a456ca196149d 100755 (executable)
@@ -5,14 +5,14 @@ set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local"
 install bin/crunch-run bin/crunch-dispatch-local /usr/local/bin
 
 export ARVADOS_API_HOST=$localip:${services[api]}
index a2c6aa195fbed303eb7ffcf7261a10bb79090f50..fe53725228d91bc1181a2bae6a1045d188f98694 100755 (executable)
@@ -1,19 +1,23 @@
 #!/bin/bash
 
 exec 2>&1
-set -eux -o pipefail
+set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/keep-web"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keep-web"
 install bin/keep-web /usr/local/bin
 
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
 export ARVADOS_API_HOST=$localip:${services[api]}
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=$(cat /var/lib/arvados/superuser_token)
index 413a67ed5640907f1b1809b497328f42c129e7cb..00b2e01f8305a4d21c17a48cfe79ba7fa31049c8 100755 (executable)
@@ -2,19 +2,23 @@
 
 exec 2>&1
 sleep 2
-set -eux -o pipefail
+set -ex -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p /var/lib/arvados/gostuff
-cd /var/lib/arvados/gostuff
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
 
 export GOPATH=$PWD
 mkdir -p "$GOPATH/src/git.curoverse.com"
 ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
-flock /var/lib/arvados/gostuff.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy"
+flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy"
 install bin/keepproxy /usr/local/bin
 
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
 export ARVADOS_API_HOST=$localip:${services[api]}
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=$(cat /var/lib/arvados/superuser_token)
index 3ee6f2a04265103f4dcf14fcc33742d26e636b22..29452ab9943c7853da919fa130de0b5690e249ba 100755 (executable)
@@ -5,10 +5,10 @@ set -eux -o pipefail
 
 . /usr/local/lib/arvbox/common.sh
 
-mkdir -p ~/.pip /var/lib/arvados/pip
+mkdir -p ~/.pip /var/lib/pip
 cat > ~/.pip/pip.conf <<EOF
 [global]
-download_cache = /var/lib/arvados/pip
+download_cache = /var/lib/pip
 EOF
 
 cd /usr/src/arvados/sdk/cli