3198: Fix frontrunning (subsequent updates after a commit causing the
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 12 Jun 2015 19:54:51 +0000 (15:54 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 12 Jun 2015 19:54:51 +0000 (15:54 -0400)
collection to conflict with itself).  Also fix filename character encoding.

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
services/fuse/README.rst
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py

index ffe3d35b66b7af1e2eefe83386c2ac17d5a6c979..83ef76bcab404f04758a0da3740e139936572500 100644 (file)
@@ -821,7 +821,8 @@ class ArvadosFile(object):
         for lr in readsegs:
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
-                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+                blockview = memoryview(block)
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
             else:
                 break
         return ''.join(data)
index 70341d8d68538ac83d31cb30f85d8e94f76e86d5..81be529051c68abd2d24d192570e61dc9f01ea12 100644 (file)
@@ -1168,6 +1168,7 @@ class Collection(RichCollectionBase):
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
         self.events = None
@@ -1197,6 +1198,10 @@ class Collection(RichCollectionBase):
     def writable(self):
         return True
 
+    @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
@@ -1206,6 +1211,11 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+                # We've already merged this record version before.  Don't do anything.
+                return
+            else:
+                self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1233,6 +1243,10 @@ class Collection(RichCollectionBase):
             self._block_manager = _BlockManager(self._my_keep())
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1242,9 +1256,9 @@ class Collection(RichCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             return None
         except Exception as e:
@@ -1401,11 +1415,11 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self.set_committed()
 
@@ -1452,7 +1466,7 @@ class Collection(RichCollectionBase):
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
@@ -1460,7 +1474,7 @@ class Collection(RichCollectionBase):
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
@@ -1543,7 +1557,7 @@ class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
index d9a9a0789f8da010c8f199804ad14f75fcc7e240..f0b2677d86bd8626941b571e2bb20374c2613fa4 100644 (file)
@@ -55,6 +55,10 @@ on your system.
 Testing and Development
 -----------------------
 
+Debian packages you need to build llfuse::
+
+  $ apt-get install python-dev pkg-config libfuse-dev libattr1-dev
+
 This package is one part of the Arvados source package, and it has
 integration tests to check interoperability with other Arvados
 components.  Our `hacking guide
index abe9821ec3504ff6fabbc02f4deda35869013027..4f91de90da34b030fe804e5f365e07e0785216f1 100644 (file)
@@ -183,10 +183,11 @@ class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self, inode_cache):
+    def __init__(self, inode_cache, encoding="utf-8"):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
+        self.encoding = encoding
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -263,10 +264,9 @@ class Operations(llfuse.Operations):
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
-        self.inodes = Inodes(inode_cache)
+        self.inodes = Inodes(inode_cache, encoding=encoding)
         self.uid = uid
         self.gid = gid
-        self.encoding = encoding
 
         # dict of inode to filehandle
         self._filehandles = {}
@@ -303,6 +303,7 @@ class Operations(llfuse.Operations):
                                  [["event_type", "in", ["create", "update", "delete"]]],
                                  self.on_event)
 
+    @catch_exceptions
     def on_event(self, ev):
         if 'event_type' in ev:
             with llfuse.lock:
@@ -310,11 +311,13 @@ class Operations(llfuse.Operations):
                 if item is not None:
                     item.invalidate()
                     if ev["object_kind"] == "arvados#collection":
-                        item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+                        new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
+                        record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+                        item.update(to_record_version=record_version)
                     else:
                         item.update()
 
-                oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+                oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
                 olditemparent = self.inodes.inode_cache.find(oldowner)
                 if olditemparent is not None:
                     olditemparent.invalidate()
@@ -335,8 +338,8 @@ class Operations(llfuse.Operations):
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 300
-        entry.attr_timeout = 300
+        entry.entry_timeout = 60
+        entry.attr_timeout = 60
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
@@ -357,7 +360,7 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (e.size()/512)+1
+        entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
         entry.st_mtime = int(e.mtime())
         entry.st_ctime = int(e.mtime())
@@ -379,7 +382,7 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def lookup(self, parent_inode, name):
-        name = unicode(name, self.encoding)
+        name = unicode(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
@@ -431,6 +434,7 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
+
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
@@ -513,10 +517,7 @@ class Operations(llfuse.Operations):
         e = off
         while e < len(handle.entries):
             if handle.entries[e][1].inode in self.inodes:
-                try:
-                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
-                except UnicodeEncodeError:
-                    pass
+                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
     @catch_exceptions
index 26e7fc1972194bf44ae65cb373781e920a444967..f661e41483f5fb3a3bc952c44796c0d8c317b796 100644 (file)
@@ -143,8 +143,8 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
-            llfuse.invalidate_entry(self.inode, str(i))
+            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+            llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
@@ -165,7 +165,7 @@ class Directory(FreshBase):
                     self._entries = oldentries
                     return False
             for n in oldentries:
-                llfuse.invalidate_entry(self.inode, str(n))
+                llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                 self.inodes.del_entry(oldentries[n])
             llfuse.invalidate_inode(self.inode)
             self.invalidate()
@@ -235,14 +235,15 @@ class CollectionDirectoryBase(Directory):
 
     def on_event(self, event, collection, name, item):
         if collection == self.collection:
-            _logger.debug("%s %s %s %s", event, collection, name, item)
+            name = sanitize_filename(name)
+            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
                     ent = self._entries[name]
                     del self._entries[name]
-                    llfuse.invalidate_entry(self.inode, name)
+                    llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                     self.inodes.del_entry(ent)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
@@ -371,7 +372,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return self.collection_locator
 
     @use_counter
-    def update(self, to_pdh=None):
+    def update(self, to_record_version=None):
         try:
             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
@@ -387,9 +388,9 @@ class CollectionDirectory(CollectionDirectoryBase):
                         return
 
                     _logger.debug("Updating %s", self.collection_locator)
-                    if self.collection:
-                        if self.collection.portable_data_hash() == to_pdh:
-                            _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+                    if self.collection is not None:
+                        if self.collection.known_past_version(to_record_version):
+                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                         else:
                             self.collection.update()
                     else:
@@ -768,7 +769,7 @@ class ProjectDirectory(Directory):
         # Acually move the entry from source directory to this directory.
         del src._entries[name_old]
         self._entries[name_new] = ent
-        llfuse.invalidate_entry(src.inode, name_old)
+        llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
 
 class SharedDirectory(Directory):
index d33f9f9e41655eede0e794027de9d8d4546248b4..4d472cff1cca38380d80afa63b9783027ad1db30 100644 (file)
@@ -46,7 +46,8 @@ class FuseArvadosFile(File):
         self.arvfile = arvfile
 
     def size(self):
-        return self.arvfile.size()
+        with llfuse.lock_released:
+            return self.arvfile.size()
 
     def readfrom(self, off, size, num_retries=0):
         with llfuse.lock_released: