from arvados.retry import retry_method

    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path.
    If no stream name is available, assume '.'.
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
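
# Illustrative note (comments only, not part of the module): split() treats the
# last "/" as the stream/file boundary, so
#   split("path/to/file.txt")  ->  ("path/to", "file.txt")
#   split("file.txt")          ->  (".", "file.txt")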


class ArvadosFileBase(object):
    def __init__(self, name, mode):

    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)

    def __exit__(self, exc_type, exc_value, traceback):


class ArvadosFileReaderBase(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self.num_retries = num_retries
        self._readline_cache = (None, None)

            data = self.readline()

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
        elif whence == os.SEEK_END:
        self._filepos = min(max(pos, 0L), self.size())

    @ArvadosFileBase._before_close
    def readall(self, size=2**20, num_retries=None):
            data = self.read(size, num_retries=num_retries)

    @ArvadosFileBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            data.append(next_read)
            data_size += len(next_read)
            nextline_index = data.index('\n') + 1
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @ArvadosFileBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)

    @ArvadosFileBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
            return self.readall(size, num_retries=num_retries)

    @ArvadosFileBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        for s in self.readall(num_retries=num_retries):
            if data_size >= sizehint:
        return ''.join(data).splitlines(True)

        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

        n = self.segments[-1]
        return n.range_start + n.range_size

    @ArvadosFileBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          num_retries=num_retries)
        self._filepos += len(data)

    @ArvadosFileBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))

    def as_manifest(self):
        from stream import normalize_stream
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
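
# Hypothetical usage sketch (illustrative only; assumes `stream` is an already
# constructed arvados StreamReader and `segments` describes where "foo.txt"
# lives inside it):
#
#   reader = StreamFileReader(stream, segments, "foo.txt")
#   reader.seek(0, os.SEEK_SET)
#   first_chunk = reader.read(2 ** 16)       # up to 64 KiB from the current position
#   manifest_text = reader.as_manifest()     # single normalized manifest line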


class BufferBlock(object):
    A BufferBlock is a stand-in for a Keep block that is in the process of being
    written. Writers can append to it, get the size, and compute the Keep locator.

    There are three valid states:

      Block is in the process of being uploaded to Keep, append is an error.

      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    def __init__(self, blockid, starting_capacity, owner):
          the identifier for this block

          the initial buffer capacity

          ArvadosFile that owns this block

        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.state = BufferBlock.WRITABLE

    def append(self, data):
        Append some data to the buffer. Only valid if the block is in WRITABLE
        state. Implements an expanding buffer, doubling capacity as needed to
        accommodate all the data.
        if self.state == BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            raise AssertionError("Buffer block is not writable")

        """Amount of data written to the buffer"""
        return self.write_pointer

        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
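
# Minimal sketch of the lifecycle described in the class docstring (illustrative
# only; real blocks are normally created through BlockManager.alloc_bufferblock):
#
#   bb = BufferBlock("bufferblock0", starting_capacity=2 ** 14, owner=None)
#   bb.append("hello ")
#   bb.append("world")      # the buffer doubles in capacity as needed
#   bb.size()               # -> 11 bytes written so far
#   bb.locator()            # "<md5 of the written bytes>+11"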


class AsyncKeepWriteErrors(Exception):
    Roll up one or more Keep write exceptions (generated by background
    threads) into a single one.

    def __init__(self, errors):

        return "\n".join(str(e) for e in self.errors)


def _synchronized(orig_func):
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
            return orig_func(self, *args, **kwargs)


class NoopLock(object):
    def __exit__(self, exc_type, exc_value, traceback):

    def acquire(self, blocking=False):


def _must_be_writable(orig_func):
    # Decorator for methods that modify actual Collection data.
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        if self.sync_mode() == SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")
        return orig_func(self, *args, **kwargs)


class BlockManager(object):
    BlockManager handles buffer blocks, background block uploads, and
    background block prefetch for a Collection of ArvadosFiles.

    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        Allocate a new, empty bufferblock in WRITABLE state and return it.

          optional block identifier, otherwise one will be automatically assigned

          optional capacity, otherwise will use default capacity

          ArvadosFile that owns this block

            blockid = "bufferblock%i" % len(self._bufferblocks)
        bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bb.blockid] = bb

    def dup_block(self, blockid, owner):
        Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.

          the block to copy. May be an existing buffer block id.

          ArvadosFile that owns the new block

        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        block = self._bufferblocks[blockid]
        bb = BufferBlock(new_blockid, block.size(), owner)
        self._bufferblocks[bb.blockid] = bb

    def is_bufferblock(self, id):
        return id in self._bufferblocks

    def stop_threads(self):
        Shut down and wait for background upload and download threads to finish.
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
            self._put_threads = None
            self._put_queue = None
            self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
            self._prefetch_threads = None
            self._prefetch_queue = None

    def commit_bufferblock(self, block):
        Initiate a background upload of a bufferblock. This will block if the
        upload queue is at capacity, otherwise it will return immediately.

            Background uploader thread.
                    b = self._put_queue.get()
                    b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
                    b.state = BufferBlock.COMMITTED
                    b.buffer_block = None
                except Exception as e:
                    self._put_errors.put(e)
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to Keep.
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending. If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)
            self._put_errors = Queue.Queue()
            self._put_threads = []
            for i in xrange(0, self.num_put_threads):
                t = threading.Thread(target=worker, args=(self,))
                self._put_threads.append(t)

        # Mark the block as PENDING to disallow any more appends.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)

    def get_block(self, locator, num_retries, cache_only=False):
        Fetch a block. First checks to see if the locator is a BufferBlock and
        returns that; if not, passes the request through to KeepClient.get().
            if locator in self._bufferblocks:
                bb = self._bufferblocks[locator]
                if bb.state != BufferBlock.COMMITTED:
                    return bb.buffer_view[0:bb.write_pointer].tobytes()
                    locator = bb._locator
        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)

    def commit_all(self):
        Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this
        is a synchronous call, and will not return until all buffer blocks are
        uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to
            items = self._bufferblocks.items()

            if v.state == BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

            if self._put_queue is not None:
                self._put_queue.join()
                if not self._put_errors.empty():
                            e.append(self._put_errors.get(False))
                    raise AsyncKeepWriteErrors(e)

    def block_prefetch(self, locator):
        Initiate a background download of a block. This assumes that the
        underlying KeepClient implements a block cache, so repeated requests
        for the same block will not result in repeated downloads (unless the
        block is evicted from the cache). This method does not block.
        if not self.prefetch_enabled:

            """Background downloader thread."""
                    b = self._prefetch_queue.get()

            if locator in self._bufferblocks:
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = []
            for i in xrange(0, self.num_get_threads):
                t = threading.Thread(target=worker, args=(self,))
                self._prefetch_threads.append(t)
        self._prefetch_queue.put(locator)
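
# Hypothetical end-to-end sketch of driving a BlockManager directly (illustrative
# only; assumes `keep` is a configured arvados KeepClient):
#
#   bm = BlockManager(keep)
#   bb = bm.alloc_bufferblock(owner=None)
#   bb.append("some bytes")
#   bm.commit_bufferblock(bb)    # queue a background upload; may block if the queue is full
#   bm.commit_all()              # wait for uploads, raises AsyncKeepWriteErrors on failure
#   bm.stop_threads()            # shut down the worker threads when done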


class ArvadosFile(object):
    ArvadosFile manages the underlying representation of a file in Keep as a sequence of
    segments spanning a set of blocks, and implements random read/write access.

    def __init__(self, parent, stream=[], segments=[]):
          a list of Range objects representing a block stream

          a list of Range objects representing segments

        self._modified = True
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        if parent.sync_mode() == SYNC_READONLY:
            self.lock = NoopLock()
            self.lock = threading.Lock()

        return self.parent.sync_mode()

        return copy.copy(self._segments)

    def clone(self, new_parent):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent)
        for r in self._segments:
            if self.parent._my_block_manager().is_bufferblock(r.locator):
                if r.locator not in map_loc:
                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
                new_loc = map_loc[r.locator]
            cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))

    def __eq__(self, other):
        if type(other) != ArvadosFile:
        return self._segments == other.segments()

    def __ne__(self, other):
        return not self.__eq__(other)

    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

        """Test the modified flag"""
        return self._modified

    def truncate(self, size):
        Adjust the size of the file. If `size` is less than the size of the file,
        the file contents after `size` will be discarded. If `size` is greater
        than the current size of the file, an IOError will be raised.
        if size < self._size():
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset

            self._segments = new_segs
            self._modified = True
        elif size > self._size():
            raise IOError("truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries):
        Read up to `size` bytes from the file starting at `offset`.
        if size == 0 or offset >= self._size():

        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
            self.parent._my_block_manager().block_prefetch(lr.locator)

        for lr in locators_and_ranges(self._segments, offset, size):
            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])

    def _repack_writes(self):
        Test if the buffer block has more data than is referenced by actual segments
        (this happens when a buffered write overwrites a file range written in
        a previous buffered write). Re-pack the buffer block for efficiency
        and to avoid leaking information.
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    def writeto(self, offset, data, num_retries):
        Write `data` to the file starting at `offset`. This will update
        existing bytes and/or extend the size of the file as necessary.

        if offset > self._size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
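
    # Worked example (illustrative only, assuming `f` is a writable ArvadosFile
    # that already holds the 10 bytes "0123456789"):
    #
    #   f.writeto(8, "xyz", num_retries=0)
    #
    # appends "xyz" to the current buffer block, and replace_range() remaps file
    # offsets 8..10 onto those new bytes, so the file grows to 11 bytes.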

    def add_segment(self, blocks, pos, size):
        # Synchronized public api, see _add_segment
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        Add a segment to the end of the file, with `pos` and `size` referencing a
        section of the stream described by `blocks` (a list of Range objects)
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

        """Get the file size"""
            n = self._segments[-1]
            return n.range_start + n.range_size

        """Get the file size"""


class ArvadosFileReader(ArvadosFileReaderBase):
    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

        return self.arvadosfile.size()

    @ArvadosFileBase._before_close
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position"""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
        self._filepos += len(data)

    @ArvadosFileBase._before_close
    def readfrom(self, offset, size, num_retries=None):
770 """Read up to `size` bytes from the stream, starting at the current file position"""
771 return self.arvadosfile.readfrom(offset, size, num_retries)


class ArvadosFileWriter(ArvadosFileReader):
    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @ArvadosFileBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq, num_retries=None):

    def truncate(self, size=None):
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
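
# Hypothetical usage sketch (illustrative only; `arvados_file` is assumed to be
# a writable ArvadosFile obtained from a Collection):
#
#   writer = ArvadosFileWriter(arvados_file, "out.txt", "w", num_retries=2)
#   writer.write("hello\n")
#   writer.truncate(3)              # keep only the first 3 bytes
#   writer.seek(0, os.SEEK_SET)
#   writer.read(3)                  # -> "hel"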