X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6038a018b758e1a4babc5669df50622cd470df2f..23f0fc06dbb6d7e82d820a8c65997f32c760f34e:/sdk/python/arvados/stream.py

diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 0d0caee267..edfb7711b8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,230 +1,107 @@
-import gflags
-import httplib
-import httplib2
-import logging
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import print_function
+from __future__ import absolute_import
+from future.utils import listvalues
+from builtins import object
+import collections
+import hashlib
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
 import threading
+import functools
+import copy
 
-from keep import *
-import config
-import errors
-
-class StreamFileReader(object):
-    def __init__(self, stream, pos, size, name):
-        self._stream = stream
-        self._pos = pos
-        self._size = size
-        self._name = name
-        self._filepos = 0
-
-    def name(self):
-        return self._name
-
-    def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self._name)
-
-    def size(self):
-        return self._size
-
-    def stream_name(self):
-        return self._stream.name()
-
-    def read(self, size, **kwargs):
-        self._stream.seek(self._pos + self._filepos)
-        data = self._stream.read(min(size, self._size - self._filepos))
-        self._filepos += len(data)
-        return data
-
-    def readall(self, size=2**20, **kwargs):
-        while True:
-            data = self.read(size, **kwargs)
-            if data == '':
-                break
-            yield data
-
-    def seek(self, pos):
-        self._filepos = pos
-
-    def bunzip2(self, size):
-        decompressor = bz2.BZ2Decompressor()
-        for chunk in self.readall(size):
-            data = decompressor.decompress(chunk)
-            if data and data != '':
-                yield data
-
-    def gunzip(self, size):
-        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
-        for chunk in self.readall(size):
-            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
-            if data and data != '':
-                yield data
-
-    def readall_decompressed(self, size=2**20):
-        self._stream.seek(self._pos + self._filepos)
-        if re.search('\.bz2$', self._name):
-            return self.bunzip2(size)
-        elif re.search('\.gz$', self._name):
-            return self.gunzip(size)
-        else:
-            return self.readall(size)
-
-    def readlines(self, decompress=True):
-        if decompress:
-            datasource = self.readall_decompressed()
-        else:
-            self._stream.seek(self._pos + self._filepos)
-            datasource = self.readall()
-        data = ''
-        for newdata in datasource:
-            data += newdata
-            sol = 0
-            while True:
-                eol = string.find(data, "\n", sol)
-                if eol < 0:
-                    break
-                yield data[sol:eol+1]
-                sol = eol+1
-            data = data[sol:]
-        if data != '':
-            yield data
-
-    def as_manifest(self):
-        if self.size() == 0:
-            return ("%s %s 0:0:%s\n"
-                    % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
-        return string.join(self._stream.tokens_for_range(self._pos, self._size),
-                           " ") + "\n"
+from ._ranges import locators_and_ranges, Range
+from .arvfile import StreamFileReader
+from arvados.retry import retry_method
+from arvados.keep import *
+from . import config
+from . import errors
+from ._normalize_stream import normalize_stream
 
 class StreamReader(object):
-    def __init__(self, tokens):
-        self._tokens = tokens
-        self._current_datablock_data = None
-        self._current_datablock_pos = 0
-        self._current_datablock_index = -1
-        self._pos = 0
-
+    def __init__(self, tokens, keep=None, debug=False, _empty=False,
+                 num_retries=0):
         self._stream_name = None
-        self.data_locators = []
-        self.files = []
+        self._data_locators = []
+        self._files = collections.OrderedDict()
+        self._keep = keep
+        self.num_retries = num_retries
+
+        streamoffset = 0
 
-        for tok in self._tokens:
-            if self._stream_name == None:
+        # parse stream
+        for tok in tokens:
+            if debug: print('tok', tok)
+            if self._stream_name is None:
                 self._stream_name = tok.replace('\\040', ' ')
-            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
-                self.data_locators += [tok]
-            elif re.search(r'^\d+:\d+:\S+', tok):
-                pos, size, name = tok.split(':',2)
-                self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
-            else:
-                raise errors.SyntaxError("Invalid manifest format")
-
-    def tokens(self):
-        return self._tokens
-
-    def tokens_for_range(self, range_start, range_size):
-        resp = [self._stream_name]
-        return_all_tokens = False
-        block_start = 0
-        token_bytes_skipped = 0
-        for locator in self.data_locators:
-            sizehint = re.search(r'\+(\d+)', locator)
-            if not sizehint:
-                return_all_tokens = True
-            if return_all_tokens:
-                resp += [locator]
-                next
-            blocksize = int(sizehint.group(0))
-            if range_start + range_size <= block_start:
-                break
-            if range_start < block_start + blocksize:
-                resp += [locator]
-            else:
-                token_bytes_skipped += blocksize
-            block_start += blocksize
-        for f in self.files:
-            if ((f[0] < range_start + range_size)
-                and
-                (f[0] + f[1] > range_start)
-                and
-                f[1] > 0):
-                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
-        return resp
+                continue
+
+            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
+            if s:
+                blocksize = int(s.group(1))
+                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
+                streamoffset += blocksize
+                continue
+
+            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+            if s:
+                pos = int(s.group(1))
+                size = int(s.group(2))
+                name = s.group(3).replace('\\040', ' ')
+                if name not in self._files:
+                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
+                else:
+                    filereader = self._files[name]
+                    filereader.segments.append(Range(pos, filereader.size(), size))
+                continue
+
+            raise errors.SyntaxError("Invalid manifest format")
 
     def name(self):
         return self._stream_name
 
+    def files(self):
+        return self._files
+
     def all_files(self):
-        for f in self.files:
-            pos, size, name = f
-            yield StreamFileReader(self, pos, size, name)
-
-    def nextdatablock(self):
-        if self._current_datablock_index < 0:
-            self._current_datablock_pos = 0
-            self._current_datablock_index = 0
-        else:
-            self._current_datablock_pos += self.current_datablock_size()
-            self._current_datablock_index += 1
-        self._current_datablock_data = None
-
-    def current_datablock_data(self):
-        if self._current_datablock_data == None:
-            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
-        return self._current_datablock_data
-
-    def current_datablock_size(self):
-        if self._current_datablock_index < 0:
-            self.nextdatablock()
-        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
-        if sizehint:
-            return int(sizehint.group(0))
-        return len(self.current_datablock_data())
-
-    def seek(self, pos):
-        """Set the position of the next read operation."""
-        self._pos = pos
-
-    def really_seek(self):
-        """Find and load the appropriate data block, so the byte at
-        _pos is in memory.
-        """
-        if self._pos == self._current_datablock_pos:
-            return True
-        if (self._current_datablock_pos != None and
-            self._pos >= self._current_datablock_pos and
-            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
-            return True
-        if self._pos < self._current_datablock_pos:
-            self._current_datablock_index = -1
-            self.nextdatablock()
-        while (self._pos > self._current_datablock_pos and
-               self._pos > self._current_datablock_pos + self.current_datablock_size()):
-            self.nextdatablock()
-
-    def read(self, size):
-        """Read no more than size bytes -- but at least one byte,
-        unless _pos is already at the end of the stream.
-        """
+        return listvalues(self._files)
+
+    def size(self):
+        n = self._data_locators[-1]
+        return n.range_start + n.range_size
+
+    def locators_and_ranges(self, range_start, range_size):
+        return locators_and_ranges(self._data_locators, range_start, range_size)
+
+    @retry_method
+    def _keepget(self, locator, num_retries=None):
+        return self._keep.get(locator, num_retries=num_retries)
+
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
-            return ''
-        self.really_seek()
-        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
-            self.nextdatablock()
-            if self._current_datablock_index >= len(self.data_locators):
-                return None
-        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
-        self._pos += len(data)
-        return data
+            return b''
+        if self._keep is None:
+            self._keep = KeepClient(num_retries=self.num_retries)
+        data = []
+        for lr in locators_and_ranges(self._data_locators, start, size):
+            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+        return b''.join(data)
+
+    def manifest_text(self, strip=False):
+        manifest_text = [self.name().replace(' ', '\\040')]
+        if strip:
+            for d in self._data_locators:
+                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
+                manifest_text.append(m.group(0))
+        else:
+            manifest_text.extend([d.locator for d in self._data_locators])
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
+                                        for seg in f.segments])
+                              for f in listvalues(self._files)])
+        return ' '.join(manifest_text) + '\n'
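
Below is a minimal usage sketch of the rewritten StreamReader (not part of the diff above). It assumes the arvados Python SDK at this revision is importable; the StubKeep class, the sample block, and the file name "count.txt" are illustrative stand-ins for a real Keep client, so nothing touches the network.

    # hypothetical stand-in for arvados.keep.KeepClient; only get() is needed here
    import hashlib
    from arvados.stream import StreamReader

    class StubKeep(object):
        def __init__(self, blocks):
            self._blocks = blocks  # md5 hash -> bytes
        def get(self, locator, num_retries=None):
            # a Keep locator is "<md5>+<size>[+hints]"; serve by hash alone
            return self._blocks[locator.split('+')[0]]

    data = b'foo'
    block_hash = hashlib.md5(data).hexdigest()
    # manifest stream tokens: stream name, one block locator, one file segment
    tokens = ['.', '%s+%d' % (block_hash, len(data)), '0:3:count.txt']

    reader = StreamReader(tokens, keep=StubKeep({block_hash: data}))
    assert reader.name() == '.'
    assert reader.size() == 3
    assert reader.readfrom(0, 3) == b'foo'   # whole stream
    assert reader.readfrom(1, 2) == b'oo'    # offset read within the block
    assert [f.name for f in reader.all_files()] == ['count.txt']
    print(reader.manifest_text())            # ". <md5>+3 0:3:count.txt"

readfrom() maps the requested byte range onto block locators via locators_and_ranges() and fetches only the needed segments, which is why a dictionary-backed stub is enough to exercise it; the old seek()/read() cursor machinery removed by this diff is no longer involved.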