# sdk/python/arvados/arvfile.py (arvados.git)
1 import functools
2 import os
3 import zlib
4 import bz2
5 from .ranges import *
6 from arvados.retry import retry_method
7 import config
8 import hashlib
9 import re
10 import threading
11 import Queue
12 import copy
13 import errno
14
15 def split(path):
16     """split(path) -> streamname, filename
17
18     Separate the stream name and file name in a /-separated stream path.
19     If no stream name is available, assume '.'.
20     """
21     try:
22         stream_name, file_name = path.rsplit('/', 1)
23     except ValueError:  # No / in string
24         stream_name, file_name = '.', path
25     return stream_name, file_name
26
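# A quick illustration of split()'s behaviour (doctest-style sketch; not
# executed as part of this module):
#
#   >>> split('parent_stream/child.txt')
#   ('parent_stream', 'child.txt')
#   >>> split('a/b/c.txt')        # only the last '/' separates stream and file
#   ('a/b', 'c.txt')
#   >>> split('lonely.txt')       # no stream component, so '.' is assumed
#   ('.', 'lonely.txt')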
27 class ArvadosFileBase(object):
28     def __init__(self, name, mode):
29         self.name = name
30         self.mode = mode
31         self.closed = False
32
33     @staticmethod
34     def _before_close(orig_func):
35         @functools.wraps(orig_func)
36         def wrapper(self, *args, **kwargs):
37             if self.closed:
38                 raise ValueError("I/O operation on closed stream file")
39             return orig_func(self, *args, **kwargs)
40         return wrapper
41
42     def __enter__(self):
43         return self
44
45     def __exit__(self, exc_type, exc_value, traceback):
46         try:
47             self.close()
48         except Exception:
49             if exc_type is None:
50                 raise
51
52     def close(self):
53         self.closed = True
54
55
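# Usage sketch for ArvadosFileBase subclasses such as ArvadosFileReader
# (defined further down); `somefile` is an assumed ArvadosFile instance.  The
# object works as a context manager, and any method wrapped with
# _before_close raises once the file has been closed:
#
#   with ArvadosFileReader(somefile, 'example.txt') as reader:
#       first_kb = reader.read(1024)
#   reader.read(1024)   # raises ValueError: I/O operation on closed stream file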
56 class ArvadosFileReaderBase(ArvadosFileBase):
57     class _NameAttribute(str):
58         # The Python file API provides a plain .name attribute.
59         # Older SDK provided a name() method.
60         # This class provides both, for maximum compatibility.
61         def __call__(self):
62             return self
63
64     def __init__(self, name, mode, num_retries=None):
65         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
66         self._filepos = 0L
67         self.num_retries = num_retries
68         self._readline_cache = (None, None)
69
70     def __iter__(self):
71         while True:
72             data = self.readline()
73             if not data:
74                 break
75             yield data
76
77     def decompressed_name(self):
78         return re.sub(r'\.(bz2|gz)$', '', self.name)
79
80     @ArvadosFileBase._before_close
81     def seek(self, pos, whence=os.SEEK_CUR):
82         if whence == os.SEEK_CUR:
83             pos += self._filepos
84         elif whence == os.SEEK_END:
85             pos += self.size()
86         self._filepos = min(max(pos, 0L), self.size())
87
88     def tell(self):
89         return self._filepos
90
91     @ArvadosFileBase._before_close
92     @retry_method
93     def readall(self, size=2**20, num_retries=None):
94         while True:
95             data = self.read(size, num_retries=num_retries)
96             if data == '':
97                 break
98             yield data
99
100     @ArvadosFileBase._before_close
101     @retry_method
102     def readline(self, size=float('inf'), num_retries=None):
103         cache_pos, cache_data = self._readline_cache
104         if self.tell() == cache_pos:
105             data = [cache_data]
106         else:
107             data = ['']
108         data_size = len(data[-1])
109         while (data_size < size) and ('\n' not in data[-1]):
110             next_read = self.read(2 ** 20, num_retries=num_retries)
111             if not next_read:
112                 break
113             data.append(next_read)
114             data_size += len(next_read)
115         data = ''.join(data)
116         try:
117             nextline_index = data.index('\n') + 1
118         except ValueError:
119             nextline_index = len(data)
120         nextline_index = min(nextline_index, size)
121         self._readline_cache = (self.tell(), data[nextline_index:])
122         return data[:nextline_index]
123
124     @ArvadosFileBase._before_close
125     @retry_method
126     def decompress(self, decompress, size, num_retries=None):
127         for segment in self.readall(size, num_retries):
128             data = decompress(segment)
129             if data:
130                 yield data
131
132     @ArvadosFileBase._before_close
133     @retry_method
134     def readall_decompressed(self, size=2**20, num_retries=None):
135         self.seek(0)
136         if self.name.endswith('.bz2'):
137             dc = bz2.BZ2Decompressor()
138             return self.decompress(dc.decompress, size,
139                                    num_retries=num_retries)
140         elif self.name.endswith('.gz'):
141             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
142             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
143                                    size, num_retries=num_retries)
144         else:
145             return self.readall(size, num_retries=num_retries)
146
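    # Sketch: readall_decompressed() picks a decompressor from the file name
    # suffix, so compressed collection files can be streamed transparently.
    # `reader` is assumed to be a concrete subclass opened on a '.gz' file:
    #
    #   with open('/tmp/log.txt', 'w') as out:
    #       for chunk in reader.readall_decompressed(num_retries=2):
    #           out.write(chunk)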
147     @ArvadosFileBase._before_close
148     @retry_method
149     def readlines(self, sizehint=float('inf'), num_retries=None):
150         data = []
151         data_size = 0
152         for s in self.readall(num_retries=num_retries):
153             data.append(s)
154             data_size += len(s)
155             if data_size >= sizehint:
156                 break
157         return ''.join(data).splitlines(True)
158
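    # Sketch: the reader is line-iterable via __iter__()/readline(), and
    # readlines() buffers roughly `sizehint` bytes before splitting.  `reader`
    # is assumed to be any concrete subclass:
    #
    #   for line in reader:
    #       if line.startswith('ERROR'):
    #           print line,
    #
    #   reader.seek(0, os.SEEK_SET)
    #   first_lines = reader.readlines(sizehint=2**16)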
159     def size(self):
160         raise NotImplementedError()
161
162     def read(self, size, num_retries=None):
163         raise NotImplementedError()
164
165     def readfrom(self, start, size, num_retries=None):
166         raise NotImplementedError()
167
168
169 class StreamFileReader(ArvadosFileReaderBase):
170     def __init__(self, stream, segments, name):
171         super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
172         self._stream = stream
173         self.segments = segments
174
175     def stream_name(self):
176         return self._stream.name()
177
178     def size(self):
179         n = self.segments[-1]
180         return n.range_start + n.range_size
181
182     @ArvadosFileBase._before_close
183     @retry_method
184     def read(self, size, num_retries=None):
185         """Read up to 'size' bytes from the stream, starting at the current file position"""
186         if size == 0:
187             return ''
188
189         data = ''
190         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
191         if available_chunks:
192             lr = available_chunks[0]
193             data = self._stream._readfrom(lr.locator+lr.segment_offset,
194                                           lr.segment_size,
195                                           num_retries=num_retries)
196
197         self._filepos += len(data)
198         return data
199
200     @ArvadosFileBase._before_close
201     @retry_method
202     def readfrom(self, start, size, num_retries=None):
203         """Read up to 'size' bytes from the stream, starting at 'start'"""
204         if size == 0:
205             return ''
206
207         data = []
208         for lr in locators_and_ranges(self.segments, start, size):
209             data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
210                                               num_retries=num_retries))
211         return ''.join(data)
212
213     def as_manifest(self):
214         from stream import normalize_stream
215         segs = []
216         for r in self.segments:
217             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
218         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
219
220
221 class BufferBlock(object):
222     """
223     A BufferBlock is a stand-in for a Keep block that is in the process of being
224     written.  Writers can append to it, get the size, and compute the Keep locator.
225
226     There are three valid states:
227
228     WRITABLE
229       Can append to block.
230
231     PENDING
232       Block is in the process of being uploaded to Keep, append is an error.
233
234     COMMITTED
235       The block has been written to Keep, its internal buffer has been
236       released, fetching the block will fetch it via keep client (since we
237       discarded the internal copy), and identifiers referring to the BufferBlock
238       can be replaced with the block locator.
239     """
240     WRITABLE = 0
241     PENDING = 1
242     COMMITTED = 2
243
244     def __init__(self, blockid, starting_capacity, owner):
245         """
246         :blockid:
247           the identifier for this block
248
249         :starting_capacity:
250           the initial buffer capacity
251
252         :owner:
253           ArvadosFile that owns this block
254         """
255         self.blockid = blockid
256         self.buffer_block = bytearray(starting_capacity)
257         self.buffer_view = memoryview(self.buffer_block)
258         self.write_pointer = 0
259         self.state = BufferBlock.WRITABLE
260         self._locator = None
261         self.owner = owner
262
263     def append(self, data):
264         """
265         Append some data to the buffer.  Only valid if the block is in WRITABLE
266         state.  Implements an expanding buffer, doubling capacity as needed to
267         accommodate all the data.
268         """
269         if self.state == BufferBlock.WRITABLE:
270             while (self.write_pointer+len(data)) > len(self.buffer_block):
271                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
272                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
273                 self.buffer_block = new_buffer_block
274                 self.buffer_view = memoryview(self.buffer_block)
275             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
276             self.write_pointer += len(data)
277             self._locator = None
278         else:
279             raise AssertionError("Buffer block is not writable")
280
281     def size(self):
282         """Amount of data written to the buffer"""
283         return self.write_pointer
284
285     def locator(self):
286         """The Keep locator for this buffer's contents."""
287         if self._locator is None:
288             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
289         return self._locator
290
291
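# A minimal BufferBlock walkthrough based on the class above (the blockid and
# owner values are arbitrary placeholders):
#
#   bb = BufferBlock("bufferblock0", 2**14, None)
#   bb.state == BufferBlock.WRITABLE      # True
#   bb.append("foo")
#   bb.append("bar")
#   bb.size()                             # 6
#   bb.locator()                          # "3858f62230ac3c915f300c664312c63f+6"
#
# Once the block is handed to BlockManager.commit_bufferblock() (below) it
# moves to PENDING and further append() calls raise AssertionError.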
292 class AsyncKeepWriteErrors(Exception):
293     """
294     Roll up one or more Keep write exceptions (generated by background
295     threads) into a single one.
296     """
297     def __init__(self, errors):
298         self.errors = errors
299
300     def __repr__(self):
301         return "\n".join(str(e) for e in self.errors)
302
303 def _synchronized(orig_func):
304     @functools.wraps(orig_func)
305     def wrapper(self, *args, **kwargs):
306         with self.lock:
307             return orig_func(self, *args, **kwargs)
308     return wrapper
309
310 class NoopLock(object):
311     def __enter__(self):
312         return self
313
314     def __exit__(self, exc_type, exc_value, traceback):
315         pass
316
317     def acquire(self, blocking=False):
318         pass
319
320     def release(self):
321         pass
322
323 SYNC_READONLY = 1
324 SYNC_EXPLICIT = 2
325 SYNC_LIVE = 3
326
327 def _must_be_writable(orig_func):
328     # Decorator for methods that modify actual Collection data.
329     @functools.wraps(orig_func)
330     def wrapper(self, *args, **kwargs):
331         if self.sync_mode() == SYNC_READONLY:
332             raise IOError(errno.EROFS, "Collection is read only")
333         return orig_func(self, *args, **kwargs)
334     return wrapper
335
336
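# Sketch of how the read-only guard plays out for the classes below.
# `readonly_file` is an assumed ArvadosFile whose parent collection reports
# SYNC_READONLY from sync_mode():
#
#   readonly_file.readfrom(0, 128, num_retries=0)   # reads are fine
#   readonly_file.writeto(0, "x", 0)       # raises IOError(errno.EROFS, ...)
#   readonly_file.truncate(0)              # likewise raises IOError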
337 class BlockManager(object):
338     """
339     BlockManager handles buffer blocks, background block uploads, and
340     background block prefetch for a Collection of ArvadosFiles.
341     """
342     def __init__(self, keep):
343         """keep: KeepClient object to use"""
344         self._keep = keep
345         self._bufferblocks = {}
346         self._put_queue = None
347         self._put_errors = None
348         self._put_threads = None
349         self._prefetch_queue = None
350         self._prefetch_threads = None
351         self.lock = threading.Lock()
352         self.prefetch_enabled = True
353         self.num_put_threads = 2
354         self.num_get_threads = 2
355
356     @_synchronized
357     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
358         """
359         Allocate a new, empty bufferblock in WRITABLE state and return it.
360
361         :blockid:
362           optional block identifier, otherwise one will be automatically assigned
363
364         :starting_capacity:
365           optional capacity, otherwise will use default capacity
366
367         :owner:
368           ArvadosFile that owns this block
369         """
370         if blockid is None:
371             blockid = "bufferblock%i" % len(self._bufferblocks)
372         bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
373         self._bufferblocks[bb.blockid] = bb
374         return bb
375
376     @_synchronized
377     def dup_block(self, blockid, owner):
378         """
379         Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
380
381         :blockid:
382           the block to copy.  May be an existing buffer block id.
383
384         :owner:
385           ArvadosFile that owns the new block
386         """
387         new_blockid = "bufferblock%i" % len(self._bufferblocks)
388         block = self._bufferblocks[blockid]
389         bb = BufferBlock(new_blockid, block.size(), owner)
390         bb.append(block.buffer_view[0:block.size()].tobytes())
391         self._bufferblocks[bb.blockid] = bb
392         return bb
393
394     @_synchronized
395     def is_bufferblock(self, id):
396         return id in self._bufferblocks
397
398     @_synchronized
399     def stop_threads(self):
400         """
401         Shut down and wait for background upload and download threads to finish.
402         """
403         if self._put_threads is not None:
404             for t in self._put_threads:
405                 self._put_queue.put(None)
406             for t in self._put_threads:
407                 t.join()
408         self._put_threads = None
409         self._put_queue = None
410         self._put_errors = None
411
412         if self._prefetch_threads is not None:
413             for t in self._prefetch_threads:
414                 self._prefetch_queue.put(None)
415             for t in self._prefetch_threads:
416                 t.join()
417         self._prefetch_threads = None
418         self._prefetch_queue = None
419
420     def commit_bufferblock(self, block):
421         """
422         Initiate a background upload of a bufferblock.  This will block if the
423         upload queue is at capacity, otherwise it will return immediately.
424         """
425
426         def worker(self):
427             """
428             Background uploader thread.
429             """
430             while True:
431                 try:
432                     b = self._put_queue.get()
433                     if b is None:
434                         return
435                     b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
436                     b.state = BufferBlock.COMMITTED
437                     b.buffer_view = None
438                     b.buffer_block = None
439                 except Exception as e:
440                     print e
441                     self._put_errors.put(e)
442                 finally:
443                     if self._put_queue is not None:
444                         self._put_queue.task_done()
445
446         with self.lock:
447             if self._put_threads is None:
448                 # Start uploader threads.
449
450                 # If we don't limit the Queue size, the upload queue can quickly
451                 # grow to take up gigabytes of RAM if the writing process is
452                 # generating data more quickly than it can be send to the Keep
453                 # servers.
454                 #
455                 # With two upload threads and a queue size of 2, this means up to 4
456                 # blocks pending.  If they are full 64 MiB blocks, that means up to
457                 # 256 MiB of internal buffering, which is the same size as the
458                 # default download block cache in KeepClient.
459                 self._put_queue = Queue.Queue(maxsize=2)
460                 self._put_errors = Queue.Queue()
461
462                 self._put_threads = []
463                 for i in xrange(0, self.num_put_threads):
464                     t = threading.Thread(target=worker, args=(self,))
465                     self._put_threads.append(t)
466                     t.daemon = True
467                     t.start()
468
469         # Mark the block as PENDING to disallow any more appends.
470         block.state = BufferBlock.PENDING
471         self._put_queue.put(block)
472
473     def get_block(self, locator, num_retries, cache_only=False):
474         """
475         Fetch a block.  First checks to see if the locator is a BufferBlock and
476         returns its contents if so; otherwise passes the request through to KeepClient.get().
477         """
478         with self.lock:
479             if locator in self._bufferblocks:
480                 bb = self._bufferblocks[locator]
481                 if bb.state != BufferBlock.COMMITTED:
482                     return bb.buffer_view[0:bb.write_pointer].tobytes()
483                 else:
484                     locator = bb._locator
485         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
486
487     def commit_all(self):
488         """
489         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
490         is a synchronous call, and will not return until all buffer blocks are
491         uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
492         upload.
493         """
494         with self.lock:
495             items = self._bufferblocks.items()
496
497         for k,v in items:
498             if v.state == BufferBlock.WRITABLE:
499                 self.commit_bufferblock(v)
500
501         with self.lock:
502             if self._put_queue is not None:
503                 self._put_queue.join()
504                 if not self._put_errors.empty():
505                     e = []
506                     try:
507                         while True:
508                             e.append(self._put_errors.get(False))
509                     except Queue.Empty:
510                         pass
511                     raise AsyncKeepWriteErrors(e)
512
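    # Putting the pieces above together, a minimal write pipeline looks like
    # this sketch (`keep_client` is an assumed, already configured KeepClient):
    #
    #   bm = BlockManager(keep_client)
    #   bb = bm.alloc_bufferblock()
    #   bb.append("hello ")
    #   bb.append("world")
    #   bm.commit_all()        # blocks until the upload queue drains;
    #                          # raises AsyncKeepWriteErrors on failure
    #   print bb.locator()     # locator assigned by the uploader thread
    #   bm.stop_threads()      # shut down the background workers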
513     def block_prefetch(self, locator):
514         """
515         Initiate a background download of a block.  This assumes that the
516         underlying KeepClient implements a block cache, so repeated requests
517         for the same block will not result in repeated downloads (unless the
518         block is evicted from the cache).  This method does not block.
519         """
520
521         if not self.prefetch_enabled:
522             return
523
524         def worker(self):
525             """Background downloader thread."""
526             while True:
527                 try:
528                     b = self._prefetch_queue.get()
529                     if b is None:
530                         return
531                     self._keep.get(b)
532                 except Exception:
533                     pass
534
535         with self.lock:
536             if locator in self._bufferblocks:
537                 return
538             if self._prefetch_threads is None:
539                 self._prefetch_queue = Queue.Queue()
540                 self._prefetch_threads = []
541                 for i in xrange(0, self.num_get_threads):
542                     t = threading.Thread(target=worker, args=(self,))
543                     self._prefetch_threads.append(t)
544                     t.daemon = True
545                     t.start()
546         self._prefetch_queue.put(locator)
547
548
549 class ArvadosFile(object):
550     """
551     ArvadosFile manages the underlying representation of a file in Keep as a sequence of
552     segments spanning a set of blocks, and implements random read/write access.
553     """
554
555     def __init__(self, parent, stream=[], segments=[]):
556         """
557         :stream:
558           a list of Range objects representing a block stream
559
560         :segments:
561           a list of Range objects representing segments
562         """
563         self.parent = parent
564         self._modified = True
565         self._segments = []
566         for s in segments:
567             self._add_segment(stream, s.locator, s.range_size)
568         self._current_bblock = None
569         if parent.sync_mode() == SYNC_READONLY:
570             self.lock = NoopLock()
571         else:
572             self.lock = threading.Lock()
573
574     def sync_mode(self):
575         return self.parent.sync_mode()
576
577     @_synchronized
578     def segments(self):
579         return copy.copy(self._segments)
580
581     @_synchronized
582     def clone(self, new_parent):
583         """Make a copy of this file."""
584         cp = ArvadosFile(new_parent)
585         cp._modified = False
586
587         map_loc = {}
588         for r in self._segments:
589             new_loc = r.locator
590             if self.parent._my_block_manager().is_bufferblock(r.locator):
591                 if r.locator not in map_loc:
592                     map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
593                 new_loc = map_loc[r.locator]
594
595             cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
596
597         return cp
598
599     @_synchronized
600     def __eq__(self, other):
601         if type(other) != ArvadosFile:
602             return False
603         return self._segments == other.segments()
604
605     def __ne__(self, other):
606         return not self.__eq__(other)
607
608     @_synchronized
609     def set_unmodified(self):
610         """Clear the modified flag"""
611         self._modified = False
612
613     @_synchronized
614     def modified(self):
615         """Test the modified flag"""
616         return self._modified
617
618     @_must_be_writable
619     @_synchronized
620     def truncate(self, size):
621         """
622         Adjust the size of the file.  If `size` is less than the size of the file,
623         the file contents after `size` will be discarded.  If `size` is greater
624         than the current size of the file, an IOError will be raised.
625         """
626         if size < self._size():
627             new_segs = []
628             for r in self._segments:
629                 range_end = r.range_start+r.range_size
630                 if r.range_start >= size:
631                     # segment is past the truncate size, all done
632                     break
633                 elif size < range_end:
634                     nr = Range(r.locator, r.range_start, size - r.range_start)
635                     nr.segment_offset = r.segment_offset
636                     new_segs.append(nr)
637                     break
638                 else:
639                     new_segs.append(r)
640
641             self._segments = new_segs
642             self._modified = True
643         elif size > self._size():
644             raise IOError("truncate() does not support extending the file size")
645
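    # truncate() only shrinks.  `af` is an assumed writable ArvadosFile
    # currently holding 1000 bytes:
    #
    #   af.truncate(100)    # keeps the first 100 bytes, marks the file modified
    #   af.truncate(2000)   # raises IOError: extending is not supported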
646     @_synchronized
647     def readfrom(self, offset, size, num_retries):
648         """
649         Read up to `size` bytes from the file starting at `offset`.
650         """
651         if size == 0 or offset >= self._size():
652             return ''
653         data = []
654
655         for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
656             self.parent._my_block_manager().block_prefetch(lr.locator)
657
658         for lr in locators_and_ranges(self._segments, offset, size):
659             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
660             if d:
661                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
662             else:
663                 break
664         return ''.join(data)
665
666     def _repack_writes(self):
667         """
668         Test if the buffer block has more data than is referenced by actual segments
669         (this happens when a buffered write over-writes a file range written in
670         a previous buffered write).  Re-pack the buffer block for efficiency
671         and to avoid leaking information.
672         """
673         segs = self._segments
674
675         # Sum up the segments to get the total bytes of the file referencing
676         # into the buffer block.
677         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
678         write_total = sum([s.range_size for s in bufferblock_segs])
679
680         if write_total < self._current_bblock.size():
681             # There is more data in the buffer block than is actually accounted for by segments, so
682             # re-pack into a new buffer by copying over to a new buffer block.
683             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
684             for t in bufferblock_segs:
685                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
686                 t.segment_offset = new_bb.size() - t.range_size
687
688             self._current_bblock = new_bb
689
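    # Example of the situation _repack_writes() handles (`af` is an assumed
    # writable ArvadosFile):
    #
    #   af.writeto(0, "aaaa", 0)   # 4 bytes land in the current buffer block
    #   af.writeto(0, "bbbb", 0)   # 4 more bytes, but only these stay referenced
    #
    # The buffer block now holds 8 bytes while the file's segments reference
    # only the last 4; _repack_writes() copies those 4 live bytes into a fresh
    # buffer block so stale data is neither uploaded nor leaked.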
690     @_must_be_writable
691     @_synchronized
692     def writeto(self, offset, data, num_retries):
693         """
694         Write `data` to the file starting at `offset`.  This will update
695         existing bytes and/or extend the size of the file as necessary.
696         """
697         if len(data) == 0:
698             return
699
700         if offset > self._size():
701             raise IOError(errno.EINVAL, "Offset is past the end of the file")
702
703         if len(data) > config.KEEP_BLOCK_SIZE:
704             raise IOError(errno.EINVAL, "Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
705
706         self._modified = True
707
708         if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
709             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
710
711         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
712             self._repack_writes()
713             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
714                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
715                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
716
717         self._current_bblock.append(data)
718
719         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
720
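    # writeto() sketch: it updates bytes in place and extends the file, but
    # never leaves a gap.  `af` is an assumed writable, initially empty
    # ArvadosFile:
    #
    #   af.writeto(0, "hello world", 0)
    #   af.writeto(6, "keep!", 0)         # overwrite in place
    #   af.readfrom(0, 11, 0)             # -> "hello keep!"
    #   af.writeto(af.size(), "!", 0)     # appending at the end is fine, but
    #                                     # offset > size() raises IOError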
721     @_must_be_writable
722     @_synchronized
723     def add_segment(self, blocks, pos, size):
724         # Synchronized public api, see _add_segment
725         self._add_segment(blocks, pos, size)
726
727     def _add_segment(self, blocks, pos, size):
728         """
729         Add a segment to the end of the file, with `pos` and `size` referencing a
730         section of the stream described by `blocks` (a list of Range objects)
731         """
732         self._modified = True
733         for lr in locators_and_ranges(blocks, pos, size):
734             last = self._segments[-1] if self._segments else Range(0, 0, 0)
735             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
736             self._segments.append(r)
737
738     def _size(self):
739         """Get the file size"""
740         if self._segments:
741             n = self._segments[-1]
742             return n.range_start + n.range_size
743         else:
744             return 0
745
746     @_synchronized
747     def size(self):
748         """Get the file size"""
749         return self._size()
750
751 class ArvadosFileReader(ArvadosFileReaderBase):
752     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
753         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
754         self.arvadosfile = arvadosfile
755
756     def size(self):
757         return self.arvadosfile.size()
758
759     @ArvadosFileBase._before_close
760     @retry_method
761     def read(self, size, num_retries=None):
762         """Read up to `size` bytes from the stream, starting at the current file position"""
763         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
764         self._filepos += len(data)
765         return data
766
767     @ArvadosFileBase._before_close
768     @retry_method
769     def readfrom(self, offset, size, num_retries=None):
770         """Read up to `size` bytes from the stream, starting at `offset`"""
771         return self.arvadosfile.readfrom(offset, size, num_retries)
772
773     def flush(self):
774         pass
775
776
777 class ArvadosFileWriter(ArvadosFileReader):
778     def __init__(self, arvadosfile, name, mode, num_retries=None):
779         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
780
781     @ArvadosFileBase._before_close
782     @retry_method
783     def write(self, data, num_retries=None):
784         if self.mode[0] == "a":
785             self.arvadosfile.writeto(self.size(), data, num_retries)
786         else:
787             self.arvadosfile.writeto(self._filepos, data, num_retries)
788             self._filepos += len(data)
789
790     @ArvadosFileBase._before_close
791     @retry_method
792     def writelines(self, seq, num_retries=None):
793         for s in seq:
794             self.write(s)
795
796     def truncate(self, size=None):
797         if size is None:
798             size = self._filepos
799         self.arvadosfile.truncate(size)
800         if self._filepos > self.size():
801             self._filepos = self.size()
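# End-to-end sketch with the writer class above.  `somefile` is an assumed
# ArvadosFile backed by a writable collection:
#
#   writer = ArvadosFileWriter(somefile, 'out.txt', 'w', num_retries=2)
#   writer.write("line 1\n")
#   writer.writelines(["line 2\n", "line 3\n"])
#   writer.seek(0, os.SEEK_SET)
#   print writer.read(7)       # "line 1\n"
#   writer.truncate()          # truncate at the current position (7 bytes)
#   writer.close()
#
# In append mode ('a'), write() always lands at the current end of the file,
# regardless of where the file position has been seek()ed.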