import logging
import os
import re
+import errno
from collections import deque
from stat import *
class Collection(CollectionBase):
- def __init__(self, manifest_locator_or_text=None, api_client=None,
+ def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
keep_client=None, num_retries=0, block_manager=None):
+ self.parent = parent
self._items = None
self._api_client = api_client
self._keep_client = keep_client
+ self._block_manager = block_manager
+
self.num_retries = num_retries
self._manifest_locator = None
self._manifest_text = None
self._api_response = None
- if block_manager is None:
- self.block_manager = BlockManager(keep_client)
-
if manifest_locator_or_text:
if re.match(util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
raise errors.ArgumentError(
"Argument to CollectionReader must be a manifest or a collection UUID")
def _my_api(self):
    """Return the API client this collection should use.

    Resolution order: this collection's own client if it has one, otherwise
    the parent's (via the parent's ``_my_api``).  A root collection with no
    client creates one with ``arvados.api('v1')`` and drops its cached Keep
    client, so the next ``_my_keep`` call builds one against the new API.
    """
    if self._api_client is not None:
        return self._api_client
    if self.parent is not None:
        return self.parent._my_api()
    self._api_client = arvados.api('v1')
    self._keep_client = None  # Make a new one with the new api.
    return self._api_client
def _my_keep(self):
    """Return the Keep client this collection should use.

    Uses this collection's cached client if set, else delegates to the
    parent.  A root collection lazily builds a ``KeepClient`` bound to
    ``self._my_api()`` with this collection's ``num_retries`` and caches it.
    """
    if self._keep_client is not None:
        return self._keep_client
    if self.parent is not None:
        return self.parent._my_keep()
    # _my_api() may reset self._keep_client while the arguments are being
    # evaluated; the assignment below happens afterwards, so it wins.
    self._keep_client = KeepClient(api_client=self._my_api(),
                                   num_retries=self.num_retries)
    return self._keep_client
def _my_block_manager(self):
    """Return the BlockManager shared by this collection tree.

    Child collections have no manager of their own and defer to their
    parent; the root lazily creates one over its Keep client and caches it.
    """
    if self._block_manager is not None:
        return self._block_manager
    if self.parent is not None:
        return self.parent._my_block_manager()
    self._block_manager = BlockManager(self._my_keep())
    return self._block_manager
def _populate_from_api_server(self):
# As in KeepClient itself, we must wait until the last
# possible moment to instantiate an API client, in order to
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
try:
- if self._api_client is None:
- self._api_client = arvados.api('v1')
- self._keep_client = None # Make a new one with the new api.
- self._api_response = self._api_client.collections().get(
+ self._api_response = self._my_api().collections().get(
uuid=self._manifest_locator).execute(
num_retries=self.num_retries)
self._manifest_text = self._api_response['manifest_text']
return self
def __exit__(self, exc_type, exc_value, traceback):
    """Save on leaving a ``with`` block, then stop block-manager threads.

    ``save`` is called with ``no_locator=True`` so it does not insist on the
    collection having a collection-UUID locator.  Thread shutdown runs in a
    ``finally`` clause so the block manager's threads are stopped even when
    the save raises; the exception still propagates (returns None).
    """
    try:
        self.save(no_locator=True)
    finally:
        # Checked after save(): for a root collection, save() may have
        # created the manager through _my_block_manager().  Child
        # collections delegate to their parent and keep this as None.
        if self._block_manager is not None:
            self._block_manager.stop_threads()
@_populate_first
- def find(self, path, create=False):
+ def find(self, path, create=False, create_collection=False):
p = path.split("/")
if p[0] == '.':
del p[0]
# item must be a file
if item is None and create:
# create new file
- item = ArvadosFile(self.block_manager, keep=self._keep_client)
+ if create_collection:
+ item = Collection(parent=self, num_retries=self.num_retries)
+ else:
+ item = ArvadosFile(self)
self._items[p[0]] = item
return item
else:
if item is None and create:
# create new collection
- item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
+ item = Collection(parent=self, num_retries=self.num_retries)
self._items[p[0]] = item
del p[0]
return item.find("/".join(p), create=create)
f = self.find(path, create=create)
if f is None:
- raise ArgumentError("File not found")
+ raise IOError((errno.ENOENT, "File not found"))
if not isinstance(f, ArvadosFile):
- raise ArgumentError("Path must refer to a file.")
+ raise IOError((errno.EISDIR, "Path must refer to a file."))
if mode[0] == "w":
f.truncate(0)
@_populate_first
def __iter__(self):
    """Iterate over the names of this collection's direct children."""
    return iter(self._items)
@_populate_first
def iterkeys(self):
    """Return an iterator over the names of this collection's direct children."""
    children = self._items
    return children.iterkeys()
@_populate_first
def __getitem__(self, k):
    """Return the direct child named `k`.

    `k` is used as a plain key (it is not split on "/" the way `find` splits
    paths); a missing name raises KeyError from the underlying dict.
    """
    children = self._items
    return children[k]
@_populate_first
def __contains__(self, k):
    """Return True if `k` names a direct child (plain key, no path lookup)."""
    children = self._items
    present = k in children
    return present
@_populate_first
def __len__(self):
@_populate_first
def __delitem__(self, p):
    """Remove the direct child named `p`; raises KeyError if it is absent.

    For "/"-separated paths use `remove` instead.
    """
    self._items.pop(p)
@_populate_first
def keys(self):
def items(self):
    """Return (name, item) pairs for this collection's direct children."""
    children = self._items
    return children.items()
@_populate_first
def exists(self, path):
    """Return True if `find(path)` locates a file or subcollection.

    Uses an identity test against None so the check never dispatches to a
    user-defined ``__ne__``/``__eq__`` on the found item.
    """
    return self.find(path) is not None
@_populate_first
def remove(self, path):
    """Remove the file or subcollection at `path`.

    `path` is "/"-separated and relative to this collection; a single leading
    "." component is ignored.  Multi-component paths are handled by
    delegating the remainder to the child collection's own `remove`.

    :raises IOError: errno ENOENT if the path is empty or nothing exists at
        it; errno ENOTDIR if an intermediate component is not a collection.
    """
    p = path.split("/")
    if p[0] == '.':
        del p[0]

    if not p:
        raise IOError(errno.ENOENT, "File not found")

    item = self._items.get(p[0])
    if item is None:
        # Two-argument form so callers get e.errno / e.strerror populated
        # (a single tuple argument leaves e.errno as None).
        raise IOError(errno.ENOENT, "File not found")

    if len(p) == 1:
        del self._items[p[0]]
    else:
        if not isinstance(item, Collection):
            raise IOError(errno.ENOTDIR, "Parent in path is a file, not a directory")
        item.remove("/".join(p[1:]))
@_populate_first
def manifest_text(self, strip=False, normalize=False):
if self.modified() or self._manifest_text is None or normalize:
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@_populate_first
def save(self, no_locator=False):
    """Persist local modifications to Keep and, if possible, the API server.

    Does nothing when the collection is unmodified.  Otherwise: commits all
    pending buffer blocks, stores the stripped manifest in Keep, and, when
    the locator is a collection UUID, updates that collection record with
    the full manifest text.  Finally marks the collection unmodified.

    :param no_locator: if true, a locator that is missing or not a
        collection UUID is tolerated (the API update is simply skipped).
    :raises AssertionError: if the locator is not a collection UUID and
        `no_locator` is false; use `save_as` for new collections.
    """
    if not self.modified():
        return

    self._my_block_manager().commit_all()
    self._my_keep().put(self.manifest_text(strip=True))

    locator = self._manifest_locator
    has_uuid = locator is not None and re.match(util.collection_uuid_pattern, locator)
    if has_uuid:
        update_request = self._my_api().collections().update(
            uuid=locator,
            body={'manifest_text': self.manifest_text(strip=False)})
        self._api_response = update_request.execute(num_retries=self.num_retries)
    elif not no_locator:
        raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")

    self.set_unmodified()
@_populate_first
def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
    """Create a new collection record from this collection's contents.

    Commits pending buffer blocks, stores the stripped manifest in Keep, then
    asks the API server to create a collection named `name` (optionally owned
    by `owner_uuid`).  Afterwards this object's locator is the new record's
    UUID and the collection is marked unmodified.

    :param ensure_unique_name: forwarded to the API ``create`` call.
    """
    self._my_block_manager().commit_all()
    self._my_keep().put(self.manifest_text(strip=True))

    record = {"manifest_text": self.manifest_text(strip=False),
              "name": name}
    if owner_uuid:
        record["owner_uuid"] = owner_uuid

    create_request = self._my_api().collections().create(
        ensure_unique_name=ensure_unique_name, body=record)
    self._api_response = create_request.execute(num_retries=self.num_retries)
    self._manifest_locator = self._api_response["uuid"]
    self.set_unmodified()
@_populate_first
def rename(self, old, new):
    """Move the file or subcollection at path `old` to path `new`.

    Both paths are "/"-separated and relative to this collection; a path
    with no directory part (e.g. "foo") refers to this collection itself.
    Missing directories along `new` are created as subcollections.

    :raises IOError: errno ENOENT if `old` or one of its parent directories
        does not exist; errno ENOTDIR if a parent component of either path
        is a file; errno EINVAL if a subcollection would be moved inside
        itself.
    """
    old_path, old_fn = os.path.split(old)
    old_col = self._rename_resolve_dir(
        old_path, False, "Parent in path is a file, not a directory")
    if old_fn not in old_col:
        raise IOError(errno.ENOENT, "File not found")

    new_path, new_fn = os.path.split(new)
    new_col = self._rename_resolve_dir(
        new_path, True, "Destination is a file, not a directory")

    ent = old_col[old_fn]
    # If `ent` is an ancestor of (or is) the destination, moving it would
    # make it its own descendant and detach it from the tree.
    ancestor = new_col
    while ancestor is not None:
        if ancestor is ent:
            raise IOError(errno.EINVAL, "Cannot move a collection inside itself")
        ancestor = ancestor.parent

    del old_col[old_fn]
    ent.parent = new_col
    new_col[new_fn] = ent

def _rename_resolve_dir(self, path, create, notdir_msg):
    """Return the subcollection at directory `path` (helper for `rename`).

    Empty and "." components are skipped, so "" or "." resolve to this
    collection.  Components are looked up one at a time because `find`
    does not forward `create_collection` when it recurses into a
    multi-component path; a single-name lookup with
    ``create_collection=True`` creates a subcollection rather than a file.

    :raises IOError: errno ENOENT if a component is missing and `create` is
        false; errno ENOTDIR (message `notdir_msg`) if a component is not a
        Collection.
    """
    col = self
    for name in path.split("/"):
        if name in ("", "."):
            continue
        nxt = col.find(name, create=create, create_collection=create)
        if nxt is None:
            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(nxt, Collection):
            raise IOError(errno.ENOTDIR, notdir_msg)
        col = nxt
    return col
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
if into_collection is not None:
for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
v = item[k]
st = []
- for s in v._segments:
+ for s in v.segments:
loc = s.locator
if loc.startswith("bufferblock"):
- loc = v.bbm._bufferblocks[loc].locator()
+ loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+ if portable_locators:
+ loc = KeepLocator(loc).stripped()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[k] = st
- buf += ' '.join(normalize_stream(stream_name, stream))
- buf += "\n"
+ if stream:
+ buf += ' '.join(normalize_stream(stream_name, stream))
+ buf += "\n"
for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
- buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
+ buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
elif isinstance(item, ArvadosFile):
st = []
- for s in item._segments:
+ for s in item.segments:
loc = s.locator
if loc.startswith("bufferblock"):
loc = item._bufferblocks[loc].calculate_locator()
+ if portable_locators:
+ loc = KeepLocator(loc).stripped()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[stream_name] = st