from arvados.retry import retry_method
15 """split(path) -> streamname, filename
17 Separate the stream name and file name in a /-separated stream path.
18 If no stream name is available, assume '.'.
21 stream_name, file_name = path.rsplit('/', 1)
22 except ValueError: # No / in string
23 stream_name, file_name = '.', path
24 return stream_name, file_name
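
# Usage sketch (hypothetical paths, shown only to illustrate split() above):
#   split('./subdir/data.txt')  ->  ('./subdir', 'data.txt')
#   split('data.txt')           ->  ('.', 'data.txt')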
class ArvadosFileBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return wrapper

    def __exit__(self, exc_type, exc_value, traceback):
class ArvadosFileReaderBase(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())
    @ArvadosFileBase._before_close
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
    @ArvadosFileBase._before_close
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
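
    # The (position, leftover-bytes) cache above means a typical line-by-line
    # loop costs roughly one 1 MiB read per 1 MiB of data rather than one read
    # per line: bytes fetched past the newline are reused by the next
    # readline() call.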
    @ArvadosFileBase._before_close
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
    @ArvadosFileBase._before_close
    def readall_decompressed(self, size=2**20, num_retries=None):
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
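
    # Illustrative use (the open() helper below is an assumption, not defined
    # in this module):
    #   with some_collection.open('stats.gz') as f:
    #       for chunk in f.readall_decompressed():
    #           handle(chunk)   # chunks arrive already gunzipped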
    @ArvadosFileBase._before_close
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()
    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()
class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @ArvadosFileBase._before_close
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data
    @ArvadosFileBase._before_close
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)
    def as_manifest(self):
        from stream import normalize_stream
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
class BufferBlock(object):
    """A BufferBlock is a stand-in for a Keep block that is in the process of being
    written.  Writers can append to it, get the size, and compute the Keep locator.

    There are three valid states:

    WRITABLE - the block can be appended to.

    PENDING - the block is in the process of being uploaded to Keep; appending is an error.

    COMMITTED - the block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.
    """
    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.state = BufferBlock.WRITABLE
        self._locator = None
    def append(self, data):
        """
        Append some data to the buffer.  Only valid if the block is in WRITABLE
        state.  Implements an expanding buffer, doubling capacity as needed to
        accommodate all the data.
        """
        if self.state == BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
281 """Amount of data written to the buffer"""
282 return self.write_pointer
285 """The Keep locator for this buffer's contents."""
286 if self._locator is None:
287 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
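
# Minimal BufferBlock lifecycle sketch (hypothetical driver code, not part of
# this module):
#   bb = BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("some data")            # WRITABLE: buffer doubles as needed
#   bb.state = BufferBlock.PENDING    # normally done by BlockManager.commit_bufferblock()
#   ...                               # a background worker uploads it and marks it COMMITTED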
class AsyncKeepWriteErrors(Exception):
    """
    Roll up one or more Keep write exceptions (generated by background
    threads) into a single one.
    """
    def __init__(self, errors):
        self.errors = errors

    def __str__(self):
        return "\n".join(self.errors)
def _synchronized(orig_func):
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return wrapper
class NoopLock(object):
    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass
def _must_be_writable(orig_func):
    # Decorator for methods that modify actual Collection data.
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        if self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")
        return orig_func(self, *args, **kwargs)
    return wrapper
class BlockManager(object):
    """
    BlockManager handles buffer blocks, background block uploads, and
    background block prefetch for a Collection of ArvadosFiles.
    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned
        :starting_capacity:
          optional capacity, otherwise will use default capacity
        :owner:
          ArvadosFile that owns this block
        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bb.blockid] = bb
        return bb
    def dup_block(self, blockid, owner):
        """Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.

        :blockid:
          the block to copy.  May be an existing buffer block id.
        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        block = self._bufferblocks[blockid]
        bb = BufferBlock(new_blockid, len(block), owner)
        self._bufferblocks[bb.blockid] = bb
        return bb
    def is_bufferblock(self, id):
        return id in self._bufferblocks
    def stop_threads(self):
        """
        Shut down and wait for background upload and download threads to finish.
        """
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
    def commit_bufferblock(self, block):
        """
        Initiate a background upload of a bufferblock.  This will block if the
        upload queue is at capacity, otherwise it will return immediately.
        """

        def worker(self):
            """
            Background uploader thread.
            """
            while True:
                try:
                    b = self._put_queue.get()
                    if b is None:
                        return
                    b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
                    b.state = BufferBlock.COMMITTED
                    b.buffer_view = None
                    b.buffer_block = None
                except Exception as e:
                    self._put_errors.put(e)
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if self._put_threads is None:
            # Start uploader threads.

            # If we don't limit the Queue size, the upload queue can quickly
            # grow to take up gigabytes of RAM if the writing process is
            # generating data more quickly than it can be sent to the Keep
            # servers.
            #
            # With two upload threads and a queue size of 2, this means up to 4
            # blocks pending.  If they are full 64 MiB blocks, that means up to
            # 256 MiB of internal buffering, which is the same size as the
            # default download block cache in KeepClient.
            self._put_queue = Queue.Queue(maxsize=2)
            self._put_errors = Queue.Queue()
            self._put_threads = [threading.Thread(target=worker, args=(self,)),
                                 threading.Thread(target=worker, args=(self,))]
            for t in self._put_threads:
                t.daemon = True
                t.start()

        # Mark the block as PENDING to disallow any more appends.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)
    def get_block(self, locator, num_retries, cache_only=False):
        """
        Fetch a block.  First checks to see if the locator is a BufferBlock and
        returns that; if not, passes the request through to KeepClient.get().
        """
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            if bb.state != BufferBlock.COMMITTED:
                return bb.buffer_view[0:bb.write_pointer].tobytes()
            else:
                locator = bb._locator
        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
    def commit_all(self):
        """
        Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
        is a synchronous call, and will not return until all buffer blocks are
        uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
        upload.
        """
        items = self._bufferblocks.items()

        for k, v in items:
            if v.state == BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        if self._put_queue is not None:
            self._put_queue.join()
            if not self._put_errors.empty():
                e = []
                try:
                    while True:
                        e.append(self._put_errors.get(False))
                except Queue.Empty:
                    pass
                raise AsyncKeepWriteErrors(e)
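
    # Typical write path, sketched with assumed names (keep_client, my_file):
    #   bm = BlockManager(keep_client)
    #   bb = bm.alloc_bufferblock(owner=my_file)
    #   bb.append("hello")    # buffered in memory, WRITABLE
    #   bm.commit_all()       # returns only after every block is in Keep
    #   bm.stop_threads()     # shut down the background workers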
    def block_prefetch(self, locator):
        """
        Initiate a background download of a block.  This assumes that the
        underlying KeepClient implements a block cache, so repeated requests
        for the same block will not result in repeated downloads (unless the
        block is evicted from the cache.)  This method does not block.
        """

        def worker(self):
            """Background downloader thread."""
            while True:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)

        if locator in self._bufferblocks:
            return
        if self._prefetch_threads is None:
            self._prefetch_queue = Queue.Queue()
            self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
                                      threading.Thread(target=worker, args=(self,))]
            for t in self._prefetch_threads:
                t.start()
        self._prefetch_queue.put(locator)
class ArvadosFile(object):
    """
    ArvadosFile manages the underlying representation of a file in Keep as a sequence of
    segments spanning a set of blocks, and implements random read/write access.
    """

    def __init__(self, parent, stream=[], segments=[]):
        """
        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self._modified = True
        self._segments = []
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        if parent.sync_mode() == SYNC_READONLY:
            self.lock = NoopLock()
        else:
            self.lock = threading.Lock()
    def sync_mode(self):
        return self.parent.sync_mode()
    def segments(self):
        return copy.copy(self._segments)
    def clone(self, new_parent):
        """Make a copy of this file."""
        cp.parent = new_parent

        map_loc = {}
        for r in self._segments:
            new_loc = r.locator
            if self.parent._my_block_manager().is_bufferblock(r.locator):
                if r.locator not in map_loc:
                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
                new_loc = map_loc[r.locator]

            cp.segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))

        return cp
    def __eq__(self, other):
        if type(other) != ArvadosFile:
            return False
        return self._segments == other.segments()

    def __ne__(self, other):
        return not self.__eq__(other)
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    def modified(self):
        """Test the modified flag"""
        return self._modified
    def truncate(self, size):
        """
        Adjust the size of the file.  If `size` is less than the size of the file,
        the file contents after `size` will be discarded.  If `size` is greater
        than the current size of the file, an IOError will be raised.
        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")
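
    # Example of the semantics above (hypothetical sizes): truncating a
    # 100-byte file to 40 keeps bytes 0..39 and drops the rest; truncating it
    # to 200 raises IOError rather than zero-filling.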
    def readfrom(self, offset, size, num_retries):
        """Read up to `size` bytes from the file, starting at `offset`."""
        if size == 0 or offset >= self.size():
            return ''
        data = []

        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
            self.parent._my_block_manager().block_prefetch(lr.locator)

        for lr in locators_and_ranges(self._segments, offset, size):
            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if d:
                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return ''.join(data)
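
    # Note on readfrom(): the first loop queues prefetches one full
    # config.KEEP_BLOCK_SIZE beyond the requested range, so a sequential reader
    # usually finds the next block already in the KeepClient cache by the time
    # it asks for it.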
    def _repack_writes(self):
        """
        Check whether the buffer block has more data than is referenced by actual segments
        (this happens when a buffered write overwrites a file range written in
        a previous buffered write).  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
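
    # Worked example (hypothetical numbers): two writeto() calls covering the
    # same 10-byte range leave 20 bytes in the buffer block but only 10 bytes
    # referenced by segments; write_total (10) < size() (20), so the live bytes
    # are copied into a fresh, tightly packed buffer block.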
    def writeto(self, offset, data, num_retries):
        """
        Write `data` to the file starting at `offset`.  This will update
        existing bytes and/or extend the size of the file as necessary.
        """
        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
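
    # Design note: writeto() never lets a buffer block grow past
    # config.KEEP_BLOCK_SIZE.  If the new data will not fit even after
    # repacking, the current block is committed and a fresh one is started,
    # which is why callers must write in chunks no larger than one Keep block.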
    def add_segment(self, blocks, pos, size):
        # Synchronized public api, see _add_segment
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """
        Add a segment to the end of the file, with `pos` and `size` referencing a
        section of the stream described by `blocks` (a list of Range objects)
        """
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)
726 """Get the file size"""
728 n = self._segments[-1]
729 return n.range_start + n.range_size
class ArvadosFileReader(ArvadosFileReaderBase):
    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()
    @ArvadosFileBase._before_close
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position"""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
        self._filepos += len(data)
        return data
    @ArvadosFileBase._before_close
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`"""
        return self.arvadosfile.readfrom(offset, size, num_retries)
class ArvadosFileWriter(ArvadosFileReader):
    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @ArvadosFileBase._before_close
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
    @ArvadosFileBase._before_close
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
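
# End-to-end sketch of the writer API above.  The names below (parent
# collection, file name) are assumptions for illustration, not part of this
# module:
#   af = ArvadosFile(parent_collection)
#   writer = ArvadosFileWriter(af, "output.txt", mode="w")
#   writer.write("hello world\n")
#   writer.truncate(5)     # keep only "hello"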