from ._ranges import locators_and_ranges, replace_range, Range
from .retry import retry_method
+MOD = "mod"
+
def split(path):
"""split(path) -> streamname, filename
"""
- def __init__(self, parent, stream=[], segments=[]):
+ def __init__(self, parent, name, stream=[], segments=[]):
"""
ArvadosFile constructor.
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
+ self.name = name
def writable(self):
return self.parent.writable()
return copy.copy(self._segments)
@synchronized
- def clone(self, new_parent):
+ def clone(self, new_parent, new_name):
"""Make a copy of this file."""
- cp = ArvadosFile(new_parent)
+ cp = ArvadosFile(new_parent, new_name)
cp.replace_contents(self)
return cp
if self._current_bblock:
self._repack_writes()
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@synchronized
"""
- def __init__(self, arvadosfile, name, mode="r", num_retries=None):
- super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
"""
- def __init__(self, arvadosfile, name, mode, num_retries=None):
- super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+ def __init__(self, arvadosfile, mode, num_retries=None):
+ super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
@_FileLikeObjectBase._before_close
@retry_method
def __init__(self, parent=None):
self.parent = parent
self._modified = True
+ self._callback = None
self._items = {}
def _my_api(self):
if item is None:
# create new file
if create_type == COLLECTION:
- item = Subcollection(self)
+ item = Subcollection(self, pathcomponents[0])
else:
- item = ArvadosFile(self)
+ item = ArvadosFile(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
self._modified = True
self.notify(ADD, self, pathcomponents[0], item)
else:
if item is None:
# create new collection
- item = Subcollection(self)
+ item = Subcollection(self, pathcomponents[0])
self._items[pathcomponents[0]] = item
self._modified = True
self.notify(ADD, self, pathcomponents[0], item)
name = os.path.basename(path)
if mode == "r":
- return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+ return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
else:
- return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+ return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
@synchronized
def modified(self):
def _clonefrom(self, source):
for k,v in source.items():
- self._items[k] = v.clone(self)
+ self._items[k] = v.clone(self, k)
def clone(self):
raise NotImplementedError()
modified_from = self[target_name]
# Actually make the copy.
- dup = source_obj.clone(self)
+ dup = source_obj.clone(self, target_name)
self._items[target_name] = dup
self._modified = True
holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
for k in self:
if k not in end_collection:
- changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+ changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
for k in end_collection:
if k in self:
if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
elif end_collection[k] != self[k]:
- changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+ changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
else:
- changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+ changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
return changes
@must_be_writable
stripped = self.manifest_text(strip=True)
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ @synchronized
+ def subscribe(self, callback):
+ if self._callback is None:
+ self._callback = callback
+ else:
+ raise errors.ArgumentError("A callback is already set on this collection.")
+
+ @synchronized
+ def unsubscribe(self):
+ if self._callback is not None:
+ self._callback = None
+
+ @synchronized
+ def notify(self, event, collection, name, item):
+ if self._callback:
+ self._callback(event, collection, name, item)
+ self.root_collection().notify(event, collection, name, item)
+
@synchronized
def __eq__(self, other):
if other is self:
self._api_response = None
self.lock = threading.RLock()
- self.callbacks = []
self.events = None
if manifest_locator_or_text:
return self._manifest_locator
@synchronized
- def clone(self, new_parent=None, readonly=False, new_config=None):
+ def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
if new_config is None:
new_config = self._config
if readonly:
self._manifest_text = text
self.set_unmodified()
- @synchronized
- def subscribe(self, callback):
- self.callbacks.append(callback)
-
- @synchronized
- def unsubscribe(self, callback):
- self.callbacks.remove(callback)
-
- @synchronized
- def notify(self, event, collection, name, item):
- for c in self.callbacks:
- c(event, collection, name, item)
-
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
self.set_unmodified()
+ @synchronized
+ def notify(self, event, collection, name, item):
+ if self._callback:
+ self._callback(event, collection, name, item)
+
class Subcollection(RichCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
"""
- def __init__(self, parent):
+ def __init__(self, parent, name):
super(Subcollection, self).__init__(parent)
self.lock = self.root_collection().lock
self._manifest_text = None
+ self.name = name
def root_collection(self):
return self.parent.root_collection()
def _my_block_manager(self):
return self.root_collection()._my_block_manager()
- def notify(self, event, collection, name, item):
- return self.root_collection().notify(event, collection, name, item)
-
def stream_name(self):
- for k, v in self.parent.items():
- if v is self:
- return os.path.join(self.parent.stream_name(), k)
- return '.'
+ return os.path.join(self.parent.stream_name(), self.name)
@synchronized
- def clone(self, new_parent):
- c = Subcollection(new_parent)
+ def clone(self, new_parent, new_name):
+ c = Subcollection(new_parent, new_name)
c._clonefrom(self)
return c
blocks[loc] = d
stream.append(Range(loc, n, len(d)))
n += len(d)
- af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
- return ArvadosFileReader(af, "count.txt")
+ af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+ return ArvadosFileReader(af)
def test_read_returns_first_block(self):
# read() calls will be aligned on block boundaries - see #3663.
blockmanager = arvados.arvfile._BlockManager(self.keep_client())
blockmanager.prefetch_enabled = False
col = Collection(keep_client=self.keep_client(), block_manager=blockmanager)
- af = ArvadosFile(col,
+ af = ArvadosFile(col, "test",
stream=stream,
segments=segments)
- return ArvadosFileReader(af, "test", **kwargs)
+ return ArvadosFileReader(af, **kwargs)
def read_for_test(self, reader, byte_count, **kwargs):
return reader.read(byte_count, **kwargs)