super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
self._filepos = 0L
self.num_retries = num_retries
- self.need_lock = False
self._readline_cache = (None, None)
def __iter__(self):
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self._size())
+ self._filepos = min(max(pos, 0L), self.size())
def tell(self):
return self._filepos
- def size(self):
- return self._size()
-
@ArvadosFileBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
class StreamFileReader(ArvadosFileReaderBase):
def __init__(self, stream, segments, name):
- super(StreamFileReader, self).__init__(name, 'rb')
+ super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
self._stream = stream
self.segments = segments
- self.num_retries = stream.num_retries
- self._filepos = 0L
- self.num_retries = stream.num_retries
- self._readline_cache = (None, None)
def stream_name(self):
return self._stream.name()
- def _size(self):
+ def size(self):
n = self.segments[-1]
return n.range_start + n.range_size
return ''
data = []
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
+ for lr in locators_and_ranges(self.segments, start, size):
+ data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
num_retries=num_retries))
return ''.join(data)
def as_manifest(self):
- manifest_text = ['.']
- manifest_text.extend([d.locator for d in self._stream._data_locators])
- manifest_text.extend(["{}:{}:{}".format(seg.locator, seg.range_size, self.name().replace(' ', '\\040')) for seg in self.segments])
- return manifest_text #arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+ from stream import normalize_stream
+ segs = []
+ for r in self.segments:
+ segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
+ return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
-class ArvadosFile(ArvadosFileReaderBase):
- def __init__(self, name, mode, stream, segments):
- super(ArvadosFile, self).__init__(name, mode)
+class ArvadosFile(object):
+ def __init__(self, stream, segments):
+ # TODO: build segments list
self.segments = []
- def truncate(self, size=None):
- if size is None:
- size = self._filepos
-
- segs = locators_and_ranges(self.segments, 0, size)
+ def truncate(self, size):
+ pass
+ # TODO: reimplement truncate(); the previous stream-based logic is kept below, commented out, for reference.
+
+ # segs = locators_and_ranges(self.segments, 0, size)
+
+ # newstream = []
+ # self.segments = []
+ # streamoffset = 0L
+ # fileoffset = 0L
+
+ # for seg in segs:
+ # for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg.locator+seg.range_start, seg[SEGMENTSIZE]):
+ # newstream.append([locator, blocksize, streamoffset])
+ # self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+ # streamoffset += blocksize
+ # fileoffset += segmentsize
+ # if len(newstream) == 0:
+ # newstream.append(config.EMPTY_BLOCK_LOCATOR)
+ # self.segments.append([0, 0, 0])
+ # self._stream._data_locators = newstream
+ # if self._filepos > fileoffset:
+ # self._filepos = fileoffset
+
+ def readfrom(self, offset, data):
+ pass
- newstream = []
- self.segments = []
- streamoffset = 0L
- fileoffset = 0L
-
- for seg in segs:
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg.locator+seg.range_start, seg[SEGMENTSIZE]):
- newstream.append([locator, blocksize, streamoffset])
- self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
- streamoffset += blocksize
- fileoffset += segmentsize
- if len(newstream) == 0:
- newstream.append(config.EMPTY_BLOCK_LOCATOR)
- self.segments.append([0, 0, 0])
- self._stream._data_locators = newstream
- if self._filepos > fileoffset:
- self._filepos = fileoffset
-
- def _writeto(self, offset, data):
+ def writeto(self, offset, data):
if offset > self._size():
raise ArgumentError("Offset is past the end of the file")
- self._stream._append(data)
- replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
-
- def writeto(self, offset, data):
- self._writeto(offset, data)
-
- def write(self, data):
- self._writeto(self._filepos, data)
- self._filepos += len(data)
-
- def writelines(self, seq):
- for s in seq:
- self._writeto(self._filepos, s)
- self._filepos += len(s)
+ # TODO: reimplement the write path; the previous stream-append logic below is disabled.
+ # self._stream._append(data)
+ # replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
def flush(self):
pass
r.block_size = lr.block_size
r.segment_offset = lr.segment_offset
self.segments.append(r)
+
+
+class ArvadosFileReader(ArvadosFileReaderBase):
+ def __init__(self, arvadosfile, name, mode='rb'):
+ super(ArvadosFileReader, self).__init__(name)
+ self.arvadosfile = arvadosfile
+
+ def size(self):
+ n = self.segments[-1]
+ return n.range_start + n.range_size
+
+ @ArvadosFileBase._before_close
+ @retry_method
+ def read(self, size, num_retries=None):
+ """Read up to 'size' bytes from the stream, starting at the current file position"""
+ if size == 0:
+ return ''
+
+ data = self.arvadosfile.readfrom(self._filepos, size)
+ self._filepos += len(data)
+ return data
+
+
+class ArvadosFileWriter(ArvadosFileReader):
+ def __init__(self, arvadosfile, name):
+ super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode='wb')
+
+ def write(self, data):
+ self.arvadosfile.writeto(self._filepos, data)
+ self._filepos += len(data)
+
+ def writelines(self, seq):
+ for s in seq:
+ self.write(s)
+
+ def truncate(self, size=None):
+ if size is None:
+ size = self._filepos
+ self.arvadosfile.truncate(size)