11158: Fixes & test updates for ProjectDirectory.
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 3 Jul 2017 20:24:08 +0000 (16:24 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 27 Jul 2017 13:26:01 +0000 (09:26 -0400)
* list_all() orders results by uuid and pages with a "uuid > last-seen uuid" filter instead of an offset.

* Move dynamic item lookup from __contains__ to __getitem__ (__contains__ now delegates to __getitem__) to fix some tests.

* Remove tests that check for pipeline objects, because ProjectDirectory doesn't
create them anymore.

* FuseSharedTest now runs in a multiprocess worker to avoid deadlock.

* Tweak local_store_get, which is used in testing, to raise NotFoundError when the
block file does not exist (ENOENT), for consistency with the real get().

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curoverse.com>

sdk/python/arvados/keep.py
sdk/python/arvados/util.py
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/tests/test_mount.py

index e6e93f080659abf9a51cec9d4425cafe8bdc976b..4103b308f186aca376781932c766d1c7fb6da35d 100644 (file)
@@ -27,6 +27,7 @@ import sys
 import threading
 from . import timer
 import urllib.parse
+import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -540,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -551,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -612,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1130,7 +1131,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1187,8 +1188,16 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-            return f.read()
+
+        try:
+            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+                return f.read()
+        except IOError as e:
+            if e.errno == errno.ENOENT:
+                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
+            else:
+                raise
+
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
index 779b416e2c6d79e0ca04296dc696be5aee68dfa8..1ab999653b6f342e0d4e8e3d36fb8b775a886c91 100644 (file)
@@ -369,17 +369,17 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
-    kwargs.setdefault('order', 'created_at asc')
+    kwargs.setdefault('order', 'uuid asc')
     kwargs.setdefault('count', 'none')
+    addfilters = kwargs.get("filters", [])
     items = []
-    offset = 0
     while True:
-        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        c = fn(**kwargs).execute(num_retries=num_retries)
         items.extend(c['items'])
         if len(c['items']) < c['limit']:
             # Didn't return a full page, so we're done.
             break
-        offset = c['offset'] + len(c['items'])
+        kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
     return items
 
 def ca_certs_path(fallback=httplib2.CA_CERTS):
index b3717ff07c23cb665505645d869b0670d1566b54..4dad90c86758edb118d7ab4b04958417533b9653 100644 (file)
@@ -205,12 +205,16 @@ class Mount(object):
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
-        self.api = arvados.safeapi.ThreadSafeApiCache(
-            apiconfig=arvados.config.settings(),
-            keep_params={
-                'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
-                'num_retries': self.args.retries,
-            })
+        try:
+            self.api = arvados.safeapi.ThreadSafeApiCache(
+                apiconfig=arvados.config.settings(),
+                keep_params={
+                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'num_retries': self.args.retries,
+                })
+        except KeyError as e:
+            self.logger.error("Missing environment: %s", e)
+            exit(1)
         # Do a sanity check that we have a working arvados host + token.
         self.api.users().current().execute()
 
index 49d9711d89d615a0ad25e0d498ea8ae528472dd2..70bc982585780d99d68ac57cb1b1d7e2e8ac54f3 100644 (file)
@@ -847,12 +847,10 @@ class ProjectDirectory(Directory):
                 contents = arvados.util.list_all(self.api.groups().list,
                                                  self.num_retries,
                                                  filters=[["owner_uuid", "=", self.project_uuid],
-                                                          ["group_class", "=", "project"]],
-                                                 limit=1000)
+                                                          ["group_class", "=", "project"]])
                 contents.extend(arvados.util.list_all(self.api.collections().list,
                                                       self.num_retries,
-                                                      filters=[["owner_uuid", "=", self.project_uuid]],
-                                                      limit=1000))
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
@@ -861,39 +859,43 @@ class ProjectDirectory(Directory):
                        samefn,
                        self.createDirectory)
         finally:
