_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
+ """Abstract base class for Collection classes."""
+
def __enter__(self):
return self
class CollectionWriter(CollectionBase):
+ """Deprecated, use Collection instead."""
+
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
class ResumableCollectionWriter(CollectionWriter):
+ """Deprecated, use Collection instead."""
+
STATE_PROPS = ['_current_stream_files', '_current_stream_length',
'_current_stream_locators', '_current_stream_name',
'_current_file_name', '_current_file_pos', '_close_file',
@must_be_writable
@synchronized
@retry_method
- def save(self, merge=True, num_retries=None):
+ def save(self, storage_classes=None, merge=True, num_retries=None):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
the API server. If you want to save a manifest to Keep only, see
`save_new()`.
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
:merge:
Update and merge remote changes before saving. Otherwise, any
remote changes will be ignored and overwritten.
Retry count on API calls (if None, use the collection default)
"""
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
if not self.committed():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
self.update()
text = self.manifest_text(strip=False)
+ body={'manifest_text': text}
+ if storage_classes:
+ body["storage_classes_desired"] = storage_classes
+
self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
- body={'manifest_text': text}
+ body=body
).execute(
num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
self._portable_data_hash = self._api_response["portable_data_hash"]
self.set_committed(True)
+ elif storage_classes:
+ self._remember_api_response(self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body={"storage_classes_desired": storage_classes}
+ ).execute(
+ num_retries=num_retries))
return self._manifest_text
def save_new(self, name=None,
create_collection_record=True,
owner_uuid=None,
+ storage_classes=None,
ensure_unique_name=False,
num_retries=None):
"""Save collection to a new collection record.
the user, or project uuid that will own this collection.
If None, defaults to the current user.
+ :storage_classes:
+ Specify desirable storage classes to be used when writing data to Keep.
+
:ensure_unique_name:
If True, ask the API server to rename the collection
if it conflicts with a collection with the same name and owner. If
"replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
+ if storage_classes:
+ if type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+ body["storage_classes_desired"] = storage_classes
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
return text
+ _token_re = re.compile(r'(\S+)(\s+|$)')  # group 1: one whitespace-free token; group 2: the whitespace that follows it (or end of text)
+ _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')  # 32 lowercase hex digits, '+', a decimal size (group 1), then optional '+...' suffixes
+ _segment_re = re.compile(r'(\d+):(\d+):(\S+)')  # 'pos:size:rest' -- groups 1 and 2 are read as ints; presumably group 3 is the file name (confirm)
+
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
stream_name = None
state = STREAM_NAME
- for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+ for token_and_separator in self._token_re.finditer(manifest_text):
tok = token_and_separator.group(1)
sep = token_and_separator.group(2)
continue
if state == BLOCKS:
- block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+ block_locator = self._block_re.match(tok)
if block_locator:
blocksize = int(block_locator.group(1))
blocks.append(Range(tok, streamoffset, blocksize, 0))
state = SEGMENTS
if state == SEGMENTS:
- file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ file_segment = self._segment_re.match(tok)
if file_segment:
pos = int(file_segment.group(1))
size = int(file_segment.group(2))