-import gflags
-import httplib
-import httplib2
import logging
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
from collections import deque
from stat import *
+from .arvfile import ArvadosFileBase
from keep import *
-from stream import *
+from .stream import StreamReader, split
import config
import errors
import util
fout = f.replace(' ', '\\040')
for segment in stream[f]:
segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
- if current_span == None:
+ if current_span is None:
current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
else:
if segmentoffset == current_span[1]:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
- if current_span != None:
+ if current_span is not None:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- if len(stream[f]) == 0:
+ if not stream[f]:
stream_tokens.append("0:0:{0}".format(fout))
return stream_tokens
class CollectionBase(object):
    def __enter__(self):
-        pass
+        # Context-manager entry: return the collection itself so callers
+        # can write `with ... as c:` and use c inside the block.
+        return self
-    def __exit__(self):
+    # The standard context-manager protocol passes the (exc_type,
+    # exc_value, traceback) triple; the old zero-argument signature
+    # would raise TypeError whenever the `with` block exited.
+    def __exit__(self, exc_type, exc_value, traceback):
        pass
def _my_keep(self):
hints other than size hints) removed from the locators.
"""
raw = self.manifest_text()
- clean = ''
+ clean = []
for line in raw.split("\n"):
fields = line.split()
- if len(fields) > 0:
- locators = [ (re.sub(r'\+[^\d][^\+]*', '', x) if re.match(util.keep_locator_pattern, x) else x)
- for x in fields[1:-1] ]
- clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
- return clean
+ if fields:
+ clean_fields = fields[:1] + [
+ (re.sub(r'\+[^\d][^\+]*', '', x)
+ if re.match(util.keep_locator_pattern, x)
+ else x)
+ for x in fields[1:]]
+ clean += [' '.join(clean_fields), "\n"]
+ return ''.join(clean)
+
class CollectionReader(CollectionBase):
def __init__(self, manifest_locator_or_text, api_client=None,
return
error_via_api = None
error_via_keep = None
- should_try_keep = (not self._manifest_text and
+ should_try_keep = ((self._manifest_text is None) and
util.keep_locator_pattern.match(
self._manifest_locator))
- if (not self._manifest_text and
+ if ((self._manifest_text is None) and
util.signed_locator_pattern.match(self._manifest_locator)):
error_via_keep = self._populate_from_keep()
- if not self._manifest_text:
+ if self._manifest_text is None:
error_via_api = self._populate_from_api_server()
- if error_via_api != None and not should_try_keep:
+ if error_via_api is not None and not should_try_keep:
raise error_via_api
- if (not self._manifest_text and
+ if ((self._manifest_text is None) and
not error_via_keep and
should_try_keep):
# Looks like a keep locator, and we didn't already try keep above
error_via_keep = self._populate_from_keep()
- if not self._manifest_text:
+ if self._manifest_text is None:
# Nothing worked!
raise arvados.errors.NotFoundError(
("Failed to retrieve collection '{}' " +
streams = {}
for s in self.all_streams():
for f in s.all_files():
- filestream = s.name() + "/" + f.name()
- r = filestream.rindex("/")
- streamname = filestream[:r]
- filename = filestream[r+1:]
+ streamname, filename = split(s.name() + "/" + f.name())
if streamname not in streams:
streams[streamname] = {}
if filename not in streams[streamname]:
for r in f.segments:
streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
- self._streams = []
- sortedstreams = list(streams.keys())
- sortedstreams.sort()
- for s in sortedstreams:
- self._streams.append(normalize_stream(s, streams[s]))
+ self._streams = [normalize_stream(s, streams[s])
+ for s in sorted(streams)]
# Regenerate the manifest text based on the normalized streams
- self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
+ self._manifest_text = ''.join(
+ [StreamReader(stream, keep=self._my_keep()).manifest_text()
+ for stream in self._streams])
- return self
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+        """
+        # Make sure the manifest has been fetched/parsed before searching it.
+        self._populate()
+        if filename is None:
+            # Single-path form: split "stream/file" into its two parts.
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        # Linear scan for the stream whose name matches; the for/else
+        # runs the else clause only if no stream matched.
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
def all_streams(self):
self._populate()
for f in s.all_files():
yield f
-    def manifest_text(self, strip=False):
-        if strip:
+    def manifest_text(self, strip=False, normalize=False):
+        """Return the manifest text for this collection.
+
+        With strip=True, block hints are removed from the locators
+        (via stripped_manifest).  With normalize=True, an equivalent,
+        normalized manifest is returned instead, built by round-tripping
+        through CollectionReader.normalize().
+        """
+        if normalize:
+            cr = CollectionReader(self.manifest_text())
+            cr.normalize()
+            # Delegate to the normalized copy; normalize=False here
+            # prevents infinite recursion.
+            return cr.manifest_text(strip=strip, normalize=False)
+        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
+class _WriterFile(ArvadosFileBase):
+    """Write-only file-like object handed out by CollectionWriter.open().
+
+    All writes are forwarded to the owning CollectionWriter; closing the
+    handle finalizes the current file in the collection.
+    """
+
+    def __init__(self, coll_writer, name):
+        # 'wb' mode: this handle only supports writing binary data.
+        super(_WriterFile, self).__init__(name, 'wb')
+        self.dest = coll_writer
+
+    def close(self):
+        super(_WriterFile, self).close()
+        # Tell the writer the file is complete so it can start the next one.
+        self.dest.finish_current_file()
+
+    # _before_close (from ArvadosFileBase) presumably rejects operations
+    # on a closed handle -- TODO confirm against arvfile.ArvadosFileBase.
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        for data in seq:
+            self.write(data)
+
+    @ArvadosFileBase._before_close
+    def flush(self):
+        self.dest.flush_data()
+
+
class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
self._queued_file = None
self._queued_dirents = deque()
self._queued_trees = deque()
+ self._last_open = None
-    def __exit__(self):
-        self.finish()
+    def __exit__(self, exc_type, exc_value, traceback):
+        # Only commit the collection when the `with` body exited cleanly;
+        # if an exception is propagating, leave the collection unfinished
+        # rather than saving partial data.
+        if exc_type is None:
+            self.finish()
def do_queued_work(self):
# The work queue consists of three pieces:
    def _work_trees(self):
