import functools
import os
import re
import zlib
import bz2
from .ranges import *
from arvados.retry import retry_method
from arvados.errors import ArgumentError
import config
import hashlib
import threading
import Queue
import copy
import errno

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path.
    If no stream name is available, assume '.'.
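
    For example (illustrative paths, not taken from a real collection):

        >>> split('fastq/sample1.fastq')
        ('fastq', 'sample1.fastq')
        >>> split('sample1.fastq')
        ('.', 'sample1.fastq')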
    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name

class ArvadosFileBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @ArvadosFileBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @ArvadosFileBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
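
    # Example use of readall_decompressed() (a sketch; assumes `reader` is an
    # instance of this class opened on a gzip-compressed file named "log.txt.gz"):
    #
    #   with open(reader.decompressed_name(), 'w') as out:   # writes "log.txt"
    #       for chunk in reader.readall_decompressed():
    #           out.write(chunk)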

    @ArvadosFileBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()


class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        from stream import normalize_stream
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


class BufferBlock(object):
    """
    A BufferBlock is a stand-in for a Keep block that is in the process of being
    written.  Writers can append to it, get the size, and compute the Keep locator.

    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep; appending is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via the Keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.
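
    Illustrative lifecycle (a sketch; the values shown are examples, not part
    of the API contract):

      bb = BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
      bb.append("hello")   # WRITABLE: data is buffered in memory
      bb.size()            # -> 5
      bb.locator()         # -> "5d41402abc4b2a76b9719d911017c592+5"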
    """
    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block
        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.state = BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner

    def append(self, data):
        """
        Append some data to the buffer.  Only valid if the block is in WRITABLE
        state.  Implements an expanding buffer, doubling capacity as needed to
        accommodate all the data.
        """
        if self.state == BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    def size(self):
        """Amount of data written to the buffer"""
        return self.write_pointer

    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator


class AsyncKeepWriteErrors(Exception):
    """
    Roll up one or more Keep write exceptions (generated by background
    threads) into a single exception.
    """
    def __init__(self, errors):
        self.errors = errors

    def __repr__(self):
        return "\n".join(str(e) for e in self.errors)

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return wrapper

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

SYNC_READONLY = 1
SYNC_EXPLICIT = 2
SYNC_LIVE = 3

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        if self.sync_mode() == SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")
        return orig_func(self, *args, **kwargs)
    return wrapper


class BlockManager(object):
    """
    BlockManager handles buffer blocks, background block uploads, and
    background block prefetch for a Collection of ArvadosFiles.
    """
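
    # Typical flow (a sketch; `keep` is assumed to be an arvados KeepClient
    # instance and `afile` an ArvadosFile that will own the block):
    #
    #   bm = BlockManager(keep)
    #   bb = bm.alloc_bufferblock(owner=afile)
    #   bb.append(some_bytes)        # fill the writable buffer block
    #   bm.commit_bufferblock(bb)    # queue a background upload to Keep
    #   bm.commit_all()              # block until all pending uploads finish
    #   bm.stop_threads()            # shut down worker threads when done
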
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """
        Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block
        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bb.blockid] = bb
        return bb

    @synchronized
    def dup_block(self, blockid, owner):
        """
        Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.

        :blockid:
          the block to copy.  May be an existing buffer block id.

        :owner:
          ArvadosFile that owns the new block
        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        block = self._bufferblocks[blockid]
        # Copy the source block's buffered content into the new block
        # (BufferBlock has no __len__ and append() expects a byte string).
        bb = BufferBlock(new_blockid, block.size(), owner)
        bb.append(block.buffer_view[0:block.size()].tobytes())
        self._bufferblocks[bb.blockid] = bb
        return bb

    @synchronized
    def is_bufferblock(self, id):
        return id in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """
        Shut down and wait for background upload and download threads to finish.
        """
        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """
        Initiate a background upload of a bufferblock.  This will block if the
        upload queue is at capacity, otherwise it will return immediately.
        """

        def worker(self):
            """
            Background uploader thread.
            """
            while True:
                try:
                    b = self._put_queue.get()
                    if b is None:
                        return
                    b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
                    b.state = BufferBlock.COMMITTED
                    b.buffer_view = None
                    b.buffer_block = None
                except Exception as e:
                    print e
                    self._put_errors.put(e)
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        with self.lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    t = threading.Thread(target=worker, args=(self,))
                    self._put_threads.append(t)
                    t.daemon = True
                    t.start()

        # Mark the block as PENDING to disallow any further appends.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)

    def get_block(self, locator, num_retries, cache_only=False):
        """
        Fetch a block.  First checks whether the locator refers to a BufferBlock
        and returns its contents if so; otherwise passes the request through to
        KeepClient.get().
        """
        with self.lock:
            if locator in self._bufferblocks:
                bb = self._bufferblocks[locator]
                if bb.state != BufferBlock.COMMITTED:
                    return bb.buffer_view[0:bb.write_pointer].tobytes()
                else:
                    locator = bb._locator
        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)

    def commit_all(self):
        """
        Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
        is a synchronous call, and will not return until all buffer blocks are
        uploaded.  Raises AsyncKeepWriteErrors if any blocks failed to
        upload.
        """
        with self.lock:
            items = self._bufferblocks.items()

        for k, v in items:
            if v.state == BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()
                if not self._put_errors.empty():
                    e = []
                    try:
                        while True:
                            e.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise AsyncKeepWriteErrors(e)

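    # Handling failures surfaced by commit_all() (a sketch; `bm` stands for a
    # BlockManager instance with writes in flight):
    #
    #   try:
    #       bm.commit_all()
    #   except AsyncKeepWriteErrors as e:
    #       print "some blocks failed to upload:", repr(e)
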
    def block_prefetch(self, locator):
        """
        Initiate a background download of a block.  This assumes that the
        underlying KeepClient implements a block cache, so repeated requests
        for the same block will not result in repeated downloads (unless the
        block is evicted from the cache).  This method does not block.
        """

        if not self.prefetch_enabled:
            return

        def worker(self):
            """Background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    t = threading.Thread(target=worker, args=(self,))
                    self._prefetch_threads.append(t)
                    t.daemon = True
                    t.start()
        self._prefetch_queue.put(locator)


class ArvadosFile(object):
    """ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.  This object may be accessed from multiple threads.

    """
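
    # How segments map file bytes to block data (a sketch; the locators below
    # are made up for illustration):
    #
    #   A file assembled from two writes might be represented as
    #     [Range("1f0b...+8", 0, 8, 0),      # locator, range_start, range_size, segment_offset
    #      Range("bufferblock0", 8, 5, 0)]
    #   i.e. file bytes 0-7 come from a committed Keep block and bytes 8-12
    #   come from an uncommitted BufferBlock still held in memory.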

    def __init__(self, parent, stream=[], segments=[]):
        """
        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def sync_mode(self):
        return self.parent.sync_mode()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent)

        map_loc = {}
        for r in self._segments:
            new_loc = r.locator
            if self.parent._my_block_manager().is_bufferblock(r.locator):
                if r.locator not in map_loc:
                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
                new_loc = map_loc[r.locator]

            cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))

        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        self._segments = other.segments()
        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        s = other.segments()
        with self.lock:
            return self._segments == s

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """
        Adjust the size of the file.  If `size` is less than the size of the file,
        the file contents after `size` will be discarded.  If `size` is greater
        than the current size of the file, an IOError will be raised.
        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # this segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries):
        """
        Read up to `size` bytes from the file starting at `offset`.
        """
        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if d:
                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self):
        """
        Test if the buffer block has more data than is referenced by actual segments
        (this happens when a buffered write overwrites a file range written in
        a previous buffered write).  Re-pack the buffer block for efficiency
        and to avoid leaking information.
        """
        segs = self._segments

        # Sum up the segments to get the total number of file bytes that
        # reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """
        Write `data` to the file starting at `offset`.  This will update
        existing bytes and/or extend the size of the file as necessary.
        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """
        Add a segment to the end of the file, with `pos` and `size` referencing a
        section of the stream described by `blocks` (a list of Range objects).
        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """
        (Internal version of add_segment(); performs no locking.)
        """
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size"""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object that supports reading only.  Be
    aware that this class is NOT thread safe, as there is no locking around
    updates to the file pointer.
    """
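
    # Sketch of read-side use (assumes `reader` is an ArvadosFileReader for a
    # file at least 1024 bytes long; how the reader is obtained is outside the
    # scope of this file):
    #
    #   reader.seek(0, os.SEEK_SET)
    #   header = reader.read(1024)                          # advances the file pointer
    #   tail = reader.readfrom(reader.size() - 1024, 1024)  # does not move the pointer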

    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position"""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
        self._filepos += len(data)
        return data

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`"""
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object that supports both reading and
    writing.  Be aware that this class is NOT thread safe, as there is no
    locking around updates to the file pointer.

    """
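
    # Sketch of intended use (the collection object `coll` and its open()
    # method are illustrative; this module only defines the writer itself):
    #
    #   with coll.open("output.txt", "w") as writer:
    #       writer.write("line one\n")
    #       writer.writelines(["line two\n", "line three\n"])
    #   # close() saves the parent collection when its sync mode is SYNC_LIVE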

    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @ArvadosFileBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    def close(self):
        if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
            self.arvadosfile.parent.root_collection().save()
        # Mark the stream closed (see ArvadosFileBase.close) so decorated I/O
        # methods refuse further use.
        super(ArvadosFileWriter, self).close()