sdk/python/arvados/arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 from .ranges import *
6 from arvados.retry import retry_method
7 from arvados.errors import ArgumentError
8 import config
9 import hashlib, re, errno
10 import threading
11 import Queue
12 import copy
13
14 def split(path):
15     """split(path) -> streamname, filename
16
17     Separate the stream name and file name in a /-separated stream path.
18     If no stream name is available, assume '.'.
19     """
20     try:
21         stream_name, file_name = path.rsplit('/', 1)
22     except ValueError:  # No / in string
23         stream_name, file_name = '.', path
24     return stream_name, file_name
25
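# Illustrative example (added sketch, not part of the original file):
#
#   split("foo/bar.txt")    # -> ('foo', 'bar.txt')
#   split("a/b/c.txt")      # -> ('a/b', 'c.txt')
#   split("baz.txt")        # -> ('.', 'baz.txt')
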
26 class ArvadosFileBase(object):
27     def __init__(self, name, mode):
28         self.name = name
29         self.mode = mode
30         self.closed = False
31
32     @staticmethod
33     def _before_close(orig_func):
34         @functools.wraps(orig_func)
35         def wrapper(self, *args, **kwargs):
36             if self.closed:
37                 raise ValueError("I/O operation on closed stream file")
38             return orig_func(self, *args, **kwargs)
39         return wrapper
40
41     def __enter__(self):
42         return self
43
44     def __exit__(self, exc_type, exc_value, traceback):
45         try:
46             self.close()
47         except Exception:
48             if exc_type is None:
49                 raise
50
51     def close(self):
52         self.closed = True
53
54
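# Illustrative sketch (added, not part of the original file): subclasses can be
# used as context managers, and methods wrapped with _before_close() refuse to
# run once close() has been called.  `arvfile` is assumed to be an ArvadosFile
# instance (see further down in this module).
#
#   with ArvadosFileReader(arvfile, "example.txt") as f:
#       first_kb = f.read(1024)
#   f.read(1024)   # raises ValueError: I/O operation on closed stream file
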
55 class ArvadosFileReaderBase(ArvadosFileBase):
56     class _NameAttribute(str):
57         # The Python file API provides a plain .name attribute.
58         # Older SDK provided a name() method.
59         # This class provides both, for maximum compatibility.
60         def __call__(self):
61             return self
62
63     def __init__(self, name, mode, num_retries=None):
64         super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
65         self._filepos = 0L
66         self.num_retries = num_retries
67         self._readline_cache = (None, None)
68
69     def __iter__(self):
70         while True:
71             data = self.readline()
72             if not data:
73                 break
74             yield data
75
76     def decompressed_name(self):
77         return re.sub(r'\.(bz2|gz)$', '', self.name)
78
79     @ArvadosFileBase._before_close
80     def seek(self, pos, whence=os.SEEK_CUR):
81         if whence == os.SEEK_CUR:
82             pos += self._filepos
83         elif whence == os.SEEK_END:
84             pos += self.size()
85         self._filepos = min(max(pos, 0L), self.size())
86
87     def tell(self):
88         return self._filepos
89
90     @ArvadosFileBase._before_close
91     @retry_method
92     def readall(self, size=2**20, num_retries=None):
93         while True:
94             data = self.read(size, num_retries=num_retries)
95             if data == '':
96                 break
97             yield data
98
99     @ArvadosFileBase._before_close
100     @retry_method
101     def readline(self, size=float('inf'), num_retries=None):
102         cache_pos, cache_data = self._readline_cache
103         if self.tell() == cache_pos:
104             data = [cache_data]
105         else:
106             data = ['']
107         data_size = len(data[-1])
108         while (data_size < size) and ('\n' not in data[-1]):
109             next_read = self.read(2 ** 20, num_retries=num_retries)
110             if not next_read:
111                 break
112             data.append(next_read)
113             data_size += len(next_read)
114         data = ''.join(data)
115         try:
116             nextline_index = data.index('\n') + 1
117         except ValueError:
118             nextline_index = len(data)
119         nextline_index = min(nextline_index, size)
120         self._readline_cache = (self.tell(), data[nextline_index:])
121         return data[:nextline_index]
122
123     @ArvadosFileBase._before_close
124     @retry_method
125     def decompress(self, decompress, size, num_retries=None):
126         for segment in self.readall(size, num_retries=num_retries):
127             data = decompress(segment)
128             if data:
129                 yield data
130
131     @ArvadosFileBase._before_close
132     @retry_method
133     def readall_decompressed(self, size=2**20, num_retries=None):
134         self.seek(0)
135         if self.name.endswith('.bz2'):
136             dc = bz2.BZ2Decompressor()
137             return self.decompress(dc.decompress, size,
138                                    num_retries=num_retries)
139         elif self.name.endswith('.gz'):
140             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142                                    size, num_retries=num_retries)
143         else:
144             return self.readall(size, num_retries=num_retries)
145
146     @ArvadosFileBase._before_close
147     @retry_method
148     def readlines(self, sizehint=float('inf'), num_retries=None):
149         data = []
150         data_size = 0
151         for s in self.readall(num_retries=num_retries):
152             data.append(s)
153             data_size += len(s)
154             if data_size >= sizehint:
155                 break
156         return ''.join(data).splitlines(True)
157
158     def size(self):
159         raise NotImplementedError()
160
161     def read(self, size, num_retries=None):
162         raise NotImplementedError()
163
164     def readfrom(self, start, size, num_retries=None):
165         raise NotImplementedError()
166
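# Illustrative sketch (added, not part of the original file): streaming a
# compressed file through a concrete reader subclass such as StreamFileReader
# (defined below).  readall_decompressed() chooses bz2 or gzip decompression
# based on the file name suffix and otherwise falls back to readall().
#
#   for chunk in reader.readall_decompressed(size=2**20):
#       handle(chunk)          # `reader` and `handle` are placeholders
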
167
168 class StreamFileReader(ArvadosFileReaderBase):
169     def __init__(self, stream, segments, name):
170         super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
171         self._stream = stream
172         self.segments = segments
173
174     def stream_name(self):
175         return self._stream.name()
176
177     def size(self):
178         n = self.segments[-1]
179         return n.range_start + n.range_size
180
181     @ArvadosFileBase._before_close
182     @retry_method
183     def read(self, size, num_retries=None):
184         """Read up to 'size' bytes from the stream, starting at the current file position"""
185         if size == 0:
186             return ''
187
188         data = ''
189         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
190         if available_chunks:
191             lr = available_chunks[0]
192             data = self._stream._readfrom(lr.locator+lr.segment_offset,
193                                           lr.segment_size,
194                                           num_retries=num_retries)
195
196         self._filepos += len(data)
197         return data
198
199     @ArvadosFileBase._before_close
200     @retry_method
201     def readfrom(self, start, size, num_retries=None):
202         """Read up to 'size' bytes from the stream, starting at 'start'"""
203         if size == 0:
204             return ''
205
206         data = []
207         for lr in locators_and_ranges(self.segments, start, size):
208             data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
209                                               num_retries=num_retries))
210         return ''.join(data)
211
212     def as_manifest(self):
213         from stream import normalize_stream
214         segs = []
215         for r in self.segments:
216             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
217         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
218
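# Illustrative sketch (added, not part of the original file): reading from a
# StreamFileReader.  `stream` is assumed to be a StreamReader over a manifest
# stream, and `segments` a list of Range objects describing "foo.txt" within it.
#
#   f = StreamFileReader(stream, segments, "foo.txt")
#   f.read(128)                  # up to 128 bytes from the current position
#   f.readfrom(0, f.size())      # the whole file, regardless of file position
#   print f.as_manifest(),       # single-stream manifest text for this file
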
219
220 class BufferBlock(object):
221 """
222 A BufferBlock is a stand-in for a Keep block that is in the process of being
223 written.  Writers can append to it, get the size, and compute the Keep locator.
224
225 There are three valid states:
226
227 WRITABLE
228   Can append to block.
229
230 PENDING
231   Block is in the process of being uploaded to Keep, append is an error.
232
233 COMMITTED
234   The block has been written to Keep, its internal buffer has been
235   released, fetching the block will fetch it via keep client (since we
236   discarded the internal copy), and identifiers referring to the BufferBlock
237   can be replaced with the block locator.
238 """
239     WRITABLE = 0
240     PENDING = 1
241     COMMITTED = 2
242
243     def __init__(self, blockid, starting_capacity, owner):
244         """
245         :blockid:
246           the identifier for this block
247
248         :starting_capacity:
249           the initial buffer capacity
250
251         :owner:
252           ArvadosFile that owns this block
253         """
254         self.blockid = blockid
255         self.buffer_block = bytearray(starting_capacity)
256         self.buffer_view = memoryview(self.buffer_block)
257         self.write_pointer = 0
258         self.state = BufferBlock.WRITABLE
259         self._locator = None
260         self.owner = owner
261
262     def append(self, data):
263         """
264         Append some data to the buffer.  Only valid if the block is in WRITABLE
265         state.  Implements an expanding buffer, doubling capacity as needed to
266         accommodate all the data.
267         """
268         if self.state == BufferBlock.WRITABLE:
269             while (self.write_pointer+len(data)) > len(self.buffer_block):
270                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
271                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
272                 self.buffer_block = new_buffer_block
273                 self.buffer_view = memoryview(self.buffer_block)
274             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
275             self.write_pointer += len(data)
276             self._locator = None
277         else:
278             raise AssertionError("Buffer block is not writable")
279
280     def size(self):
281         """Amount of data written to the buffer"""
282         return self.write_pointer
283
284     def locator(self):
285         """The Keep locator for this buffer's contents."""
286         if self._locator is None:
287             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
288         return self._locator
289
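# Illustrative sketch (added, not part of the original file): how the writing
# code in this module uses a BufferBlock.  In practice blocks are created via
# BlockManager.alloc_bufferblock() rather than constructed directly.
#
#   bb = BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello ")
#   bb.append("world")
#   bb.size()       # -> 11
#   bb.locator()    # -> "<md5 of the 11 bytes>+11"
#   bb.state = BufferBlock.PENDING   # set by BlockManager before the upload
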
290
291 class AsyncKeepWriteErrors(Exception):
292     """
293     Roll up one or more Keep write exceptions (generated by background
294     threads) into a single one.
295     """
296     def __init__(self, errors):
297         self.errors = errors
298
299     def __repr__(self):
300         return "\n".join(self.errors)
301
302 def _synchronized(orig_func):
303     @functools.wraps(orig_func)
304     def wrapper(self, *args, **kwargs):
305         with self.lock:
306             return orig_func(self, *args, **kwargs)
307     return wrapper
308
309 class NoopLock(object):
310     def __enter__(self):
311         return self
312
313     def __exit__(self, exc_type, exc_value, traceback):
314         pass
315
316     def acquire(self, blocking=False):
317         pass
318
319     def release(self):
320         pass
321
322 def _must_be_writable(orig_func):
323     # Decorator for methods that modify Collection data; raises if it is read-only.
324     @functools.wraps(orig_func)
325     def wrapper(self, *args, **kwargs):
326         if self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
327             raise IOError(errno.EROFS, "Collection is read only")
328         return orig_func(self, *args, **kwargs)
329     return wrapper
330
331
332 class BlockManager(object):
333     """
334     BlockManager handles buffer blocks, background block uploads, and
335     background block prefetch for a Collection of ArvadosFiles.
336     """
337     def __init__(self, keep):
338         """keep: KeepClient object to use"""
339         self._keep = keep
340         self._bufferblocks = {}
341         self._put_queue = None
342         self._put_errors = None
343         self._put_threads = None
344         self._prefetch_queue = None
345         self._prefetch_threads = None
346         self.lock = threading.Lock()
347
348     @_synchronized
349     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
350         """
351         Allocate a new, empty bufferblock in WRITABLE state and return it.
352
353         :blockid:
354           optional block identifier, otherwise one will be automatically assigned
355
356         :starting_capacity:
357           optional capacity, otherwise will use default capacity
358
359         :owner:
360           ArvadosFile that owns this block
361         """
362         if blockid is None:
363             blockid = "bufferblock%i" % len(self._bufferblocks)
364         bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
365         self._bufferblocks[bb.blockid] = bb
366         return bb
367
368     @_synchronized
369     def dup_block(self, blockid, owner):
370         """
371         Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
372
373         :blockid:
374           the block to copy.  May be an existing buffer block id.
375
376         :owner:
377           ArvadosFile that owns the new block
378         """
379         new_blockid = "bufferblock%i" % len(self._bufferblocks)
380         block = self._bufferblocks[blockid]
381         bb = BufferBlock(new_blockid, block.size(), owner)
382         bb.append(block.buffer_view[0:block.size()].tobytes())
383         self._bufferblocks[bb.blockid] = bb
384         return bb
385
386     @_synchronized
387     def is_bufferblock(self, id):
388         return id in self._bufferblocks
389
390     @_synchronized
391     def stop_threads(self):
392         """
393         Shut down and wait for background upload and download threads to finish.
394         """
395         if self._put_threads is not None:
396             for t in self._put_threads:
397                 self._put_queue.put(None)
398             for t in self._put_threads:
399                 t.join()
400         self._put_threads = None
401         self._put_queue = None
402         self._put_errors = None
403
404         if self._prefetch_threads is not None:
405             for t in self._prefetch_threads:
406                 self._prefetch_queue.put(None)
407             for t in self._prefetch_threads:
408                 t.join()
409         self._prefetch_threads = None
410         self._prefetch_queue = None
411
412     def commit_bufferblock(self, block):
413         """
414         Initiate a background upload of a bufferblock.  This will block if the
415         upload queue is at capacity, otherwise it will return immediately.
416         """
417
418         def worker(self):
419             """
420             Background uploader thread.
421             """
422             while True:
423                 try:
424                     b = self._put_queue.get()
425                     if b is None:
426                         return
427                     b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
428                     b.state = BufferBlock.COMMITTED
429                     b.buffer_view = None
430                     b.buffer_block = None
431                 except Exception as e:
433                     self._put_errors.put(e)
434                 finally:
435                     if self._put_queue is not None:
436                         self._put_queue.task_done()
437
438         with self.lock:
439             if self._put_threads is None:
440                 # Start uploader threads.
441
442                 # If we don't limit the Queue size, the upload queue can quickly
443                 # grow to take up gigabytes of RAM if the writing process is
444                 # generating data more quickly than it can be sent to the Keep
445                 # servers.
446                 #
447                 # With two upload threads and a queue size of 2, this means up to 4
448                 # blocks pending.  If they are full 64 MiB blocks, that means up to
449                 # 256 MiB of internal buffering, which is the same size as the
450                 # default download block cache in KeepClient.
451                 self._put_queue = Queue.Queue(maxsize=2)
452                 self._put_errors = Queue.Queue()
453                 self._put_threads = [threading.Thread(target=worker, args=(self,)),
454                                      threading.Thread(target=worker, args=(self,))]
455                 for t in self._put_threads:
456                     t.daemon = True
457                     t.start()
458
459         # Mark the block as PENDING to disallow any further appends.
460         block.state = BufferBlock.PENDING
461         self._put_queue.put(block)
462
463     def get_block(self, locator, num_retries, cache_only=False):
464         """
465         Fetch a block.  If the locator refers to an uncommitted BufferBlock, return
466         its contents; otherwise pass the request through to KeepClient.get().
467         """
468         with self.lock:
469             if locator in self._bufferblocks:
470                 bb = self._bufferblocks[locator]
471                 if bb.state != BufferBlock.COMMITTED:
472                     return bb.buffer_view[0:bb.write_pointer].tobytes()
473                 else:
474                     locator = bb._locator
475         return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
476
477     def commit_all(self):
478         """
479         Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
480         is a synchronous call, and will not return until all buffer blocks are
481         uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
482         upload.
483         """
484         with self.lock:
485             items = self._bufferblocks.items()
486
487         for k,v in items:
488             if v.state == BufferBlock.WRITABLE:
489                 self.commit_bufferblock(v)
490
491         with self.lock:
492             if self._put_queue is not None:
493                 self._put_queue.join()
494                 if not self._put_errors.empty():
495                     e = []
496                     try:
497                         while True:
498                             e.append(self._put_errors.get(False))
499                     except Queue.Empty:
500                         pass
501                     raise AsyncKeepWriteErrors(e)
502
503     def block_prefetch(self, locator):
504         """
505         Initiate a background download of a block.  This assumes that the
506         underlying KeepClient implements a block cache, so repeated requests
507         for the same block will not result in repeated downloads (unless the
508         block is evicted from the cache.)  This method does not block.
509         """
510         def worker(self):
511             """Background downloader thread."""
512             while True:
513                 try:
514                     b = self._prefetch_queue.get()
515                     if b is None:
516                         return
517                     self._keep.get(b)
518                 except Exception:
519                     pass
520
521         with self.lock:
522             if locator in self._bufferblocks:
523                 return
524             if self._prefetch_threads is None:
525                 self._prefetch_queue = Queue.Queue()
526                 self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
527                                           threading.Thread(target=worker, args=(self,))]
528                 for t in self._prefetch_threads:
529                     t.daemon = True
530                     t.start()
531         self._prefetch_queue.put(locator)
532
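# Illustrative sketch (added, not part of the original file): the typical write
# path through a BlockManager.  `keep` is assumed to be an arvados KeepClient.
#
#   bm = BlockManager(keep)
#   bb = bm.alloc_bufferblock(owner=None)
#   bb.append("some data")
#   bm.commit_bufferblock(bb)   # queues a background upload, marks bb PENDING
#   bm.commit_all()             # waits for all pending uploads to finish
#   bm.stop_threads()           # shut down the background worker threads
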
533
534 class ArvadosFile(object):
535     """
536     ArvadosFile manages the underlying representation of a file in Keep as a sequence of
537     segments spanning a set of blocks, and implements random read/write access.
538     """
539
540     def __init__(self, parent, stream=[], segments=[]):
541         """
542         :stream:
543           a list of Range objects representing a block stream
544
545         :segments:
546           a list of Range objects representing segments
547         """
548         self.parent = parent
549         self._modified = True
550         self._segments = []
551         for s in segments:
552             self._add_segment(stream, s.locator, s.range_size)
553         self._current_bblock = None
554         if parent.sync_mode() == SYNC_READONLY:
555             self.lock = NoopLock()
556         else:
557             self.lock = threading.Lock()
558
559     def sync_mode(self):
560         return self.parent.sync_mode()
561
562     @_synchronized
563     def segments(self):
564         return copy.copy(self._segments)
565
566     @_synchronized
567     def clone(self, new_parent):
568         """Make a copy of this file."""
569         cp = ArvadosFile(new_parent)
570         cp._modified = False
572
573         map_loc = {}
574         for r in self._segments:
575             new_loc = r.locator
576             if self.parent._my_block_manager().is_bufferblock(r.locator):
577                 if r.locator not in map_loc:
578                     map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
579                 new_loc = map_loc[r.locator]
580
581             cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
582
583         return cp
584
585     @_synchronized
586     def __eq__(self, other):
587         if not isinstance(other, ArvadosFile):
588             return False
589         return self._segments == other.segments()
590
591     def __ne__(self, other):
592         return not self.__eq__(other)
593
594     @_synchronized
595     def set_unmodified(self):
596         """Clear the modified flag"""
597         self._modified = False
598
599     @_synchronized
600     def modified(self):
601         """Test the modified flag"""
602         return self._modified
603
604     @_must_be_writable
605     @_synchronized
606     def truncate(self, size):
607         """
608         Adjust the size of the file.  If `size` is less than the size of the file,
609         the file contents after `size` will be discarded.  If `size` is greater
610         than the current size of the file, an IOError will be raised.
611         """
612         if size < self.size():
613             new_segs = []
614             for r in self._segments:
615                 range_end = r.range_start+r.range_size
616                 if r.range_start >= size:
617                     # segment is past the truncate size, all done
618                     break
619                 elif size < range_end:
620                     nr = Range(r.locator, r.range_start, size - r.range_start)
621                     nr.segment_offset = r.segment_offset
622                     new_segs.append(nr)
623                     break
624                 else:
625                     new_segs.append(r)
626
627             self._segments = new_segs
628             self._modified = True
629         elif size > self.size():
630             raise IOError("truncate() does not support extending the file size")
631
632     @_synchronized
633     def readfrom(self, offset, size, num_retries):
634         """
635         Read up to `size` bytes from the file, starting at `offset`.
636         """
637         if size == 0 or offset >= self.size():
638             return ''
639         data = []
640
641         for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
642             self.parent._my_block_manager().block_prefetch(lr.locator)
643
644         for lr in locators_and_ranges(self._segments, offset, size):
645             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
646             if d:
647                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
648             else:
649                 break
650         return ''.join(data)
651
652     def _repack_writes(self):
653         """
654         Test if the buffer block has more data than is referenced by actual segments
655         (this happens when a buffered write over-writes a file range written in
656         a previous buffered write).  Re-pack the buffer block for efficiency
657         and to avoid leaking information.
658         """
659         segs = self._segments
660
661         # Sum up the segments to get the total bytes of the file referencing
662         # into the buffer block.
663         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
664         write_total = sum([s.range_size for s in bufferblock_segs])
665
666         if write_total < self._current_bblock.size():
667             # There is more data in the buffer block than is actually accounted for by segments, so
668             # re-pack into a new buffer by copying over to a new buffer block.
669             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
670             for t in bufferblock_segs:
671                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
672                 t.segment_offset = new_bb.size() - t.range_size
673
674             self._current_bblock = new_bb
675
676     @_must_be_writable
677     @_synchronized
678     def writeto(self, offset, data, num_retries):
679         """
680         Write `data` to the file starting at `offset`.  This will update
681         existing bytes and/or extend the size of the file as necessary.
682         """
683         if len(data) == 0:
684             return
685
686         if offset > self.size():
687             raise ArgumentError("Offset is past the end of the file")
688
689         if len(data) > config.KEEP_BLOCK_SIZE:
690             raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
691
692         self._modified = True
693
694         if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
695             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
696
697         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
698             self._repack_writes()
699             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
700                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
701                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
702
703         self._current_bblock.append(data)
704         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
705
706     @_must_be_writable
707     @_synchronized
708     def add_segment(self, blocks, pos, size):
709         # Synchronized public api, see _add_segment
710         self._add_segment(blocks, pos, size)
711
712     def _add_segment(self, blocks, pos, size):
713         """
714         Add a segment to the end of the file, with `pos` and `size` referencing a
715         section of the stream described by `blocks` (a list of Range objects)
716         """
717         self._modified = True
718         for lr in locators_and_ranges(blocks, pos, size):
719             last = self._segments[-1] if self._segments else Range(0, 0, 0)
720             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
721             self._segments.append(r)
722
723
724     @_synchronized
725     def size(self):
726         """Get the file size"""
727         if self._segments:
728             n = self._segments[-1]
729             return n.range_start + n.range_size
730         else:
731             return 0
732
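# Illustrative sketch (added, not part of the original file): random-access
# writes and reads on an ArvadosFile.  `parent` is assumed to be a writable
# collection object providing sync_mode() and _my_block_manager(), as used by
# the class above.
#
#   f = ArvadosFile(parent)
#   f.writeto(0, "hello world", num_retries=2)
#   f.writeto(6, "keep!", num_retries=2)       # overwrite "world"
#   f.readfrom(0, f.size(), num_retries=2)     # -> "hello keep!"
#   f.truncate(5)                              # keep only "hello"
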
733
734 class ArvadosFileReader(ArvadosFileReaderBase):
735     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
736         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
737         self.arvadosfile = arvadosfile
738
739     def size(self):
740         return self.arvadosfile.size()
741
742     @ArvadosFileBase._before_close
743     @retry_method
744     def read(self, size, num_retries=None):
745         """Read up to `size` bytes from the stream, starting at the current file position"""
746         data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
747         self._filepos += len(data)
748         return data
749
750     @ArvadosFileBase._before_close
751     @retry_method
752     def readfrom(self, offset, size, num_retries=None):
753         """Read up to `size` bytes from the stream, starting at the current file position"""
754         return self.arvadosfile.readfrom(offset, size, num_retries)
755
756     def flush(self):
757         pass
758
759
760 class ArvadosFileWriter(ArvadosFileReader):
761     def __init__(self, arvadosfile, name, mode, num_retries=None):
762         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
763
764     @ArvadosFileBase._before_close
765     @retry_method
766     def write(self, data, num_retries=None):
767         if self.mode[0] == "a":
768             self.arvadosfile.writeto(self.size(), data, num_retries)
769         else:
770             self.arvadosfile.writeto(self._filepos, data, num_retries)
771             self._filepos += len(data)
772
773     @ArvadosFileBase._before_close
774     @retry_method
775     def writelines(self, seq, num_retries=None):
776         for s in seq:
777             self.write(s, num_retries=num_retries)
778
779     def truncate(self, size=None):
780         if size is None:
781             size = self._filepos
782         self.arvadosfile.truncate(size)
783         if self._filepos > self.size():
784             self._filepos = self.size()
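# Illustrative sketch (added, not part of the original file): reading and
# writing through the file-like wrappers.  `arvfile` is assumed to be a
# writable ArvadosFile as above.
#
#   writer = ArvadosFileWriter(arvfile, "hello.txt", "r+", num_retries=2)
#   writer.write("hello world\n")
#   writer.close()
#
#   reader = ArvadosFileReader(arvfile, "hello.txt", num_retries=2)
#   for line in reader:
#       print line,              # Python 2 print, matching this module
#   reader.close()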