6 from arvados.retry import retry_method
14 """split(path) -> streamname, filename
16 Separate the stream name and file name in a /-separated stream path.
17 If no stream name is available, assume '.'.
20 stream_name, file_name = path.rsplit('/', 1)
21 except ValueError: # No / in string
22 stream_name, file_name = '.', path
23 return stream_name, file_name
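# Editorial example (not part of the original module) of split() behaviour:
# only the last slash separates the stream name from the file name.
#
#   split('foo/bar/baz.txt')  ->  ('foo/bar', 'baz.txt')
#   split('baz.txt')          ->  ('.', 'baz.txt')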
25 class ArvadosFileBase(object):
26 def __init__(self, name, mode):
32 def _before_close(orig_func):
33 @functools.wraps(orig_func)
34 def wrapper(self, *args, **kwargs):
36 raise ValueError("I/O operation on closed stream file")
37 return orig_func(self, *args, **kwargs)
43 def __exit__(self, exc_type, exc_value, traceback):
54 class ArvadosFileReaderBase(ArvadosFileBase):
55 class _NameAttribute(str):
56 # The Python file API provides a plain .name attribute.
57 # Older SDK provided a name() method.
58 # This class provides both, for maximum compatibility.
62 def __init__(self, name, mode, num_retries=None):
63 super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
65 self.num_retries = num_retries
66 self._readline_cache = (None, None)
70 data = self.readline()
75 def decompressed_name(self):
76 return re.sub(r'\.(bz2|gz)$', '', self.name)
78 @ArvadosFileBase._before_close
79 def seek(self, pos, whence=os.SEEK_CUR):
80 if whence == os.SEEK_CUR:
82 elif whence == os.SEEK_END:
84 self._filepos = min(max(pos, 0L), self.size())
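# Editorial note with a small sketch (not in the original source): unlike the
# standard file API, seek() here defaults whence to os.SEEK_CUR, and the
# resulting position is clamped to the range [0, size()]. Assuming `f` is an
# open reader:
#
#   f.seek(0, os.SEEK_SET)   # absolute seek to the start of the file
#   f.seek(10)               # relative seek: 10 bytes past the current position
#   f.seek(0, os.SEEK_END)   # clamped to f.size()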
89 @ArvadosFileBase._before_close
91 def readall(self, size=2**20, num_retries=None):
93 data = self.read(size, num_retries=num_retries)
98 @ArvadosFileBase._before_close
100 def readline(self, size=float('inf'), num_retries=None):
101 cache_pos, cache_data = self._readline_cache
102 if self.tell() == cache_pos:
106 data_size = len(data[-1])
107 while (data_size < size) and ('\n' not in data[-1]):
108 next_read = self.read(2 ** 20, num_retries=num_retries)
111 data.append(next_read)
112 data_size += len(next_read)
115 nextline_index = data.index('\n') + 1
117 nextline_index = len(data)
118 nextline_index = min(nextline_index, size)
119 self._readline_cache = (self.tell(), data[nextline_index:])
120 return data[:nextline_index]
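# Illustrative sketch (editorial, not in the original file): readline() reads
# in 2**20-byte chunks and stashes any bytes beyond the first newline in
# self._readline_cache, keyed by the file position, so consecutive calls do
# not re-read the same data. Assuming `f` is a reader at the start of a file:
#
#   first = f.readline()    # e.g. 'line one\n'
#   second = f.readline()   # served from the cached remainder where possible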
122 @ArvadosFileBase._before_close
124 def decompress(self, decompress, size, num_retries=None):
125 for segment in self.readall(size, num_retries=num_retries):
126 data = decompress(segment)
130 @ArvadosFileBase._before_close
132 def readall_decompressed(self, size=2**20, num_retries=None):
134 if self.name.endswith('.bz2'):
135 dc = bz2.BZ2Decompressor()
136 return self.decompress(dc.decompress, size,
137 num_retries=num_retries)
138 elif self.name.endswith('.gz'):
139 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
140 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
141 size, num_retries=num_retries)
143 return self.readall(size, num_retries=num_retries)
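# Usage sketch (editorial, not part of the original source):
# readall_decompressed() chooses a decompressor from the file name suffix
# (.bz2 or .gz) and yields decompressed chunks; other names fall through to
# readall(). Assuming `f` wraps a file named 'logs.gz':
#
#   text = ''.join(f.readall_decompressed())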
145 @ArvadosFileBase._before_close
147 def readlines(self, sizehint=float('inf'), num_retries=None):
150 for s in self.readall(num_retries=num_retries):
153 if data_size >= sizehint:
155 return ''.join(data).splitlines(True)
158 raise NotImplementedError()
160 def read(self, size, num_retries=None):
161 raise NotImplementedError()
163 def readfrom(self, start, size, num_retries=None):
164 raise NotImplementedError()
167 class StreamFileReader(ArvadosFileReaderBase):
168 def __init__(self, stream, segments, name):
169 super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
170 self._stream = stream
171 self.segments = segments
173 def stream_name(self):
174 return self._stream.name()
177 n = self.segments[-1]
178 return n.range_start + n.range_size
180 @ArvadosFileBase._before_close
182 def read(self, size, num_retries=None):
183 """Read up to 'size' bytes from the stream, starting at the current file position"""
188 available_chunks = locators_and_ranges(self.segments, self._filepos, size)
190 lr = available_chunks[0]
191 data = self._stream._readfrom(lr.locator+lr.segment_offset,
193 num_retries=num_retries)
195 self._filepos += len(data)
198 @ArvadosFileBase._before_close
200 def readfrom(self, start, size, num_retries=None):
201 """Read up to 'size' bytes from the stream, starting at 'start'"""
206 for lr in locators_and_ranges(self.segments, start, size):
207 data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
208 num_retries=num_retries))
211 def as_manifest(self):
212 from stream import normalize_stream
214 for r in self.segments:
215 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
216 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
219 class BufferBlock(object):
221 A BufferBlock is a stand-in for a Keep block that is in the process of being
222 written. Writers can append to it, get the size, and compute the Keep locator.
224 There are three valid states:
226 WRITABLE - can append
228 PENDING - is in the process of being uploaded to Keep, append is an error
230 COMMITTED - the block has been written to Keep, its internal buffer has been
231 released, and the BufferBlock should be discarded in favor of fetching the
232 block through normal Keep means.
238 def __init__(self, blockid, starting_capacity):
240 blockid: the identifier for this block
241 starting_capacity: the initial buffer capacity
243 self.blockid = blockid
244 self.buffer_block = bytearray(starting_capacity)
245 self.buffer_view = memoryview(self.buffer_block)
246 self.write_pointer = 0
247 self.state = BufferBlock.WRITABLE
250 def append(self, data):
252 Append some data to the buffer. Only valid if the block is in WRITABLE
253 state. Implements an expanding buffer, doubling capacity as needed to
254 accommodate all the data.
256 if self.state == BufferBlock.WRITABLE:
257 while (self.write_pointer+len(data)) > len(self.buffer_block):
258 new_buffer_block = bytearray(len(self.buffer_block) * 2)
259 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
260 self.buffer_block = new_buffer_block
261 self.buffer_view = memoryview(self.buffer_block)
262 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
263 self.write_pointer += len(data)
266 raise AssertionError("Buffer block is not writable")
269 '''Amount of data written to the buffer'''
270 return self.write_pointer
273 '''The Keep locator for this buffer's contents.'''
274 if self._locator is None:
275 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
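# Illustrative BufferBlock lifecycle (editorial, not in the original source):
#
#   bb = BufferBlock("bufferblock0", starting_capacity=2**14)
#   bb.append("hello ")    # WRITABLE: the backing buffer doubles as needed
#   bb.append("world")
#   bb.size()              # -> 11
#   bb.locator()           # md5 of the contents plus "+11"
#
# Once handed to BlockManager.commit_bufferblock() the block becomes PENDING
# and further append() calls raise AssertionError.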
279 class AsyncKeepWriteErrors(Exception):
281 Roll up one or more Keep write exceptions (generated by background
282 threads) into a single one.
284 def __init__(self, errors):
288 return "\n".join(str(err) for err in self.errors)
290 class BlockManager(object):
292 BlockManager handles buffer blocks, background block uploads, and
293 background block prefetch for a Collection of ArvadosFiles.
295 def __init__(self, keep):
296 '''keep: KeepClient object to use'''
298 self._bufferblocks = {}
299 self._put_queue = None
300 self._put_errors = None
301 self._put_threads = None
302 self._prefetch_queue = None
303 self._prefetch_threads = None
305 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14):
307 Allocate a new, empty bufferblock in WRITABLE state and return it.
308 blockid: optional block identifier, otherwise one will be automatically assigned
309 starting_capacity: optional capacity, otherwise will use default capacity
312 blockid = "bufferblock%i" % len(self._bufferblocks)
313 bb = BufferBlock(blockid, starting_capacity=starting_capacity)
314 self._bufferblocks[bb.blockid] = bb
317 def stop_threads(self):
319 Shut down and wait for background upload and download threads to finish.
321 if self._put_threads is not None:
322 for t in self._put_threads:
323 self._put_queue.put(None)
324 for t in self._put_threads:
326 self._put_threads = None
327 self._put_queue = None
328 self._put_errors = None
330 if self._prefetch_threads is not None:
331 for t in self._prefetch_threads:
332 self._prefetch_queue.put(None)
333 for t in self._prefetch_threads:
335 self._prefetch_threads = None
336 self._prefetch_queue = None
338 def commit_bufferblock(self, block):
340 Initiate a background upload of a bufferblock. This will block if the
341 upload queue is at capacity, otherwise it will return immediately.
346 Background uploader thread.
350 b = self._put_queue.get()
353 b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
354 b.state = BufferBlock.COMMITTED
356 b.buffer_block = None
357 except Exception as e:
359 self._put_errors.put(e)
361 if self._put_queue is not None:
362 self._put_queue.task_done()
364 if self._put_threads is None:
365 # Start uploader threads.
367 # If we don't limit the Queue size, the upload queue can quickly
368 # grow to take up gigabytes of RAM if the writing process is
369 generating data more quickly than it can be sent to the Keep
372 # With two upload threads and a queue size of 2, this means up to 4
373 # blocks pending. If they are full 64 MiB blocks, that means up to
374 # 256 MiB of internal buffering, which is the same size as the
375 # default download block cache in KeepClient.
376 self._put_queue = Queue.Queue(maxsize=2)
377 self._put_errors = Queue.Queue()
378 self._put_threads = [threading.Thread(target=worker, args=(self,)),
379 threading.Thread(target=worker, args=(self,))]
380 for t in self._put_threads:
384 # Mark the block as PENDING to disallow any more appends.
385 block.state = BufferBlock.PENDING
386 self._put_queue.put(block)
388 def get_block(self, locator, num_retries, cache_only=False):
390 Fetch a block. First checks to see if the locator is a BufferBlock and
391 returns it; if not, passes the request through to KeepClient.get().
393 if locator in self._bufferblocks:
394 bb = self._bufferblocks[locator]
395 if bb.state != BufferBlock.COMMITTED:
396 return bb.buffer_view[0:bb.write_pointer].tobytes()
398 locator = bb._locator
399 return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
401 def commit_all(self):
403 Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this
404 is a synchronous call, and will not return until all buffer blocks are
405 uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to upload.
408 for k,v in self._bufferblocks.items():
409 if v.state == BufferBlock.WRITABLE:
410 self.commit_bufferblock(v)
411 if self._put_queue is not None:
412 self._put_queue.join()
413 if not self._put_errors.empty():
417 e.append(self._put_errors.get(False))
420 raise AsyncKeepWriteErrors(e)
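# Editorial sketch (not in the original source): callers typically wrap
# commit_all() to surface background upload failures, e.g.
#
#   try:
#       block_manager.commit_all()
#   except AsyncKeepWriteErrors as e:
#       handle_upload_failure(e)   # hypothetical error handler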
422 def block_prefetch(self, locator):
424 Initiate a background download of a block. This assumes that the
425 underlying KeepClient implements a block cache, so repeated requests
426 for the same block will not result in repeated downloads (unless the
427 block is evicted from the cache). This method does not block.
430 '''Background downloader thread.'''
433 b = self._prefetch_queue.get()
440 if locator in self._bufferblocks:
442 if self._prefetch_threads is None:
443 self._prefetch_queue = Queue.Queue()
444 self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
445 threading.Thread(target=worker, args=(self,))]
446 for t in self._prefetch_threads:
449 self._prefetch_queue.put(locator)
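# Overall usage sketch for BlockManager (editorial, not part of the original
# module); `keep` is assumed to be a KeepClient instance:
#
#   bm = BlockManager(keep)
#   bb = bm.alloc_bufferblock()     # new WRITABLE buffer block
#   bb.append("some data")
#   bm.commit_bufferblock(bb)       # queue for background upload
#   bm.commit_all()                 # wait for all pending uploads
#   data = bm.get_block(bb.blockid, num_retries=2)
#   bm.stop_threads()               # shut down the worker threads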
452 class ArvadosFile(object):
454 ArvadosFile manages the underlying representation of a file in Keep as a sequence of
455 segments spanning a set of blocks, and implements random read/write access.
458 def __init__(self, parent, stream=[], segments=[]):
460 stream: a list of Range objects representing a block stream
461 segments: a list of Range objects representing segments
464 self._modified = True
467 self.add_segment(stream, s.locator, s.range_size)
468 self._current_bblock = None
469 self.lock = threading.Lock()
472 '''Make a copy of this file.'''
473 # TODO: copy bufferblocks?
476 cp.parent = self.parent
478 cp.segments = [Range(r.locator, r.range_start, r.range_size, r.segment_offset) for r in self.segments]
481 def set_unmodified(self):
482 '''Clear the modified flag'''
483 self._modified = False
486 '''Test the modified flag'''
487 return self._modified
489 def truncate(self, size):
491 Adjust the size of the file. If "size" is less than the size of the file,
492 the file contents after "size" will be discarded. If "size" is greater
493 than the current size of the file, an IOError will be raised.
495 if size < self.size():
497 for r in self.segments:
498 range_end = r.range_start+r.range_size
499 if r.range_start >= size:
500 # segment is past the truncate size, all done
502 elif size < range_end:
503 nr = Range(r.locator, r.range_start, size - r.range_start)
504 nr.segment_offset = r.segment_offset
510 self.segments = new_segs
511 self._modified = True
512 elif size > self.size():
513 raise IOError("truncate() does not support extending the file size")
516 def readfrom(self, offset, size, num_retries):
518 Read up to "size" bytes from the file, starting at "offset".
520 if size == 0 or offset >= self.size():
524 for lr in locators_and_ranges(self.segments, offset, size + config.KEEP_BLOCK_SIZE):
525 self.parent._my_block_manager().block_prefetch(lr.locator)
527 for lr in locators_and_ranges(self.segments, offset, size):
528 d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
530 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
535 def _repack_writes(self):
537 Test if the buffer block has more data than is referenced by actual segments
538 (this happens when a buffered write over-writes a file range written in
539 a previous buffered write). Re-pack the buffer block for efficiency
540 and to avoid leaking information.
544 # Sum up the segments to get the total bytes of the file referencing
545 # into the buffer block.
546 bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
547 write_total = sum([s.range_size for s in bufferblock_segs])
549 if write_total < self._current_bblock.size():
550 # There is more data in the buffer block than is actually accounted for by segments, so
551 # re-pack by copying the referenced data into a new buffer block.
552 new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total)
553 for t in bufferblock_segs:
554 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
555 t.segment_offset = new_bb.size() - t.range_size
557 self._current_bblock = new_bb
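# Editorial illustration (not in the original source) of the case
# _repack_writes() handles: if writeto(0, "aaaa", ...) is followed by
# writeto(0, "bbbb", ...), the buffer block holds 8 bytes but the file's
# segments reference only the last 4, so the block is repacked before being
# committed to Keep.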
559 def writeto(self, offset, data, num_retries):
561 Write "data" to the file starting at "offset". This will update
562 existing bytes and/or extend the size of the file as necessary.
567 if offset > self.size():
568 raise ArgumentError("Offset is past the end of the file")
570 if len(data) > config.KEEP_BLOCK_SIZE:
571 raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
573 self._modified = True
575 if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
576 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
578 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
579 self._repack_writes()
580 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
581 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
582 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock()
584 self._current_bblock.append(data)
585 replace_range(self.segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
587 def add_segment(self, blocks, pos, size):
589 Add a segment to the end of the file, with "pos" and "size" referencing a
590 section of the stream described by "blocks" (a list of Range objects)
592 self._modified = True
593 for lr in locators_and_ranges(blocks, pos, size):
594 last = self.segments[-1] if self.segments else Range(0, 0, 0)
595 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
596 self.segments.append(r)
599 '''Get the file size'''
601 n = self.segments[-1]
602 return n.range_start + n.range_size
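# Round-trip sketch (editorial, not part of the original module); `parent` is
# assumed to be a collection-like object providing _my_block_manager():
#
#   af = ArvadosFile(parent)
#   af.writeto(0, "hello world", num_retries=2)   # buffered in a BufferBlock
#   af.size()                                     # -> 11
#   af.readfrom(0, 5, num_retries=2)              # -> 'hello'
#   af.truncate(5)                                # drop everything after byte 5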
607 class ArvadosFileReader(ArvadosFileReaderBase):
608 def __init__(self, arvadosfile, name, mode="r", num_retries=None):
609 super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
610 self.arvadosfile = arvadosfile.clone()
613 return self.arvadosfile.size()
615 @ArvadosFileBase._before_close
617 def read(self, size, num_retries=None):
618 """Read up to 'size' bytes from the stream, starting at the current file position"""
619 data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
620 self._filepos += len(data)
623 @ArvadosFileBase._before_close
625 def readfrom(self, offset, size, num_retries=None):
626 """Read up to 'size' bytes from the stream, starting at the current file position"""
627 return self.arvadosfile.readfrom(offset, size, num_retries)
633 class SynchronizedArvadosFile(object):
634 def __init__(self, arvadosfile):
635 self.arvadosfile = arvadosfile
640 def __getattr__(self, name):
641 with self.arvadosfile.lock:
642 return getattr(self.arvadosfile, name)
645 class ArvadosFileWriter(ArvadosFileReader):
646 def __init__(self, arvadosfile, name, mode, num_retries=None):
647 self.arvadosfile = SynchronizedArvadosFile(arvadosfile)
648 super(ArvadosFileWriter, self).__init__(self.arvadosfile, name, mode, num_retries=num_retries)
650 @ArvadosFileBase._before_close
652 def write(self, data, num_retries=None):
653 if self.mode[0] == "a":
654 self.arvadosfile.writeto(self.size(), data, num_retries)
656 self.arvadosfile.writeto(self._filepos, data, num_retries)
657 self._filepos += len(data)
659 @ArvadosFileBase._before_close
661 def writelines(self, seq, num_retries=None):
665 def truncate(self, size=None):
668 self.arvadosfile.truncate(size)
669 if self._filepos > self.size():
670 self._filepos = self.size()
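# Usage sketch (editorial, not in the original source); `af` is assumed to be
# an existing ArvadosFile:
#
#   writer = ArvadosFileWriter(af, "output.txt", "w")
#   writer.write("hello ")
#   writer.write("world\n")
#   writer.writelines(["second line\n"])
#   writer.truncate(5)           # keep only the first five bytes
#   writer.seek(0, os.SEEK_SET)
#   writer.read(5)               # -> 'hello'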