25 class StreamFileReader(object):
26 def __init__(self, stream, pos, size, name):
def decompressed_name(self):
    """Return this file's name with a trailing ``.bz2`` or ``.gz`` suffix removed.

    Names that do not end in one of those suffixes are returned unchanged.
    Only the final suffix is stripped (``a.gz.txt`` is left alone).
    """
    # Raw string: in a plain literal '\.' is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+); it only worked by accident before.
    return re.sub(r'\.(bz2|gz)$', '', self._name)
def stream_name(self):
    """Return the name reported by the stream object that holds this file."""
    parent = self._stream
    return parent.name()
45 def read(self, size, **kwargs):
46 self._stream.seek(self._pos + self._filepos)
47 data = self._stream.read(min(size, self._size - self._filepos))
48 self._filepos += len(data)
51 def readall(self, size=2**20, **kwargs):
53 data = self.read(size, **kwargs)
61 def bunzip2(self, size):
62 decompressor = bz2.BZ2Decompressor()
63 for chunk in self.readall(size):
64 data = decompressor.decompress(chunk)
65 if data and data != '':
68 def gunzip(self, size):
69 decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
70 for chunk in self.readall(size):
71 data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
72 if data and data != '':
75 def readall_decompressed(self, size=2**20):
76 self._stream.seek(self._pos + self._filepos)
77 if re.search('\.bz2$', self._name):
78 return self.bunzip2(size)
79 elif re.search('\.gz$', self._name):
80 return self.gunzip(size)
82 return self.readall(size)
84 def readlines(self, decompress=True):
86 datasource = self.readall_decompressed()
88 self._stream.seek(self._pos + self._filepos)
89 datasource = self.readall()
91 for newdata in datasource:
95 eol = string.find(data, "\n", sol)
104 def as_manifest(self):
106 return ("%s %s 0:0:%s\n"
107 % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
108 return string.join(self._stream.tokens_for_range(self._pos, self._size),
111 class StreamReader(object):
112 def __init__(self, tokens):
113 self._tokens = tokens
114 self._current_datablock_data = None
115 self._current_datablock_pos = 0
116 self._current_datablock_index = -1
119 self._stream_name = None
120 self.data_locators = []
123 for tok in self._tokens:
124 if self._stream_name == None:
125 self._stream_name = tok.replace('\\040', ' ')
126 elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
127 self.data_locators += [tok]
128 elif re.search(r'^\d+:\d+:\S+', tok):
129 pos, size, name = tok.split(':',2)
130 self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
132 raise errors.SyntaxError("Invalid manifest format")
137 def tokens_for_range(self, range_start, range_size):
138 resp = [self._stream_name]
139 return_all_tokens = False
141 token_bytes_skipped = 0
142 for locator in self.data_locators:
143 sizehint = re.search(r'\+(\d+)', locator)
145 return_all_tokens = True
146 if return_all_tokens:
149 blocksize = int(sizehint.group(0))
150 if range_start + range_size <= block_start:
152 if range_start < block_start + blocksize:
155 token_bytes_skipped += blocksize
156 block_start += blocksize
158 if ((f[0] < range_start + range_size)
160 (f[0] + f[1] > range_start)
163 resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
167 return self._stream_name
172 yield StreamFileReader(self, pos, size, name)
174 def nextdatablock(self):
175 if self._current_datablock_index < 0:
176 self._current_datablock_pos = 0
177 self._current_datablock_index = 0
179 self._current_datablock_pos += self.current_datablock_size()
180 self._current_datablock_index += 1
181 self._current_datablock_data = None
def current_datablock_data(self):
    """Return the contents of the current data block, fetching it on first use.

    The block is fetched with ``Keep.get`` using the locator at
    ``self.data_locators[self._current_datablock_index]`` and cached in
    ``self._current_datablock_data``. Later calls return the cached value
    until something resets that attribute to ``None``.
    """
    # Identity test: a fetched (possibly empty) block must never be
    # mistaken for "not yet loaded".
    if self._current_datablock_data is None:
        locator = self.data_locators[self._current_datablock_index]
        self._current_datablock_data = Keep.get(locator)
    return self._current_datablock_data
188 def current_datablock_size(self):
189 if self._current_datablock_index < 0:
191 sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
193 return int(sizehint.group(0))
194 return len(self.current_datablock_data())
197 """Set the position of the next read operation."""
200 def really_seek(self):
201 """Find and load the appropriate data block, so the byte at
204 if self._pos == self._current_datablock_pos:
206 if (self._current_datablock_pos != None and
207 self._pos >= self._current_datablock_pos and
208 self._pos <= self._current_datablock_pos + self.current_datablock_size()):
210 if self._pos < self._current_datablock_pos:
211 self._current_datablock_index = -1
213 while (self._pos > self._current_datablock_pos and
214 self._pos > self._current_datablock_pos + self.current_datablock_size()):
217 def read(self, size):
218 """Read no more than size bytes -- but at least one byte,
219 unless _pos is already at the end of the stream.
224 while self._pos >= self._current_datablock_pos + self.current_datablock_size():
226 if self._current_datablock_index >= len(self.data_locators):
228 data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
229 self._pos += len(data)