import logging
import os
import re
+import errno
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
class Collection(CollectionBase):
- def __init__(self, manifest_locator_or_text=None, api_client=None,
- keep_client=None, num_retries=0):
+ def __init__(self, parent=None, manifest_locator_or_text=None, api_client=None,
+ keep_client=None, num_retries=0, block_manager=None):
+ self._parent = parent
self._items = None
self._api_client = api_client
self._keep_client = keep_client
+ self._block_manager = block_manager
+
self.num_retries = num_retries
self._manifest_locator = None
self._manifest_text = None
raise errors.ArgumentError(
"Argument to CollectionReader must be a manifest or a collection UUID")
+ def _my_api(self):
+ """Return the API client used by this collection.
+
+ Prefers this collection's own client, then the parent's. A root
+ collection with no client creates one via arvados.api('v1') and
+ drops any cached KeepClient so a new one is built for the new API.
+ """
+ if self._api_client is not None:
+ return self._api_client
+ if self._parent is not None:
+ return self._parent._my_api()
+ self._api_client = arvados.api('v1')
+ self._keep_client = None # Make a new one with the new api.
+ return self._api_client
+
+ def _my_keep(self):
+ """Return the KeepClient used by this collection.
+
+ Prefers this collection's own client, then the parent's. A root
+ collection with no client builds one from _my_api(), using this
+ collection's num_retries.
+ """
+ if self._keep_client is not None:
+ return self._keep_client
+ if self._parent is not None:
+ return self._parent._my_keep()
+ api = self._my_api()
+ self._keep_client = KeepClient(api_client=api, num_retries=self.num_retries)
+ return self._keep_client
+
+ def _my_block_manager(self):
+ """Return the BlockManager used by this collection.
+
+ Prefers this collection's own manager, then the parent's. A root
+ collection with no manager creates one bound to _my_keep().
+ """
+ if self._block_manager is not None:
+ return self._block_manager
+ if self._parent is not None:
+ return self._parent._my_block_manager()
+ self._block_manager = BlockManager(self._my_keep())
+ return self._block_manager
+
def _populate_from_api_server(self):
# As in KeepClient itself, we must wait until the last
# possible moment to instantiate an API client, in order to
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
try:
- if self._api_client is None:
- self._api_client = arvados.api('v1')
- self._keep_client = None # Make a new one with the new api.
- self._api_response = self._api_client.collections().get(
+ self._api_response = self._my_api().collections().get(
uuid=self._manifest_locator).execute(
num_retries=self.num_retries)
self._manifest_text = self._api_response['manifest_text']
return self
def __exit__(self, exc_type, exc_value, traceback):
- self.save()
+ # Save pending changes (no_locator=True: do not require a collection
+ # uuid), then always stop the block manager's threads, even if save()
+ # raised, so they are not leaked.
+ # NOTE(review): changes are saved even when the with-block raised.
+ try:
+ self.save(no_locator=True)
+ finally:
+ if self._block_manager is not None:
+ self._block_manager.stop_threads()
@_populate_first
def find(self, path, create=False):
# item must be a file
if item is None and create:
# create new file
- item = ArvadosFile(keep=self._keep_client)
+ item = ArvadosFile(self, keep=self._keep_client)
self._items[p[0]] = item
return item
else:
if item is None and create:
# create new collection
- item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries)
+ item = Collection(parent=self, num_retries=self.num_retries)
self._items[p[0]] = item
del p[0]
return item.find("/".join(p), create=create)
f = self.find(path, create=create)
if f is None:
- raise ArgumentError("File not found")
+ raise IOError((errno.ENOENT, "File not found"))
if not isinstance(f, ArvadosFile):
- raise ArgumentError("Path must refer to a file.")
+ raise IOError((errno.EISDIR, "Path must refer to a file."))
if mode[0] == "w":
f.truncate(0)
@_populate_first
def __iter__(self):
- for k in self._items.keys():
- yield k
+ """Iterate over the names of this collection's direct children."""
+ # __iter__ must return an iterator; without the return it yields None
+ # and iter(collection) raises TypeError.
+ return iter(self._items)
+
+ @_populate_first
+ def iterkeys(self):
+ """Return an iterator over the names of this collection's direct children."""
+ return self._items.iterkeys()
@_populate_first
def __getitem__(self, k):
- r = self.find(k)
- if r:
- return r
- else:
- raise KeyError(k)
+ # Looks up k directly in self._items; "/"-separated paths are not
+ # resolved here (use find() for those). Presumably raises KeyError
+ # when k is absent, assuming _items is a dict.
+ return self._items[k]
@_populate_first
def __contains__(self, k):
- return self.find(k) is not None
+ # Membership test on direct child names only; paths are not resolved
+ # (use exists() for "/"-separated paths).
+ return k in self._items
@_populate_first
def __len__(self):
@_populate_first
def __delitem__(self, p):
+ # Deletes a direct child by name; remove() handles "/"-separated paths.
+ del self._items[p]
+
+ @_populate_first
+ def keys(self):
+ """Return the names of the direct children (delegates to self._items.keys())."""
+ return self._items.keys()
+
+ @_populate_first
+ def values(self):
+ """Return the direct child items (delegates to self._items.values())."""
+ return self._items.values()
+
+ @_populate_first
+ def items(self):
+ """Return (name, item) pairs for the direct children (delegates to self._items.items())."""
+ return self._items.items()
+
+ @_populate_first
+ def exists(self, path):
+ """Return True if find(path) locates an item at the "/"-separated path."""
+ # Identity test: "!= None" could dispatch to a custom __ne__ on the item.
+ return self.find(path) is not None
+
+ @_populate_first
+ def remove(self, path):
+ """Remove the item at the "/"-separated path.
+
+ A leading "." component is ignored. Raises IOError with errno ENOENT
+ when the first component is missing or the path is empty after
+ dropping ".". Deeper components are delegated to the child's remove().
+ """
p = path.split("/")
if p[0] == '.':
del p[0]
if len(p) > 0:
item = self._items.get(p[0])
if item is None:
- raise NotFoundError()
+ # errno and message must be separate args so .errno/.strerror are set.
+ raise IOError(errno.ENOENT, "File not found")
if len(p) == 1:
del self._items[p[0]]
else:
del p[0]
- del item["/".join(p)]
+ item.remove("/".join(p))
else:
- raise NotFoundError()
+ raise IOError(errno.ENOENT, "File not found")
@_populate_first
- def keys(self):
- return self._items.keys()
+ def manifest_text(self, strip=False, normalize=False):
+ """Return this collection's manifest text.
+
+ The manifest is regenerated with export_manifest() (strip is passed as
+ portable_locators) when the collection is modified, when no manifest
+ text is cached, or when normalize is true. Otherwise the cached text
+ is returned, or stripped_manifest() when strip is true.
+ """
+ if self.modified() or self._manifest_text is None or normalize:
+ return export_manifest(self, stream_name=".", portable_locators=strip)
+ else:
+ if strip:
+ return self.stripped_manifest()
+ else:
+ return self._manifest_text
- @_populate_first
- def values(self):
- return self._items.values()
+ def portable_data_hash(self):
+ """Return the MD5 hex digest of the stripped manifest, then "+", then its length."""
+ # NOTE(review): hashlib is not among the imports visible in this patch;
+ # confirm it is imported at module level.
+ stripped = self.manifest_text(strip=True)
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
@_populate_first
- def items(self):
- return self._items.items()
+ def save(self, no_locator=False):
+ """Persist this collection if it has been modified.
+
+ Commits all pending blocks through the block manager, writes the
+ stripped manifest to Keep and, when the manifest locator is a
+ collection uuid, updates that record via the API. A locator that is
+ not a uuid raises AssertionError unless no_locator is true; new
+ collections should use save_as(). set_unmodified() is called after
+ saving.
+ """
+ if self.modified():
+ self._my_block_manager().commit_all()
+ self._my_keep().put(self.manifest_text(strip=True))
+ # NOTE(review): the uuid check below runs only after the blocks and
+ # manifest were already uploaded to Keep.
+ if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+ self._api_response = self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body={'manifest_text': self.manifest_text(strip=False)}
+ ).execute(
+ num_retries=self.num_retries)
+ elif not no_locator:
+ raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
+ self.set_unmodified()
@_populate_first
- def save(self):
- self._my_keep().put(self.portable_manifest_text())
-
+ def save_as(self, name, owner_uuid=None):
+ """Create a new collection record holding this collection's manifest.
+
+ Commits pending blocks, writes the stripped manifest to Keep, then
+ creates a record named name (owned by owner_uuid when given). The new
+ record's uuid becomes this collection's manifest locator, and the
+ collection is marked unmodified.
+ """
+ self._my_block_manager().commit_all()
+ self._my_keep().put(self.manifest_text(strip=True))
+ body = {"manifest_text": self.manifest_text(strip=False),
+ "name": name}
+ if owner_uuid:
+ body["owner_uuid"] = owner_uuid
+ self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
+ self._manifest_locator = self._api_response["uuid"]
+ self.set_unmodified()
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
for s in v._segments:
loc = s.locator
if loc.startswith("bufferblock"):
- loc = v._bufferblocks[loc].calculate_locator()
+ loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[k] = st
- buf += ' '.join(normalize_stream(stream_name, stream))
- buf += "\n"
+ if stream:
+ buf += ' '.join(normalize_stream(stream_name, stream))
+ buf += "\n"
for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
elif isinstance(item, ArvadosFile):