count. It's only a soft limit on memory usage but still a big improvement.
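To make "soft limit" concrete before the code: eviction walks cache entries oldest-first, refuses to clear objects that are in use, and always keeps a handful of entries, so the accounted total can legitimately stay above the cap. A minimal self-contained sketch of that policy (hypothetical SoftLimitCache, not code from this change):

import collections

class SoftLimitCache(object):
    # Hypothetical stand-in for the InodeCache below: evict oldest
    # entries by accounted size, but only as a best effort.
    def __init__(self, cap):
        self.cap = cap
        self.total = 0
        self.entries = collections.OrderedDict()  # key -> (size, in_use)

    def add(self, key, size, in_use=False):
        self.entries[key] = (size, in_use)
        self.total += size
        self.cap_cache()

    def cap_cache(self):
        if self.total > self.cap:
            for key in list(self.entries.keys()):
                if self.total < self.cap or len(self.entries) < 4:
                    break
                size, in_use = self.entries[key]
                if in_use:
                    continue  # an in-use object cannot be cleared
                self.total -= size
                del self.entries[key]

c = SoftLimitCache(cap=100)
for i in range(4):
    c.add(i, 50, in_use=True)
assert c.total == 200  # every entry is pinned, so the cap is exceeded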
            # segment is past the truncate size, all done
break
elif size < range_end:
- nr = Range(r.locator, r.range_start, size - r.range_start)
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
nr.segment_offset = r.segment_offset
new_segs.append(nr)
break
"""Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
if block_locator:
blocksize = long(block_locator.group(1))
- blocks.append(Range(tok, streamoffset, blocksize))
+ blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
state = SEGMENTS
s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
if s:
blocksize = long(s.group(1))
- self._data_locators.append(Range(tok, streamoffset, blocksize))
+ self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
continue
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
if name not in self._files:
- self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
+ self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
else:
filereader = self._files[name]
filereader.segments.append(Range(pos, filereader.size(), size))
self._entries = collections.OrderedDict()
self._counter = itertools.count(1)
self.cap = cap
+ self._total = 0
+
+    def _remove(self, obj, clear):
+        if clear and not obj.clear():
+            _logger.warn("Could not clear %s (in_use %s)", obj, obj.in_use())
+            return False
+        self._total -= obj._cache_size
+        del self._entries[obj._cache_priority]
+        _logger.debug("Cleared %s, total now %i", obj, self._total)
+        return True
+
def cap_cache(self):
- if len(self._entries) > self.cap:
- ent = iter(self._entries)
- ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
- for key in ents:
- capobj = self._entries[key]
- if capobj.clear():
- _logger.debug("Cleared %s", self._entries[key])
- del self._entries[key]
+        _logger.debug("total is %i cap is %i", self._total, self.cap)
+        if self._total > self.cap:
+            for key in list(self._entries.keys()):
+                if self._total < self.cap or len(self._entries) < 4:
+                    break
+                self._remove(self._entries[key], True)
+
def manage(self, obj):
- obj._cache_priority = next(self._counter)
- self._entries[obj._cache_priority] = obj
- _logger.debug("Managing %s", obj)
- self.cap_cache()
+ if obj.persisted():
+ obj._cache_priority = next(self._counter)
+ obj._cache_size = obj.objsize()
+ self._entries[obj._cache_priority] = obj
+            self._total += obj._cache_size
+            _logger.debug("Managing %s total now %i", obj, self._total)
+            self.cap_cache()
+
def touch(self, obj):
- if obj._cache_priority in self._entries:
- del self._entries[obj._cache_priority]
- self.manage(obj)
+ if obj.persisted():
+ if obj._cache_priority in self._entries:
+ self._remove(obj, False)
+ self.manage(obj)
+            _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
+
def unmanage(self, obj):
- if obj._cache_priority in self._entries:
- if obj.clear():
- _logger.debug("Cleared %s", obj)
- del self._entries[obj._cache_priority]
-
+        if obj.persisted() and obj._cache_priority in self._entries:
+            self._remove(obj, True)
+
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self, inode_cache=1000):
+ def __init__(self, inode_cache=256*1024*1024):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self._obj_cache = InodeCache(cap=inode_cache)
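add_entry and del_entry are referenced by Directory.clear and CollectionDirectory below but not shown in this hunk. A plausible sketch of how they would tie entries into the cache, assuming manage/unmanage are called from here (hypothetical, not code from this change):

    def add_entry(self, entry):
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        self._obj_cache.manage(entry)    # start accounting for entry.objsize()
        return entry

    def del_entry(self, entry):
        self._obj_cache.unmanage(entry)  # clear it and drop its accounting
        del self._entries[entry.inode]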
import time
import ciso8601
import calendar
+import functools
def convertTime(t):
"""Parse Arvados timestamp to unix time."""
except (TypeError, ValueError):
return 0
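The try body of convertTime is elided above; given the ciso8601 and calendar imports, a plausible reconstruction (the ciso8601 1.x API provides parse_datetime_unaware):

def convertTime(t):
    """Parse Arvados timestamp to unix time."""
    try:
        return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
    except (TypeError, ValueError):
        return 0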
+def use_counter(orig_func):
+ @functools.wraps(orig_func)
+ def use_counter_wrapper(self, *args, **kwargs):
+ try:
+ self.inc_use()
+ return orig_func(self, *args, **kwargs)
+ finally:
+ self.dec_use()
+ return use_counter_wrapper
+
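Moving use_counter into fresh.py lets any FreshBase subclass mark a method as in use for its whole duration, so cache clearing (which checks in_use()) leaves the object alone mid-call. A hypothetical usage sketch:

class ExampleDirectory(FreshBase):
    def __init__(self):
        super(ExampleDirectory, self).__init__()
        self._entries = {}

    @use_counter
    def readdir(self):
        # use_count > 0 for the duration of this call, so a concurrent
        # cap_cache() cannot clear this object out from under us.
        return list(self._entries.items())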
class FreshBase(object):
"""Base class for maintaining fresh/stale state to determine when to update."""
def __init__(self):
self._last_update = time.time()
self._atime = time.time()
self._poll_time = 60
+ self.use_count = 0
# Mark the value as stale
def invalidate(self):
def atime(self):
return self._atime
+
+ def persisted(self):
+ return False
+
+ def clear(self, force=False):
+ pass
+
+ def in_use(self):
+ return self.use_count > 0
+
+ def inc_use(self):
+ self.use_count += 1
+
+ def dec_use(self):
+ self.use_count -= 1
+
+ def objsize(self):
+ return 0
import functools
from fusefile import StringFile, StreamReaderFile, ObjectFile
-from fresh import FreshBase, convertTime
+from fresh import FreshBase, convertTime, use_counter
from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
else:
return _disallowed_filename_characters.sub('_', dirty)
-def use_counter(orig_func):
- @functools.wraps(orig_func)
- def use_counter_wrapper(self, *args, **kwargs):
- try:
- self.inc_use()
- return orig_func(self, *args, **kwargs)
- finally:
- self.dec_use()
- return use_counter_wrapper
-
class Directory(FreshBase):
"""Generic directory object, backed by a dict.
self.inodes = inodes
self._entries = {}
self._mtime = time.time()
- self.use_count = 0
    # Overridden by subclasses to implement logic to update the entries dict
# when the directory is stale
def size(self):
return 0
- def in_use(self):
- return self.use_count > 0
-
- def inc_use(self):
- self.use_count += 1
-
- def dec_use(self):
- self.use_count -= 1
+ def persisted(self):
+ return False
def checkupdate(self):
if self.stale():
oldentries = self._entries
self._entries = {}
for n in oldentries:
- if isinstance(n, Directory):
- if not n.clear(force):
- self._entries = oldentries
- return False
+ if not oldentries[n].clear(force):
+ self._entries = oldentries
+ return False
for n in oldentries:
- if isinstance(n, Directory):
- llfuse.invalidate_entry(self.inode, str(n))
- self.inodes.del_entry(oldentries[n])
+ llfuse.invalidate_entry(self.inode, str(n))
+ self.inodes.del_entry(oldentries[n])
llfuse.invalidate_inode(self.inode)
self.invalidate()
return True
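Note the two-pass structure above: the first loop only asks each child to clear itself, restoring oldentries wholesale if any child refuses (for example, because it is in use); only once every child has cleared does the second loop invalidate the kernel dentries and release the inodes, so a failed clear leaves the directory intact.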
else:
self.collection_locator = collection
self._mtime = 0
+ self._manifest_size = 0
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
self.update()
def new_collection(self, new_collection_object, coll_reader):
+ self.clear(force=True)
+
self.collection_object = new_collection_object
self._mtime = convertTime(self.collection_object.get('modified_at'))
if self.collection_object_file is not None:
self.collection_object_file.update(self.collection_object)
- self.clear(force=True)
for s in coll_reader.all_streams():
cwd = self
for part in s.name().split('/'):
partname = sanitize_filename(part)
if partname not in cwd._entries:
cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
- # (hack until using new API)
- cwd._entries[partname].inc_use()
- # end hack
cwd = cwd._entries[partname]
for k, v in s.files().items():
cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
self.new_collection(new_collection_object, coll_reader)
+ self._manifest_size = len(coll_reader.manifest_text())
+ _logger.debug("%s manifest_size %i", self, self._manifest_size)
+
self.fresh()
return True
except arvados.errors.NotFoundError:
return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
- super(CollectionDirectory, self).invalidate()
self.collection_object = None
+ self.collection_object_file = None
+ super(CollectionDirectory, self).invalidate()
- def clear(self, force=False):
- if self.collection_locator is None:
- return False
- else:
- return super(CollectionDirectory, self).clear(force)
+ def persisted(self):
+ return (self.collection_locator is not None)
+ def objsize(self):
+ return self._manifest_size * 128
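The * 128 factor is presumably a rough estimate of how much more memory the parsed collection occupies than its manifest text; under this accounting a collection with a 1 MiB manifest would consume the entire default 128 MiB cap by itself.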
class MagicDirectory(Directory):
"""A special directory that logically contains the set of all extant keep locators.
else:
return super(ProjectDirectory, self).__contains__(k)
+ def persisted(self):
+ return False
+
+ def objsize(self):
+ return len(self.project_object) * 1024 if self.project_object else 0
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
def mtime(self):
return self._mtime
- def clear(self):
- pass
+ def clear(self, force=False):
+ return True
- def inc_use(self):
- pass
-
- def dec_use(self):
- pass
class StreamReaderFile(File):
"""Wraps a StreamFileReader as a file."""
parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
- parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+ parser.add_argument('--inode-cache', type=int, help="Inode cache size (default 128MiB)", default=128*1024*1024)
parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),