import errors
import util
+_logger = logging.getLogger('arvados.collection')
+
# Build the normalized token list for manifest stream `s`.
# NOTE(review): this is a unified-diff hunk with elided context ('+' = added,
# '-' = removed); the loop that populates `blocks`/`streamoffset` and binds
# `b` is not visible here -- do not read this as complete Python.
def normalize_stream(s, stream):
stream_tokens = [s]
sortedfiles = list(stream.keys())
# Record each block locator's byte offset within the concatenated stream.
blocks[b[arvados.LOCATOR]] = streamoffset
streamoffset += b[arvados.BLOCKSIZE]
# Added: a stream with no data blocks still needs one locator token, so the
# well-known empty-block locator is substituted.
+ if len(stream_tokens) == 1:
+ stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
for f in sortedfiles:
current_span = None
# Manifest format encodes spaces in file names as the escape '\040'.
fout = f.replace(' ', '\\040')
# Reads a Keep collection, given either a collection locator or literal
# manifest text. NOTE(review): diff hunk with elided context; the try/API
# lookup surrounding L27-L34 is only partially visible.
class CollectionReader(object):
def __init__(self, manifest_locator_or_text):
# Changed: re.search with '^' anchor -> re.match (equivalent anchoring,
# clearer intent). Pattern: md5 hash + optional size + optional hints.
- if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+ if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
# Changed: manifest-text regex tightened -- now requires at least one
# block locator per stream line ('+' instead of '*') with an exact
# 32-hex hash and a mandatory '+size' suffix.
- elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+ elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
self._manifest_locator = None
else:
uuid=self._manifest_locator).execute()
self._manifest_text = c['manifest_text']
except Exception as e:
# Changed: module-level logger with lazy %-style args instead of the
# root logger with eager string formatting (message text unchanged).
- logging.warning("API lookup failed for collection %s (%s: %s)" %
- (self._manifest_locator, type(e), str(e)))
+ _logger.warning("API lookup failed for collection %s (%s: %s)",
+ self._manifest_locator, type(e), str(e))
# Fall back to fetching the manifest directly from Keep on API failure.
self._manifest_text = Keep.get(self._manifest_locator)
self._streams = []
for stream_line in self._manifest_text.split("\n"):
for f in s.all_files():
yield f
# Changed: manifest_text gains a backward-compatible strip=False keyword.
# strip=True rebuilds the manifest via StreamReader with permission hints
# removed from every locator; strip=False preserves prior behavior.
- def manifest_text(self):
+ def manifest_text(self, strip=False):
self._populate()
- return self._manifest_text
+ if strip:
+ m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+ return m
+ else:
+ return self._manifest_text
# Incrementally builds a Keep collection from written files/directory trees.
# NOTE(review): diff hunk with elided context; many method bodies are only
# partially visible.
class CollectionWriter(object):
# Maximum data block size sent to Keep in one put (64 MiB).
KEEP_BLOCK_SIZE = 2**26
def __exit__(self):
self.finish()
# Changed: _do_queued_work promoted to the public name do_queued_work --
# resume callers (see from_state below) now invoke it explicitly instead
# of it running automatically.
- def _do_queued_work(self):
+ def do_queued_work(self):
# The work queue consists of three pieces:
# * _queued_file: The file object we're currently writing to the
# Collection.
self._work_trees()
else:
break
# Removed: the checkpoint_state() hook and its per-step invocation are
# dropped from this class entirely.
- self.checkpoint_state()
-
- def checkpoint_state(self):
- # Subclasses can implement this method to, e.g., report or record state.
- pass
def _work_file(self):
while True:
path, stream_name, max_manifest_depth = self._queued_trees[0]
make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
else os.listdir)
# Changed: an empty directory no longer queues an empty dirent list;
# its tree entry is popped immediately instead.
- self._queue_dirents(stream_name, make_dirents(path))
+ d = make_dirents(path)
+ if len(d) > 0:
+ self._queue_dirents(stream_name, d)
+ else:
+ self._queued_trees.popleft()
def _queue_file(self, source, filename=None):
assert (self._queued_file is None), "tried to queue more than one file"
def write_file(self, source, filename=None):
self._queue_file(source, filename)
# Updated call sites for the _do_queued_work -> do_queued_work rename.
- self._do_queued_work()
+ self.do_queued_work()
def write_directory_tree(self,
path, stream_name='.', max_manifest_depth=-1):
self._queue_tree(path, stream_name, max_manifest_depth)
- self._do_queued_work()
+ self.do_queued_work()
def write(self, newdata):
if hasattr(newdata, '__iter__'):
# Flush a full block to Keep and keep the remainder buffered.
self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
- self.checkpoint_state()
def start_new_file(self, newfilename=None):
self.finish_current_file()
self._current_file_name = None
def finish(self):
# Changed: finish() now stores the permission-hint-stripped manifest so
# the resulting collection UUID is hint-independent.
- return Keep.put(self.manifest_text())
+ # Send the stripped manifest to Keep, to ensure that we use the
+ # same UUID regardless of what hints are used on the collection.
+ return Keep.put(self.stripped_manifest())
+
+ def stripped_manifest(self):
+ """
+ Return the manifest for the current collection with all permission
+ hints removed from the locators in the manifest.
+ """
+ raw = self.manifest_text()
+ clean = ''
+ for line in raw.split("\n"):
+ fields = line.split()
+ if len(fields) > 0:
# '+A<sig>' is the Keep permission-hint suffix on a locator;
# fields[1:-1] assumes the first field is the stream name and
# the last a file token -- NOTE(review): a line with several
# file tokens leaves the trailing token untouched, which is
# harmless since file tokens carry no '+A' hint.
+ locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+ for x in fields[1:-1] ]
+ clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+ return clean
def manifest_text(self):
self.finish_current_stream()
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
# Changed: debug prints deleted; an empty collection now returns ""
# directly instead of round-tripping '' through CollectionReader.
- #print 'writer',manifest
- #print 'after reader',CollectionReader(manifest).manifest_text()
-
- return CollectionReader(manifest).manifest_text()
+ if len(manifest) > 0:
+ return CollectionReader(manifest).manifest_text()
+ else:
+ return ""
def data_locators(self):
ret = []
# Alternate constructor on a resumable-writer subclass (class header not
# visible in this hunk). Rebuilds a writer from previously saved state.
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
+ # Try to build a new writer from scratch with the given state.
+ # If the state is not suitable to resume (because files have changed,
+ # been deleted, aren't predictable, etc.), raise a
+ # StaleWriterStateError. Otherwise, return the initialized writer.
+ # The caller is responsible for calling writer.do_queued_work()
+ # appropriately after it's returned.
writer = cls(*init_args, **init_kwargs)
# Restore each saved attribute; STATE_PROPS is declared on the class
# (outside this hunk).
for attr_name in cls.STATE_PROPS:
attr_value = state[attr_name]
except IOError as error:
raise errors.StaleWriterStateError(
"failed to reopen active file {}: {}".format(path, error))
# Removed: work no longer resumes automatically inside from_state; the
# preresume_hook extension point is deleted along with it.
- writer.preresume_hook()
- writer._do_queued_work()
return writer
- def preresume_hook(self):
- pass # Subclasses can override this as desired.
-
def check_dependencies(self):
for path, orig_stat in self._dependencies.items():
if not S_ISREG(orig_stat[ST_MODE]):