+        # Process the head of the tree queue: each entry is a
+        # (path, stream name, max manifest depth) triple.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
-        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
-                        else os.listdir)
-        d = make_dirents(path)
-        if len(d) > 0:
+        # max_manifest_depth == 0 means recurse without limit; any other
+        # value lists only the top level (max_depth=0) -- TODO confirm
+        # against util.listdir_recursive's contract.
+        d = util.listdir_recursive(
+            path, max_depth = (None if max_manifest_depth == 0 else 0))
+        if d:
            self._queue_dirents(stream_name, d)
        else:
+            # Empty tree: nothing to queue, drop it and move on.
            self._queued_trees.popleft()
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
self.flush_data()
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+            with cwriter.open('./doc/page1.txt') as outfile:
+                outfile.write(page1_data)
+            with cwriter.open('./doc/page2.txt') as outfile:
+                outfile.write(page2_data)
+        """
+        if filename is None:
+            # Single-path form: split "stream/file" into its two parts.
+            streampath, filename = split(streampath)
+        # Enforce the one-open-file-at-a-time invariant documented above.
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        # Switch to the requested stream if needed, then begin the file.
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
if data_buffer:
return self._current_file_name
def finish_current_file(self):
- if self._current_file_name == None:
+ if self._current_file_name is None:
if self._current_file_pos == self._current_stream_length:
return
raise errors.AssertionError(
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
- if manifest:
- return manifest
- else:
- return ""
+ return manifest
def data_locators(self):
ret = []