from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"  # change-event type passed to Collection.notify()

_logger = logging.getLogger('arvados.arvfile')
25 """split(path) -> streamname, filename
27 Separate the stream name and file name in a /-separated stream path and
28 return a tuple (stream_name, file_name). If no stream name is available,
33 stream_name, file_name = path.rsplit('/', 1)
34 except ValueError: # No / in string
35 stream_name, file_name = '.', path
36 return stream_name, file_name
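
# Illustrative example (not part of the original module) of how split()
# divides a stream path:
#
#   >>> split('notes/2014/summary.txt')
#   ('notes/2014', 'summary.txt')
#   >>> split('summary.txt')
#   ('.', 'summary.txt')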

class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self.closed = True

class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)
    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())
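
    # Illustrative sketch: `reader` stands for any concrete subclass instance.
    # os.SEEK_END makes `pos` relative to the end of the file, and the result
    # is clamped to the interval [0, size()]:
    #
    #   reader.seek(-10, os.SEEK_END)
    #   reader.tell()    # == max(reader.size() - 10, 0)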

    def tell(self):
        return self._filepos

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
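
    # Illustrative sketch: consecutive readline() calls reuse the leftover
    # bytes after the previous newline via self._readline_cache, so a second
    # call typically does not have to re-read the same chunk from Keep:
    #
    #   first = reader.readline()    # may read a 1 MiB chunk
    #   second = reader.readline()   # usually served from the cache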
    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
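
    # Illustrative sketch: stream a gzipped file without holding the whole
    # decompressed content in memory (`process` is a hypothetical callback):
    #
    #   for chunk in reader.readall_decompressed(num_retries=3):
    #       process(chunk)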
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)
    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return ''.join(data)
    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")
    @synchronized
    def set_state(self, nextstate, loc=None):
        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
            self._state = nextstate
            if self._state == _BufferBlock.COMMITTED:
                self._locator = loc
                self.buffer_view = None
                self.buffer_block = None
        else:
            raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))

    @synchronized
    def state(self):
        return self._state
320 """The amount of data written to the buffer."""
321 return self.write_pointer
325 """The Keep locator for this buffer's contents."""
326 if self._locator is None:
327 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
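
    # Illustrative example: a buffer holding the 3 bytes "foo" gets the
    # locator "acbd18db4cc2f85cedef654fccc4a4d8+3" (md5 hex digest, "+",
    # byte count).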
    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Can only duplicate a writable or pending buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection must be writable.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper

class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2
    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock
    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock
    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks
    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
    def commit_bufferblock(self, block, wait):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :wait:
          If `wait` is True, upload the block synchronously.
          If `wait` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""
            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
                except Exception as e:
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        if block.state() != _BufferBlock.WRITABLE:
            return

        if wait:
            block.set_state(_BufferBlock.PENDING)
            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
            block.set_state(_BufferBlock.COMMITTED, loc)
        else:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
            self._put_queue.put(block)
    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)
    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)
    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state() == _BufferBlock.WRITABLE:
                self.commit_bufferblock(v, False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err, label="block")
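
    # Illustrative sketch: callers typically flush all pending writes before
    # finalizing a manifest, surfacing upload failures as KeepWriteError
    # (`block_manager` is a hypothetical _BlockManager instance):
    #
    #   try:
    #       block_manager.commit_all()
    #   except KeepWriteError as e:
    #       _logger.error("some blocks failed to upload: %s", e)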
    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.

        """
        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        self._prefetch_queue.put(locator)
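
    # Illustrative sketch: a reader can queue the blocks it expects to need
    # soon, then read them; prefetched blocks land in the KeepClient cache,
    # making the later get() calls cheap (`segments` and `readahead` are
    # hypothetical):
    #
    #   for lr in locators_and_ranges(segments, offset, readahead):
    #       block_manager.block_prefetch(lr.locator)   # returns immediately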

class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments

        """
        self.parent = parent
        self.name = name
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
    def writable(self):
        return self.parent.writable()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)
    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp
    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self._modified = True
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in xrange(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)
    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified
    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
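
    # Illustrative example of truncate() semantics: it only shrinks.
    # Given a 10-byte ArvadosFile `f`:
    #
    #   f.truncate(4)    # keeps the first 4 bytes
    #   f.truncate(20)   # raises IOError(errno.EINVAL, ...)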
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.

        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)
    def _repack_writes(self, num_retries):
        """Test if the buffer block has more data than actual segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb
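
    # Illustrative example: overlapping writes leave dead bytes in the buffer
    # block, which _repack_writes() discards.
    #
    #   f.writeto(0, "aaaa", 0)   # buffer holds "aaaa", file reads "aaaa"
    #   f.writeto(0, "bbbb", 0)   # buffer holds "aaaabbbb", file reads "bbbb"
    #   f._repack_writes(0)       # buffer repacked down to just "bbbb"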
    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes(num_retries)
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(MOD, self.parent, self.name, (self, self))
    @synchronized
    def flush(self, wait=True, num_retries=0):
        """Flush the current bufferblock to Keep."""
        if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._repack_writes(num_retries)
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
            self.parent.notify(MOD, self.parent, self.name, (self, self))
    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0
    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
        buf += "\n"
        return buf
    @must_be_writable
    @synchronized
    def reparent(self, newparent, newname):
        self.parent.remove(self.name)

        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
        self._modified = True

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.

        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return ''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)
    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)
    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()
    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self):
        if not self.closed:
            self.flush()
        super(ArvadosFileWriter, self).close()
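
# Illustrative sketch: these wrappers are normally obtained from a collection
# object rather than constructed directly. Assuming `coll` is a writable
# arvados.collection.Collection, usage looks roughly like:
#
#   with coll.open("data.txt", "w") as f:    # ArvadosFileWriter
#       f.write("hello\n")
#   with coll.open("data.txt", "r") as f:    # ArvadosFileReader
#       data = f.read(6)                     # "hello\n"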