import functools
import os
import re
import zlib
import bz2
from .ranges import *
from arvados.retry import retry_method
import config
import hashlib
import threading
import Queue
import copy
import errno
from .errors import KeepWriteError, AssertionError, ArgumentError

def split(path):
    """Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).

    If no stream name is available, assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
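
# Illustrative example (not part of the module API): split() separates the
# stream path from the file name, defaulting the stream to ".":
#
#   split("foo/bar/baz.txt")  # -> ("foo/bar", "baz.txt")
#   split("baz.txt")          # -> (".", "baz.txt")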

class FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(FileLikeObjectBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(self._NameAttribute(name), mode)
        self._filepos = 0L
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self.name)

    @FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    @FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]

    @FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def size(self):
        raise NotImplementedError()

    def read(self, size, num_retries=None):
        raise NotImplementedError()

    def readfrom(self, start, size, num_retries=None):
        raise NotImplementedError()

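# Illustrative sketch (not part of the module): concrete subclasses such as
# StreamFileReader and ArvadosFileReader only implement size(), read() and
# readfrom(); the generic helpers above (readline, readlines, readall,
# readall_decompressed) then work on any of them.  `reader` and `process`
# below are placeholders:
#
#   for line in reader:             # __iter__ calls readline()
#       process(line)
#
#   reader.seek(0, os.SEEK_SET)
#   first_kb = reader.read(1024)    # read from the current file position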

class StreamFileReader(ArvadosFileReaderBase):
    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(name, 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream._readfrom(lr.locator+lr.segment_offset,
                                          lr.segment_size,
                                          num_retries=num_retries)

        self._filepos += len(data)
        return data

    @FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream._readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                               num_retries=num_retries))
        return ''.join(data)

    def as_manifest(self):
        from stream import normalize_stream
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


class BufferBlock(object):
    """A BufferBlock is a stand-in for a Keep block that is in the process of being
    written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self.state = BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner

    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self.state == BufferBlock.WRITABLE:
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

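# Illustrative sketch (not part of the module): the BufferBlock lifecycle
# described above.  In normal use BlockManager creates and commits these
# blocks; this only shows the state transitions.
#
#   bb = BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello ")              # WRITABLE: buffer grows as needed
#   bb.append("world")
#   bb.size()                        # -> 11
#   bb.locator()                     # -> "<md5 of contents>+11"
#   bb.state = BufferBlock.PENDING   # set by BlockManager before upload;
#                                    # append() would now raise AssertionError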

def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper

class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

SYNC_READONLY = 1
SYNC_EXPLICIT = 2
SYNC_LIVE = 3

def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.sync_mode() == SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class BlockManager(object):
    """BlockManager handles buffer blocks, background block uploads, and background
    block prefetch for a Collection of ArvadosFiles.

    """
    def __init__(self, keep):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = {}
        self._put_queue = None
        self._put_errors = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        self.num_put_threads = 2
        self.num_get_threads = 2

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        if blockid is None:
            blockid = "bufferblock%i" % len(self._bufferblocks)
        bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, blockid, owner):
        """Create a new bufferblock in WRITABLE state, initialized with the content of
        an existing bufferblock.

        :blockid:
          the block to copy.  May be an existing buffer block id.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
        block = self._bufferblocks[blockid]
        if block.state != BufferBlock.WRITABLE:
            raise AssertionError("Can only duplicate a writable buffer block")

        bufferblock = BufferBlock(new_blockid, block.size(), owner)
        bufferblock.append(block.buffer_view[0:block.size()])
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None
        self._put_errors = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def commit_bufferblock(self, block):
        """Initiate a background upload of a bufferblock.

        This will block if the upload queue is at capacity, otherwise it will
        return immediately.

        """

        def commit_bufferblock_worker(self):
            """Background uploader thread."""

            while True:
                try:
                    bufferblock = self._put_queue.get()
                    if bufferblock is None:
                        return
                    bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                    bufferblock.state = BufferBlock.COMMITTED
                    bufferblock.buffer_view = None
                    bufferblock.buffer_block = None
                except Exception as e:
                    print e
                    self._put_errors.put((bufferblock.locator(), e))
                finally:
                    if self._put_queue is not None:
                        self._put_queue.task_done()

        with self.lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = Queue.Queue(maxsize=2)
                self._put_errors = Queue.Queue()

                self._put_threads = []
                for i in xrange(0, self.num_put_threads):
                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

        # Mark the block as PENDING to disallow any more appends.
        block.state = BufferBlock.PENDING
        self._put_queue.put(block)

    def get_bufferblock(self, locator):
        with self.lock:
            return self._bufferblocks.get(locator)

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that; if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state != BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        Unlike commit_bufferblock(), this is a synchronous call, and will not
        return until all buffer blocks are uploaded.  Raises
        KeepWriteError() if any blocks failed to upload.

        """
        with self.lock:
            items = self._bufferblocks.items()

        for k,v in items:
            if v.state == BufferBlock.WRITABLE:
                self.commit_bufferblock(v)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()
                if not self._put_errors.empty():
                    err = []
                    try:
                        while True:
                            err.append(self._put_errors.get(False))
                    except Queue.Empty:
                        pass
                    raise KeepWriteError("Error writing some blocks", err)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        def block_prefetch_worker(self):
            """The background downloader thread."""
            while True:
                try:
                    b = self._prefetch_queue.get()
                    if b is None:
                        return
                    self._keep.get(b)
                except Exception:
                    pass

        with self.lock:
            if locator in self._bufferblocks:
                return
            if self._prefetch_threads is None:
                self._prefetch_queue = Queue.Queue()
                self._prefetch_threads = []
                for i in xrange(0, self.num_get_threads):
                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()
        self._prefetch_queue.put(locator)

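# Illustrative sketch (not part of the module): the write path that ArvadosFile
# drives through BlockManager.  `keep_client` is a placeholder for a configured
# arvados.keep.KeepClient instance.
#
#   bm = BlockManager(keep_client)
#   bb = bm.alloc_bufferblock(owner=None)      # new WRITABLE buffer block
#   bb.append("some file data")
#   bm.commit_bufferblock(bb)                  # background upload, marks PENDING
#   bm.commit_all()                            # wait for uploads; raises
#                                              # KeepWriteError on failure
#   bm.block_prefetch(bb.locator())            # warm the KeepClient cache
#   data = bm.get_block_contents(bb.locator(), num_retries=2)
#   bm.stop_threads()                          # shut down background workers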

class ArvadosFile(object):
    """ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    def __init__(self, parent, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self._modified = True
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def sync_mode(self):
        return self.parent.sync_mode()

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent)

        map_loc = {}
        for r in self._segments:
            new_loc = r.locator
            if self.parent._my_block_manager().is_bufferblock(r.locator):
                if r.locator not in map_loc:
                    bufferblock = self.parent._my_block_manager().get_bufferblock(r.locator)
                    if bufferblock.state == BufferBlock.COMMITTED:
                        map_loc[r.locator] = bufferblock.locator()
                    else:
                        map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp)
                new_loc = map_loc[r.locator]

            cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))

        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""
        self._segments = other.segments()
        self._modified = True

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        s = other.segments()
        with self.lock:
            return self._segments == s

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_unmodified(self):
        """Clear the modified flag"""
        self._modified = False

    @synchronized
    def modified(self):
        """Test the modified flag"""
        return self._modified

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, an IOError will be raised.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self._modified = True
        elif size > self.size():
            raise IOError("truncate() does not support extending the file size")

    def readfrom(self, offset, size, num_retries):
        """Read up to `size` bytes from the file starting at `offset`."""

        with self.lock:
            if size == 0 or offset >= self.size():
                return ''
            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
            readsegs = locators_and_ranges(self._segments, offset, size)

        for lr in prefetch:
            self.parent._my_block_manager().block_prefetch(lr.locator)

        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
            if block:
                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
            else:
                break
        return ''.join(data)

    def _repack_writes(self):
        """Test if the buffer block has more data than is referenced by actual
        segments.

        This happens when a buffered write over-writes a file range written in
        a previous buffered write.  Re-pack the buffer block for efficiency
        and to avoid leaking information.

        """
        segs = self._segments

        # Sum up the segments to get the total bytes of the file referencing
        # into the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self._current_bblock.size():
            # There is more data in the buffer block than is actually accounted for by segments, so
            # re-pack into a new buffer by copying over to a new buffer block.
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
            for t in bufferblock_segs:
                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                t.segment_offset = new_bb.size() - t.range_size

            self._current_bblock = new_bb

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if len(data) == 0:
            return

        if offset > self.size():
            raise ArgumentError("Offset is past the end of the file")

        if len(data) > config.KEEP_BLOCK_SIZE:
            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))

        self._modified = True

        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file, with `pos` and `size` referencing a
        section of the stream described by `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self._modified = True
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

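# Illustrative sketch (not part of the module): ArvadosFile random-access reads
# and writes, assuming `af` is an ArvadosFile belonging to a writable parent
# collection whose root_collection() and _my_block_manager() are already set up.
#
#   af.writeto(0, "hello world", num_retries=2)   # buffered into a BufferBlock
#   af.writeto(6, "keep!", num_retries=2)         # overwrite part of the file
#   af.size()                                     # -> 11
#   af.readfrom(0, 11, num_retries=2)             # -> "hello keep!"
#   af.truncate(5)                                # keep only "hello"
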
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    @FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the current file position."""
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
        self._filepos += len(data)
        return data

    @FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at `offset`."""
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, name, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)

    @FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self.arvadosfile.writeto(self.size(), data, num_retries)
        else:
            self.arvadosfile.writeto(self._filepos, data, num_retries)
            self._filepos += len(data)

    @FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries)

    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)
        if self._filepos > self.size():
            self._filepos = self.size()

    def close(self):
        if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
            self.arvadosfile.parent.root_collection().save()
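
# Illustrative sketch (not part of the module): the file-like wrappers above
# are normally obtained from a Collection rather than constructed directly.
# `collection` is a placeholder, assumed to be a writable Collection object
# that exposes an open(path, mode) helper returning these wrappers.
#
#   with collection.open("subdir/notes.txt", "w") as writer:
#       writer.write("first line\n")          # ArvadosFileWriter.write
#       writer.writelines(["a\n", "b\n"])
#
#   with collection.open("subdir/notes.txt", "r") as reader:
#       for line in reader:                   # ArvadosFileReaderBase.__iter__
#           print line,                       # Python 2 print statement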