collection to conflict with itself). Also fix filename character encoding.
for lr in readsegs:
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
- data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ blockview = memoryview(block)
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
else:
break
return ''.join(data)
self._manifest_locator = None
self._manifest_text = None
self._api_response = None
+ self._past_versions = set()
self.lock = threading.RLock()
self.events = None
def writable(self):
return True
+ @synchronized
+ def known_past_version(self, modified_at_and_portable_data_hash):
+ return modified_at_and_portable_data_hash in self._past_versions
+
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+ if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+ # We've merged this record before. Don't do anything.
+ return
+ else:
+ self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
self._block_manager = _BlockManager(self._my_keep())
return self._block_manager
+ def _remember_api_response(self, response):
+ self._api_response = response
+ self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+
def _populate_from_api_server(self):
# As in KeepClient itself, we must wait until the last
# possible moment to instantiate an API client, in order to
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
try:
- self._api_response = self._my_api().collections().get(
+ self._remember_api_response(self._my_api().collections().get(
uuid=self._manifest_locator).execute(
- num_retries=self.num_retries)
+ num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
return None
except Exception as e:
self.update()
text = self.manifest_text(strip=False)
- self._api_response = self._my_api().collections().update(
+ self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
body={'manifest_text': text}
).execute(
- num_retries=num_retries)
+ num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
self.set_committed()
if create_collection_record:
if name is None:
- name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+ name = "New collection"
ensure_unique_name = True
body = {"manifest_text": text,
if owner_uuid:
body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+ self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
"""This is a subdirectory within a collection that doesn't have its own API
server record.
- It falls under the umbrella of the root collection.
+ Subcollection locking falls under the umbrella lock of its root collection.
"""
Testing and Development
-----------------------
+Debian packages you need to build llfuse:
+
+$ apt-get install python-dev pkg-config libfuse-dev libattr1-dev
+
This package is one part of the Arvados source package, and it has
integration tests to check interoperability with other Arvados
components. Our `hacking guide
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self, inode_cache):
+ def __init__(self, inode_cache, encoding="utf-8"):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
+ self.encoding = encoding
def __getitem__(self, item):
return self._entries[item]
if not inode_cache:
inode_cache = InodeCache(cap=256*1024*1024)
- self.inodes = Inodes(inode_cache)
+ self.inodes = Inodes(inode_cache, encoding=encoding)
self.uid = uid
self.gid = gid
- self.encoding = encoding
# dict of inode to filehandle
self._filehandles = {}
[["event_type", "in", ["create", "update", "delete"]]],
self.on_event)
+ @catch_exceptions
def on_event(self, ev):
if 'event_type' in ev:
with llfuse.lock:
if item is not None:
item.invalidate()
if ev["object_kind"] == "arvados#collection":
- item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+ new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
+ record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+ item.update(to_record_version=record_version)
else:
item.update()
- oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+ oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
olditemparent = self.inodes.inode_cache.find(oldowner)
if olditemparent is not None:
olditemparent.invalidate()
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
- entry.entry_timeout = 300
- entry.attr_timeout = 300
+ entry.entry_timeout = 60
+ entry.attr_timeout = 60
entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
if isinstance(e, Directory):
entry.st_size = e.size()
entry.st_blksize = 512
- entry.st_blocks = (e.size()/512)+1
+ entry.st_blocks = (entry.st_size/512)+1
entry.st_atime = int(e.atime())
entry.st_mtime = int(e.mtime())
entry.st_ctime = int(e.mtime())
@catch_exceptions
def lookup(self, parent_inode, name):
- name = unicode(name, self.encoding)
+ name = unicode(name, self.inodes.encoding)
inode = None
if name == '.':
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read %i %i %i", fh, off, size)
+
if fh in self._filehandles:
handle = self._filehandles[fh]
else:
e = off
while e < len(handle.entries):
if handle.entries[e][1].inode in self.inodes:
- try:
- yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
- except UnicodeEncodeError:
- pass
+ yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
@catch_exceptions
# delete any other directory entries that were not found in 'items'
for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
- llfuse.invalidate_entry(self.inode, str(i))
+ _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+ llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[i])
changed = True
self._entries = oldentries
return False
for n in oldentries:
- llfuse.invalidate_entry(self.inode, str(n))
+ llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[n])
llfuse.invalidate_inode(self.inode)
self.invalidate()
def on_event(self, event, collection, name, item):
if collection == self.collection:
- _logger.debug("%s %s %s %s", event, collection, name, item)
+ name = sanitize_filename(name)
+ _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
ent = self._entries[name]
del self._entries[name]
- llfuse.invalidate_entry(self.inode, name)
+ llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
self.inodes.del_entry(ent)
elif event == arvados.collection.MOD:
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
return self.collection_locator
@use_counter
- def update(self, to_pdh=None):
+ def update(self, to_record_version=None):
try:
if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
return True
return
_logger.debug("Updating %s", self.collection_locator)
- if self.collection:
- if self.collection.portable_data_hash() == to_pdh:
- _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+ if self.collection is not None:
+ if self.collection.known_past_version(to_record_version):
+ _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
else:
self.collection.update()
else:
# Actually move the entry from source directory to this directory.
del src._entries[name_old]
self._entries[name_new] = ent
- llfuse.invalidate_entry(src.inode, name_old)
+ llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
class SharedDirectory(Directory):
self.arvfile = arvfile
def size(self):
- return self.arvfile.size()
+ with llfuse.lock_released:
+ return self.arvfile.size()
def readfrom(self, off, size, num_retries=0):
with llfuse.lock_released: