Merge branch 'wtsi/13093-crunch-dispatch-slurm-add-mem' refs #13093
author Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 23 Feb 2018 18:26:59 +0000 (13:26 -0500)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 23 Feb 2018 18:27:35 +0000 (13:27 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

28 files changed:
build/run-tests.sh
sdk/R/install_deps.R [new file with mode: 0644]
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/collection_fs_test.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusefile.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/mounts_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
tools/arvbox/lib/arvbox/docker/service/doc/run-service
tools/sync-groups/sync-groups.go
vendor/vendor.json

index 520d3e89d3c5c1a37103371510f22155284fcf90..48b3eab38ac864ab0c66d4a40d17502f60d4cdb3 100755 (executable)
@@ -781,19 +781,7 @@ do_install sdk/ruby ruby_sdk
 
 install_R_sdk() {
     cd "$WORKSPACE/sdk/R" \
-       && R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
-  install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
-  install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
-  devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+       && R --quiet --vanilla --file=install_deps.R
 }
 do_install sdk/R R_sdk
 
diff --git a/sdk/R/install_deps.R b/sdk/R/install_deps.R
new file mode 100644 (file)
index 0000000..a54a9a2
--- /dev/null
@@ -0,0 +1,18 @@
+options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
+if (!requireNamespace("devtools")) {
+  install.packages("devtools")
+}
+if (!requireNamespace("roxygen2")) {
+  install.packages("roxygen2")
+}
+
+# These install from github so install known-good versions instead of
+# letting any push to master break our build.
+if (!requireNamespace("pkgload")) {
+  devtools::install_github("r-lib/pkgload", ref="7a97de62adf1793c03e73095937e4655baad79c9")
+}
+if (!requireNamespace("pkgdown")) {
+  devtools::install_github("r-lib/pkgdown", ref="897ffbc016549c11c4263cb5d1f6e9f5c99efb45")
+}
+
+devtools::install_dev_deps()
index 71ddd172214c4dac1b20907c8cf5a18bce6c37b2..4701b4d8f13a29a2c1dc8f3bc5558de788a5a1fa 100644 (file)
@@ -405,6 +405,7 @@ class ArvCwlRunner(object):
                         "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.eval_timeout = kwargs.get("eval_timeout")
 
         kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
@@ -413,6 +414,8 @@ class ArvCwlRunner(object):
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
+            if self.ignore_docker_for_reuse:
+                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
index abe67c8fb3c552c7e66925093ae1377b1e26b4e9..a2aaa8d49e176a1795c5a5f1d7c17f4a84b658ad 100644 (file)
@@ -385,6 +385,8 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             command.append("--project-uuid="+self.arvrunner.project_uuid)
 
+        command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index 9cabea0794716fa79d88752616d4e7205ae6b4eb..4ab5fb524c8427b21b64c8f1aa15dbbfe10b3cb8 100644 (file)
@@ -234,7 +234,7 @@ def stubs(func):
             'state': 'Committed',
             'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -499,7 +499,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-            '--disable-reuse', '--on-error=continue',
+            '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
 
@@ -522,7 +522,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-            '--disable-reuse', '--on-error=continue',
+            '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
         expect_container["name"] = "submit_wf_no_reuse.cwl"
@@ -557,7 +557,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  '--enable-reuse', '--on-error=stop',
+                                                  '--enable-reuse', '--on-error=stop', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -581,7 +581,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
+                                                  "--output-name="+output_name, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["output_name"] = output_name
 
@@ -606,7 +606,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                        '--enable-reuse', '--on-error=continue',
-                                       "--intermediate-output-ttl=3600",
+                                       "--intermediate-output-ttl=3600", '--eval-timeout=20',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -629,7 +629,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                        '--enable-reuse', '--on-error=continue',
-                                       "--trash-intermediate",
+                                       "--trash-intermediate", '--eval-timeout=20',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -653,7 +653,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
+                                                  "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -735,7 +735,7 @@ class TestSubmit(unittest.TestCase):
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
@@ -851,7 +851,7 @@ class TestSubmit(unittest.TestCase):
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
@@ -908,7 +908,30 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["owner_uuid"] = project_uuid
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid, '--eval-timeout=20',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_eval_timeout(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--eval-timeout=60.0',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
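
Note on the expected values above: the new test passes --eval-timeout=60 on the command line but expects --eval-timeout=60.0 in the generated container command, while the stubbed default renders as --eval-timeout=20. This is consistent with the option being declared with a float type and the default left as the integer 20; that is an assumption about the arvados-cwl-runner argument parser, which this diff does not show. A minimal sketch of that behavior:

    import argparse

    parser = argparse.ArgumentParser()
    # Hypothetical declaration; the real parser may differ.
    parser.add_argument("--eval-timeout", type=float, default=20)

    args = parser.parse_args(["--eval-timeout=60"])
    print("--eval-timeout=%s" % args.eval_timeout)                    # --eval-timeout=60.0
    print("--eval-timeout=%s" % parser.get_default("eval_timeout"))   # --eval-timeout=20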
index bd5d08bcf5e8f278606be6bac2037ce7b9215ecb..5b9d0e2effc153ca5322a8ffea3955301def2000 100644 (file)
@@ -433,7 +433,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        err = s.fs.Remove("foo/bar")
        c.Check(err, check.IsNil)
 
-       // mkdir succeds after the file is deleted
+       // mkdir succeeds after the file is deleted
        err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.IsNil)
 
index aa6bdad90bea0551f7043fbdd0889f6dea2ff6a8..f4580f346bbed43a5642e974d54c8e5922c24efd 100644 (file)
@@ -866,6 +866,9 @@ class ArvadosFile(object):
 
     """
 
+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
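
The __slots__ addition here (and in the FUSE classes later in this commit) is a memory optimization: a slotted class stores attributes in fixed slots instead of a per-instance __dict__, which adds up when a mounted collection holds hundreds of thousands of ArvadosFile objects. A small illustrative sketch (exact savings vary by Python build):

    import sys

    class Plain(object):
        def __init__(self):
            self.a, self.b = 1, 2

    class Slotted(object):
        __slots__ = ('a', 'b')
        def __init__(self):
            self.a, self.b = 1, 2

    print(hasattr(Plain(), '__dict__'))     # True
    print(hasattr(Slotted(), '__dict__'))   # False: no per-instance dict
    print(sys.getsizeof(Plain().__dict__))  # the dict every plain instance carries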
index 4be098d3511656e42a176b5fe46ea0de83355b10..33333ee86558c4b0244917a9ffb2c75645d321fa 100644 (file)
@@ -1531,6 +1531,10 @@ class Collection(RichCollectionBase):
 
         return text
 
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1549,7 +1553,7 @@ class Collection(RichCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1564,7 +1568,7 @@ class Collection(RichCollectionBase):
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
@@ -1573,7 +1577,7 @@ class Collection(RichCollectionBase):
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
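
Hoisting the three patterns to class-level compiled regexes means _import_manifest no longer pays a re-compile (or internal regex-cache lookup) for every token of a manifest, which can number in the millions for large collections. Note also that _segment_re uses match() instead of the old search(r'^...'), which is equivalent here because match() is already anchored at the start of the string. A rough sketch of the per-token difference (illustrative timing only):

    import re
    import timeit

    tok = '0:1024:foo.txt'
    segment_re = re.compile(r'(\d+):(\d+):(\S+)')

    print(timeit.timeit(lambda: re.match(r'(\d+):(\d+):(\S+)', tok), number=100000))
    print(timeit.timeit(lambda: segment_re.match(tok), number=100000))  # typically faster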
index 351f7f5dda8a96ebb805fd4d4896380cb3addbb8..e8e95afc7013650c67e753a3f2de4e7ec227fc44 100644 (file)
@@ -541,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -996,84 +996,90 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
-        }
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter,
-                                   headers=headers)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False,
-                    headers=headers)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1144,7 +1150,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
index e0bb734b21fbf2671c51a4ce22dd5c954432a488..872c93bae25b5480de1cbf91400f716543415700 100644 (file)
@@ -1171,7 +1171,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
         def finished(self):
             return False
-    
+
     def setUp(self):
         self.copies = 3
         self.pool = arvados.KeepClient.KeepWriterThreadPool(
@@ -1215,7 +1215,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self.pool.add_task(ks, None)
         self.pool.join()
         self.assertEqual(self.pool.done(), self.copies-1)
-    
+
 
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
@@ -1250,3 +1250,27 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+    def test_api_fail(self):
+        class ApiMock(object):
+            def __getattr__(self, r):
+                if r == "api_token":
+                    return "abc"
+                else:
+                    raise arvados.errors.KeepReadError()
+        keep_client = arvados.KeepClient(api_client=ApiMock(),
+                                             proxy='', local_store='')
+
+        # The bug this is testing for is that if an API (not
+        # keepstore) exception is thrown as part of a get(), the next
+        # attempt to get that same block will result in a deadlock.
+        # This is why there are two get()s in a row.  Unfortunately,
+        # the failure mode for this test is that the test suite
+        # deadlocks; there isn't a good way to avoid that without
+        # adding a special case that has no use except for this test.
+
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
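
Why the try/finally restructuring in keep.py above fixes this: the block cache hands out one slot per locator, the first caller is expected to fill it, and every later caller for the same locator blocks until it is filled. Previously, an exception raised before slot.set() (for example an API error while building the service list) left the slot permanently empty, so the next get() of the same block hung. A stripped-down sketch of the slot protocol (hypothetical names; the real BlockCache is not shown in this diff):

    import threading

    class Slot(object):
        def __init__(self):
            self.ready = threading.Event()
            self.content = None

        def set(self, value):
            self.content = value
            self.ready.set()

        def get(self):
            self.ready.wait()   # blocks forever if the filler never calls set()
            return self.content

With the finally block, slot.set(blob) always runs, filling the slot with None on failure; the new "if blob is None: raise KeepReadError" check then turns a cached failure into an error instead of a hang.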
index 247d15e1c1ce76547aaa75975c95322d1e898887..a2b22ea7f6c55709dff9d745368cbbe46bed5669 100644 (file)
@@ -39,6 +39,7 @@ class Arvados::V1::NodesController < ApplicationController
     }
     @object.update_attributes!(attrs_to_update)
     @object.assign_slot if params[:assign_slot]
+    @object.save!
     show
   end
 
index c198c4c8ee9874cc1769b11428c11f44290fcf85..dc8b3acdd7de02c83f0a668426bcf63a078c5111 100644 (file)
@@ -92,6 +92,10 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     assert_operator 0, :<, json_response['slot_number']
     n = json_response['slot_number']
     assert_equal "compute#{n}", json_response['hostname']
+
+    node = Node.where(uuid: json_response['uuid']).first
+    assert_equal n, node.slot_number
+    assert_equal "compute#{n}", node.hostname
   end
 
   test "update node and assign slot" do
@@ -102,6 +106,10 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     assert_operator 0, :<, json_response['slot_number']
     n = json_response['slot_number']
     assert_equal "compute#{n}", json_response['hostname']
+
+    node.reload
+    assert_equal n, node.slot_number
+    assert_equal "compute#{n}", node.hostname
   end
 
   test "update node and assign slot, don't clobber hostname" do
index 788d475e33c0d094d719503e6b9fc4dba386e1ec..f1e49f5afcffff32143b9033c5f83dddcd0c7c65 100644 (file)
@@ -156,12 +156,30 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear:
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+            # Kernel behavior seems to be that if a file is
+            # referenced, its parents remain referenced too. This
+            # means has_ref() exits early when a collection is not a
+            # candidate for eviction.
+            #
+            # By contrast, in_use() doesn't increment references on
+            # parents, so it requires a full tree walk to determine if
+            # a collection is a candidate for eviction.  This takes
+            # .07s for 240000 files, which becomes a major drag when
+            # cap_cache is being called several times a second and
+            # there are multiple non-evictable collections in the
+            # cache.
+            #
+            # So it is important for performance that we do the
+            # has_ref() check first.
+
             if obj.has_ref(True):
                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
                 return
+
+            if obj.in_use():
+                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+                return
+
             obj.kernel_invalidate()
             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
             obj.clear()
@@ -202,7 +220,8 @@ class InodeCache(object):
                     if obj not in self._by_uuid[obj.cache_uuid]:
                         self._by_uuid[obj.cache_uuid].append(obj)
             self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
             self.cap_cache()
 
     def touch(self, obj):
index 8b680f0663d25cf423e68251f1a82b8ed7384bc2..2a3a19c54c66005a6f96cd8d1dbd6de3c6345aad 100644 (file)
@@ -59,6 +59,10 @@ class FreshBase(object):
     * Clear the object contents (invalidates the object)
 
     """
+
+    __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+                 "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
     def __init__(self):
         self._stale = True
         self._poll = False
index 585536176007bdfcc889a47647f85114e6a34fb7..cedb4fb451cdf6fbdaefe0b4caa3a20ef424d69e 100644 (file)
@@ -15,6 +15,8 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 class File(FreshBase):
     """Base for file objects."""
 
+    __slots__ = ("inode", "parent_inode", "_mtime")
+
     def __init__(self, parent_inode, _mtime=0):
         super(File, self).__init__()
         self.inode = None
@@ -46,6 +48,8 @@ class File(FreshBase):
 class FuseArvadosFile(File):
     """Wraps a ArvadosFile."""
 
+    __slots__ = ('arvfile',)
+
     def __init__(self, parent_inode, arvfile, _mtime):
         super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
         self.arvfile = arvfile
index 2638bf08cdab53e84d0ff206542286ad58ff3f94..f18d82c06b29b7948a90431be686001b1bd9e572 100644 (file)
@@ -21,7 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "github.com/Azure/azure-sdk-for-go/storage"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -105,9 +105,18 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
 
-       azClient storage.Client
-       bsClient *azureBlobClient
+       azClient  storage.Client
+       container *azureContainer
+}
+
+// singleSender is a single-attempt storage.Sender.
+type singleSender struct{}
+
+// Send performs req exactly once.
+func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
+       return c.HTTPClient.Do(req)
 }
 
 // Examples implements VolumeWithExamples.
@@ -155,6 +164,7 @@ func (v *AzureBlobVolume) Start() error {
        if err != nil {
                return fmt.Errorf("creating Azure storage client: %s", err)
        }
+       v.azClient.Sender = &singleSender{}
 
        if v.RequestTimeout == 0 {
                v.RequestTimeout = azureDefaultRequestTimeout
@@ -163,15 +173,13 @@ func (v *AzureBlobVolume) Start() error {
                Timeout: time.Duration(v.RequestTimeout),
        }
        bs := v.azClient.GetBlobService()
-       v.bsClient = &azureBlobClient{
-               client: &bs,
+       v.container = &azureContainer{
+               ctr: bs.GetContainerReference(v.ContainerName),
        }
 
-       ok, err := v.bsClient.ContainerExists(v.ContainerName)
-       if err != nil {
+       if ok, err := v.container.Exists(); err != nil {
                return err
-       }
-       if !ok {
+       } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
        return nil
@@ -184,7 +192,7 @@ func (v *AzureBlobVolume) DeviceID() string {
 
 // Return true if expires_at metadata attribute is found on the block
 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
-       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+       metadata, err := v.container.GetBlobMetadata(loc)
        if err != nil {
                return false, metadata, v.translateError(err)
        }
@@ -251,7 +259,7 @@ func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int,
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
-               props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+               props, err := v.container.GetBlobProperties(loc)
                if err != nil {
                        return 0, v.translateError(err)
                }
@@ -293,9 +301,9 @@ func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int,
                        go func() {
                                defer close(gotRdr)
                                if startPos == 0 && endPos == expectSize {
-                                       rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+                                       rdr, err = v.container.GetBlob(loc)
                                } else {
-                                       rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                                       rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
                                }
                        }()
                        select {
@@ -364,7 +372,7 @@ func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte
        gotRdr := make(chan struct{})
        go func() {
                defer close(gotRdr)
-               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+               rdr, err = v.container.GetBlob(loc)
        }()
        select {
        case <-ctx.Done():
@@ -411,7 +419,7 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err
                        body = http.NoBody
                        bufr.Close()
                }
-               errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
+               errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
        }()
        select {
        case <-ctx.Done():
@@ -445,7 +453,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        }
 
        metadata["touch"] = fmt.Sprintf("%d", time.Now())
-       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+       return v.container.SetBlobMetadata(loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
@@ -458,11 +466,11 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
                return time.Time{}, os.ErrNotExist
        }
 
-       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+       props, err := v.container.GetBlobProperties(loc)
        if err != nil {
                return time.Time{}, err
        }
-       return time.Parse(time.RFC1123, props.LastModified)
+       return time.Time(props.LastModified), nil
 }
 
 // IndexTo writes a list of Keep blocks that are stored in the
@@ -470,22 +478,19 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
                Prefix:  prefix,
-               Include: "metadata",
+               Include: &storage.IncludeBlobDataset{Metadata: true},
        }
        for {
-               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+               resp, err := v.container.ListBlobs(params)
                if err != nil {
                        return err
                }
                for _, b := range resp.Blobs {
-                       t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
-                       if err != nil {
-                               return err
-                       }
                        if !v.isKeepBlock(b.Name) {
                                continue
                        }
-                       if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
+                       modtime := time.Time(b.Properties.LastModified)
+                       if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
                                // A new zero-length blob is probably
                                // just a new non-empty blob that
                                // hasn't committed its data yet (see
@@ -497,7 +502,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                                // Trashed blob; exclude it from response
                                continue
                        }
-                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
+                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
                }
                if resp.NextMarker == "" {
                        return nil
@@ -517,7 +522,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        // we get the Etag before checking Mtime, and use If-Match to
        // ensure we don't delete data if Put() or Touch() happens
        // between our calls to Mtime() and DeleteBlob().
-       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+       props, err := v.container.GetBlobProperties(loc)
        if err != nil {
                return err
        }
@@ -529,16 +534,16 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 
        // If TrashLifetime == 0, just delete it
        if theConfig.TrashLifetime == 0 {
-               return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
-                       "If-Match": props.Etag,
+               return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
+                       IfMatch: props.Etag,
                })
        }
 
        // Otherwise, mark as trash
-       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+       return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
                "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
-       }, map[string]string{
-               "If-Match": props.Etag,
+       }, &storage.SetBlobMetadataOptions{
+               IfMatch: props.Etag,
        })
 }
 
@@ -546,7 +551,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 // Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
        // if expires_at does not exist, return NotFoundError
-       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+       metadata, err := v.container.GetBlobMetadata(loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -556,7 +561,7 @@ func (v *AzureBlobVolume) Untrash(loc string) error {
 
        // reset expires_at metadata attribute
        metadata["expires_at"] = ""
-       err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+       err = v.container.SetBlobMetadata(loc, metadata, nil)
        return v.translateError(err)
 }
 
@@ -586,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
@@ -611,10 +621,10 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
-       params := storage.ListBlobsParameters{Include: "metadata"}
+       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
 
        for {
-               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+               resp, err := v.container.ListBlobs(params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break
@@ -638,8 +648,8 @@ func (v *AzureBlobVolume) EmptyTrash() {
                                continue
                        }
 
-                       err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
-                               "If-Match": b.Properties.Etag,
+                       err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+                               IfMatch: b.Properties.Etag,
                        })
                        if err != nil {
                                log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
@@ -659,7 +669,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
 
 // InternalStats returns bucket I/O and API call counters.
 func (v *AzureBlobVolume) InternalStats() interface{} {
-       return &v.bsClient.stats
+       return &v.container.stats
 }
 
 type azureBlobStats struct {
@@ -687,75 +697,105 @@ func (s *azureBlobStats) TickErr(err error) {
        s.statsTicker.TickErr(err, errType)
 }
 
-// azureBlobClient wraps storage.BlobStorageClient in order to count
-// I/O and API usage stats.
-type azureBlobClient struct {
-       client *storage.BlobStorageClient
-       stats  azureBlobStats
+// azureContainer wraps storage.Container in order to count I/O and
+// API usage stats.
+type azureContainer struct {
+       ctr   *storage.Container
+       stats azureBlobStats
 }
 
-func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+func (c *azureContainer) Exists() (bool, error) {
        c.stats.Tick(&c.stats.Ops)
-       ok, err := c.client.ContainerExists(cname)
+       ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
        return ok, err
 }
 
-func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
-       m, err := c.client.GetBlobMetadata(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.GetMetadata(nil)
        c.stats.TickErr(err)
-       return m, err
+       return b.Metadata, err
 }
 
-func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
-       p, err := c.client.GetBlobProperties(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.GetProperties(nil)
        c.stats.TickErr(err)
-       return p, err
+       return &b.Properties, err
 }
 
-func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
-       rdr, err := c.client.GetBlob(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       rdr, err := b.Get(nil)
        c.stats.TickErr(err)
        return NewCountingReader(rdr, c.stats.TickInBytes), err
 }
 
-func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
-       rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
+               Range: &storage.BlobRange{
+                       Start: uint64(start),
+                       End:   uint64(end),
+               },
+               GetBlobOptions: opts,
+       })
        c.stats.TickErr(err)
        return NewCountingReader(rdr, c.stats.TickInBytes), err
 }
 
-func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+// If we give it an io.Reader that doesn't also have a Len() int
+// method, the Azure SDK determines data size by copying the data into
+// a new buffer, which is not a good use of memory.
+type readerWithAzureLen struct {
+       io.Reader
+       len int
+}
+
+// Len satisfies the private lener interface in azure-sdk-for-go.
+func (r *readerWithAzureLen) Len() int {
+       return r.len
+}
+
+func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
-               rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+               rdr = &readerWithAzureLen{
+                       Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
+                       len:    size,
+               }
        }
-       err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.CreateBlockBlobFromReader(rdr, opts)
        c.stats.TickErr(err)
        return err
 }
 
-func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
-       err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       b.Metadata = m
+       err := b.SetMetadata(opts)
        c.stats.TickErr(err)
        return err
 }
 
-func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
-       resp, err := c.client.ListBlobs(cname, params)
+       resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
        return resp, err
 }
 
-func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
-       err := c.client.DeleteBlob(cname, bname, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.Delete(opts)
        c.stats.TickErr(err)
        return err
 }
index 31b79da2982547c1b731eb9ae7336edce0c0de8a..60a7911768f009ef6209292d6c1e04b6cccbe6e7 100644 (file)
@@ -26,7 +26,8 @@ import (
        "testing"
        "time"
 
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -35,7 +36,7 @@ const (
        // used by Microsoft's Azure emulator: the Azure SDK
        // recognizes that magic string and changes its behavior to
        // cater to the Azure SDK's own test suite.
-       fakeAccountName = "fakeAccountName"
+       fakeAccountName = "fakeaccountname"
        fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 )
 
@@ -307,7 +308,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                                b := storage.Blob{
                                        Name: hash,
                                        Properties: storage.BlobProperties{
-                                               LastModified:  blob.Mtime.Format(time.RFC1123),
+                                               LastModified:  storage.TimeRFC1123(blob.Mtime),
                                                ContentLength: int64(len(blob.Data)),
                                                Etag:          blob.Etag,
                                        },
@@ -385,7 +386,7 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                ReadOnly:         readonly,
                AzureReplication: replication,
                azClient:         azClient,
-               bsClient:         &azureBlobClient{client: &bs},
+               container:        &azureContainer{ctr: bs.GetContainerReference(container)},
        }
 
        return &TestableAzureBlobVolume{
@@ -707,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Azure
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
index 883aa712e3db0bdcb8fee2eb0a583affd3150e53..66a212456d51c78e09cc12dc41bdccb17bf953df 100644 (file)
@@ -46,11 +46,11 @@ func (s *MountsSuite) TestMounts(c *check.C) {
        resp := s.call("GET", "/mounts", "", nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        var mntList []struct {
-               UUID        string
-               DeviceID    string
-               ReadOnly    bool
-               Replication int
-               Tier        int
+               UUID           string
+               DeviceID       string
+               ReadOnly       bool
+               Replication    int
+               StorageClasses []string
        }
        err := json.Unmarshal(resp.Body.Bytes(), &mntList)
        c.Assert(err, check.IsNil)
@@ -61,7 +61,7 @@ func (s *MountsSuite) TestMounts(c *check.C) {
                c.Check(m.DeviceID, check.Equals, "mock-device-id")
                c.Check(m.ReadOnly, check.Equals, false)
                c.Check(m.Replication, check.Equals, 1)
-               c.Check(m.Tier, check.Equals, 1)
+               c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
        }
        c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
 
index 90e8a1b4f6913da10b6789a4111a7ac8aa479721..a60b2fc27e321f553c9784691702282ecb39a6e4 100644 (file)
@@ -152,6 +152,7 @@ type S3Volume struct {
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
+       StorageClasses     []string
 
        bucket *s3bucket
 
@@ -686,6 +687,11 @@ func (v *S3Volume) Replication() int {
        return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
index acc1b11df32526c132d763d970915f9f30735437..4081e1e63c4825a08712a93bd552de7818f018d5 100644 (file)
@@ -19,6 +19,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -435,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
        return v
 }
 
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: S3
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableS3Volume) Start() error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
index 69802abdd1b5c4e22422293331d6cb0eec371896..1f8fba5d067c2a0731cb05eeebf81cc76bc315b7 100644 (file)
@@ -240,6 +240,9 @@ type Volume interface {
        // Return a globally unique ID of the underlying storage
        // device if possible, otherwise "".
        DeviceID() string
+
+       // Get the storage classes associated with this volume
+       GetStorageClasses() []string
 }
 
 // A VolumeWithExamples provides example configs to display in the
@@ -284,12 +287,12 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-       UUID        string
-       DeviceID    string
-       ReadOnly    bool
-       Replication int
-       Tier        int
-       volume      Volume
+       UUID           string
+       DeviceID       string
+       ReadOnly       bool
+       Replication    int
+       StorageClasses []string
+       volume         Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        }
        vm.mountMap = make(map[string]*VolumeMount)
        for _, v := range volumes {
+               sc := v.GetStorageClasses()
+               if len(sc) == 0 {
+                       sc = []string{"default"}
+               }
                mnt := &VolumeMount{
-                       UUID:        (*VolumeMount)(nil).generateUUID(),
-                       DeviceID:    v.DeviceID(),
-                       ReadOnly:    !v.Writable(),
-                       Replication: v.Replication(),
-                       Tier:        1,
-                       volume:      v,
+                       UUID:           (*VolumeMount)(nil).generateUUID(),
+                       DeviceID:       v.DeviceID(),
+                       ReadOnly:       !v.Writable(),
+                       Replication:    v.Replication(),
+                       StorageClasses: sc,
+                       volume:         v,
                }
                vm.iostats[v] = &ioStats{}
                vm.mounts = append(vm.mounts, mnt)
index baed6a71b60ae556a28b0cc3478e147ad77f90af..43ddd090cc1cfd22419e80aa86f1e838ffebd479 100644 (file)
@@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int {
 
 func (v *MockVolume) EmptyTrash() {
 }
+
+func (v *MockVolume) GetStorageClasses() []string {
+       return nil
+}
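Returning nil from the mock means mounts built on MockVolume fall through to the ["default"] class introduced in MakeRRVolumeManager above, without each test having to declare classes explicitly.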
index ea9aa489c5af8c357cd8c195851e94456537ce2a..b4f18ad13e6d0c93e0c0b40023b9153d3c7a6d99 100644 (file)
@@ -110,6 +110,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -644,6 +645,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
index ea3d91d98c2e42deed99417c87254af42b8148e8..7f1cd219644ab241f2c0a8a0e2353c8f4c16844f 100644 (file)
@@ -19,6 +19,7 @@ import (
        "testing"
        "time"
 
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
 }
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Directory
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
index 597a011e72075975706a42a31ed03bc74ff42b36..37d7088b7a7c65bc8632e21269f465d6850d50b9 100644 (file)
@@ -122,7 +122,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     def prepare_arvados_node(self, node):
         self._clean_arvados_node(node, "Prepared by Node Manager")
         self.arvados_node = self._arvados.nodes().update(
-            body={}, assign_slot=True).execute()
+            uuid=node['uuid'], body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
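The fix above supplies the previously missing uuid argument: nodes().update() needs the UUID of the Arvados node record being modified, so the earlier call with only body and assign_slot presumably could not target the node that had just been prepared.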
index 97cc79d32fd2d110f6bd879441316cdcfec6adc1..183ff2abfd5e4e162c5b0102c298991adeb33cdf 100755 (executable)
@@ -12,19 +12,7 @@ cd /usr/src/arvados/doc
 run_bundler --without=development
 
 cd /usr/src/arvados/sdk/R
-R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
-  install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
-  install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
-  devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+R --quiet --vanilla --file=install_deps.R
 
 if test "$1" = "--only-deps" ; then
     exit
index 10569b2e139b89a2904820ab62dab6cbc3a747b5..af7b2e92ebeb0cb60697ac03dacc25e33553782b 100644 (file)
@@ -217,7 +217,7 @@ func SetParentGroup(cfg *ConfigParams) error {
                        return fmt.Errorf("error searching for parent group: %s", err)
                }
                if len(gl.Items) == 0 {
-                       // Default parent group not existant, create one.
+                       // Default parent group does not exist, create it.
                        if cfg.Verbose {
                                log.Println("Default parent group not found, creating...")
                        }
index 483ab70b6d0ba15f1f2fa675bd818bff5f1f853b..a4f750b4c4d0445567ad20da7ac9408eb12a692d 100644 (file)
                        "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
                        "revisionTime": "2017-07-27T13:52:37Z"
                },
+               {
+                       "checksumSHA1": "xHZe/h/tyrqmS9qiR03bLfRv5FI=",
+                       "path": "github.com/Azure/azure-sdk-for-go/storage",
+                       "revision": "f8eeb65a1a1f969696b49aada9d24073f2c2acd1",
+                       "revisionTime": "2018-02-15T19:19:13Z"
+               },
+               {
+                       "checksumSHA1": "PfyfOXsPbGEWmdh54cguqzdwloY=",
+                       "path": "github.com/Azure/azure-sdk-for-go/version",
+                       "revision": "471256ff7c6c93b96131845cef5309d20edd313d",
+                       "revisionTime": "2018-02-14T01:17:07Z"
+               },
+               {
+                       "checksumSHA1": "LQWU/2M2E4L/hVzT9BVW1SkLrpA=",
+                       "path": "github.com/Azure/go-autorest/autorest",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "nBQ7cdhoeYUur6G6HG97uueoDmE=",
+                       "path": "github.com/Azure/go-autorest/autorest/adal",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "zXyLmDVpkYkIsL0yinNLoW82IZc=",
+                       "path": "github.com/Azure/go-autorest/autorest/azure",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "9nXCi9qQsYjxCeajJKWttxgEt0I=",
+                       "path": "github.com/Azure/go-autorest/autorest/date",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
                {
                        "checksumSHA1": "o/3cn04KAiwC7NqNVvmfVTD+hgA=",
                        "path": "github.com/Microsoft/go-winio",
                        "revisionTime": "2018-01-08T08:51:32Z"
                },
                {
-                       "checksumSHA1": "pAu+do4x7E5SFLfIqJeGwhcOd6E=",
-                       "path": "github.com/curoverse/azure-sdk-for-go/storage",
-                       "revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
-                       "revisionTime": "2016-12-14T20:08:43Z"
+                       "checksumSHA1": "+TKtBzv23ywvmmqRiGEjUba4YmI=",
+                       "path": "github.com/dgrijalva/jwt-go",
+                       "revision": "dbeaa9332f19a944acb5736b4456cfcc02140e29",
+                       "revisionTime": "2017-10-19T21:57:19Z"
                },
                {
                        "checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
                        "revision": "83612a56d3dd153a94a629cd64925371c9adad78",
                        "revisionTime": "2017-11-26T05:04:59Z"
                },
+               {
+                       "checksumSHA1": "T9E+5mKBQ/BX4wlNxgaPfetxdeI=",
+                       "path": "github.com/marstr/guid",
+                       "revision": "8bdf7d1a087ccc975cf37dd6507da50698fd19ca",
+                       "revisionTime": "2017-04-27T23:51:15Z"
+               },
                {
                        "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
                        "path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
                        "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
                        "revisionTime": "2018-01-25T13:30:57Z"
                },
+               {
+                       "checksumSHA1": "eDQ6f1EsNf+frcRO/9XukSEchm8=",
+                       "path": "github.com/satori/go.uuid",
+                       "revision": "36e9d2ebbde5e3f13ab2e25625fd453271d6522e",
+                       "revisionTime": "2018-01-03T17:44:51Z"
+               },
                {
                        "checksumSHA1": "UwtyqB7CaUWPlw0DVJQvw0IFQZs=",
                        "path": "github.com/sergi/go-diff/diffmatchpatch",