-            self._full_listing = False
             self._updating_lock.release()
 
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            i = contents[0]
+            name = sanitize_filename(self.namefn(i))
+            if name != k:
+                raise KeyError(k)
+            ent = self.createDirectory(i)
+            self._entries[name] = self.inodes.add_entry(ent)
+            return self._entries[name]
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            if super(ProjectDirectory, self).__contains__(k):
-                return True
-            elif not self._full_listing:
-                with llfuse.lock_released:
-                    contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                               ["group_class", "=", "project"],
-                                                               ["name", "=", k]],
-                                                      limit=1).execute(num_retries=self.num_retries)["items"]
-                    if not contents:
-                        contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                        ["name", "=", k]],
-                                                               limit=1).execute(num_retries=self.num_retries)["items"]
-                if contents:
-                    i = contents[0]
-                    name = sanitize_filename(self.namefn(i))
-                    ent = self.createDirectory(i)
-                    self._entries[name] = self.inodes.add_entry(ent)
-                    return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
         return False
 
     @use_counter
index 225e4b2d22bc50d8dd8a7a97fae8cf767cc3d638..ec8868af7d799857d0eba14e8478f3030d9969cd 100644 (file)
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
             attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
 
 
+def fuseSharedTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Double check that we can open and read objects in this folder as a file,
+            # and that its contents are what we expect.
+            baz_path = os.path.join(
+                mounttmp,
+                'FUSE User',
+                'FUSE Test Project',
+                'collection in FUSE project',
+                'baz')
+            with open(baz_path) as f:
+                self.assertEqual("baz", f.read())
+
+            # check mtime on collection
+            st = os.stat(baz_path)
+            try:
+                mtime = st.st_mtime_ns / 1000000000
+            except AttributeError:
+                mtime = st.st_mtime
+            self.assertEqual(mtime, 1391448174)
+
+            # shared_dirs is a list of the directories exposed
+            # by fuse.SharedDirectory (i.e. any object visible
+            # to the current user)
+            shared_dirs = llfuse.listdir(mounttmp)
+            shared_dirs.sort()
+            self.assertIn('FUSE User', shared_dirs)
+
+            # fuse_user_objs is a list of the objects owned by the FUSE
+            # test user (which present as files in the 'FUSE User'
+            # directory)
+            fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+            fuse_user_objs.sort()
+            self.assertEqual(['FUSE Test Project',                    # project owned by user
+                              'collection #1 owned by FUSE',          # collection owned by user
+                              'collection #2 owned by FUSE'          # collection owned by user
+                          ], fuse_user_objs)
+
+            # test_proj_files is a list of the files in the FUSE Test Project.
+            test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+            test_proj_files.sort()
+            self.assertEqual(['collection in FUSE project'
+                          ], test_proj_files)
+
+
+    Test().runTest()
+
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
                         exclude=self.api.users().current().execute()['uuid'])
+        keep = arvados.keep.KeepClient()
+        keep.put("baz")
 
-        # shared_dirs is a list of the directories exposed
-        # by fuse.SharedDirectory (i.e. any object visible
-        # to the current user)
-        shared_dirs = llfuse.listdir(self.mounttmp)
-        shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
-
-        # fuse_user_objs is a list of the objects owned by the FUSE
-        # test user (which present as files in the 'FUSE User'
-        # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
-        fuse_user_objs.sort()
-        self.assertEqual(['FUSE Test Project',                    # project owned by user
-                          'collection #1 owned by FUSE',          # collection owned by user
-                          'collection #2 owned by FUSE',          # collection owned by user
-                          'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user
-                      ], fuse_user_objs)
-
-        # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
-        test_proj_files.sort()
-        self.assertEqual(['collection in FUSE project',
-                          'pipeline instance in FUSE project.pipelineInstance',
-                          'pipeline template in FUSE project.pipelineTemplate'
-                      ], test_proj_files)
-
-        # Double check that we can open and read objects in this folder as a file,
-        # and that its contents are what we expect.
-        pipeline_template_path = os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'FUSE Test Project',
-                'pipeline template in FUSE project.pipelineTemplate')
-        with open(pipeline_template_path) as f:
-            j = json.load(f)
-            self.assertEqual("pipeline template in FUSE project", j['name'])
-
-        # check mtime on template
-        st = os.stat(pipeline_template_path)
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1397493304)
-
-        # check mtime on collection
-        st = os.stat(os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'collection #1 owned by FUSE'))
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1391448174)
+        self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
 
 
 class FuseHomeTest(MountTestBase):