5790: Fix PySDK Docker image listing comparing ints and datetimes.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12
13 from .errors import KeepWriteError, AssertionError, ArgumentError
14 from .keep import KeepLocator
15 from ._normalize_stream import normalize_stream
16 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
17 from .retry import retry_method
18
19 def split(path):
20     """split(path) -> streamname, filename
21
22     Separate the stream name and file name in a /-separated stream path and
23     return a tuple (stream_name, file_name).  If no stream name is available,
24     assume '.'.
25
26     """
27     try:
28         stream_name, file_name = path.rsplit('/', 1)
29     except ValueError:  # No / in string
30         stream_name, file_name = '.', path
31     return stream_name, file_name
32
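# A minimal illustration of split() (the paths below are examples only, not
# part of the module):
#
#     split("foo/bar/baz.txt")  -> ("foo/bar", "baz.txt")
#     split("baz.txt")          -> (".", "baz.txt")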
33 class _FileLikeObjectBase(object):
34     def __init__(self, name, mode):
35         self.name = name
36         self.mode = mode
37         self.closed = False
38
39     @staticmethod
40     def _before_close(orig_func):
41         @functools.wraps(orig_func)
42         def before_close_wrapper(self, *args, **kwargs):
43             if self.closed:
44                 raise ValueError("I/O operation on closed stream file")
45             return orig_func(self, *args, **kwargs)
46         return before_close_wrapper
47
48     def __enter__(self):
49         return self
50
51     def __exit__(self, exc_type, exc_value, traceback):
52         try:
53             self.close()
54         except Exception:
55             if exc_type is None:
56                 raise
57
58     def close(self):
59         self.closed = True
60
61
62 class ArvadosFileReaderBase(_FileLikeObjectBase):
63     def __init__(self, name, mode, num_retries=None):
64         super(ArvadosFileReaderBase, self).__init__(name, mode)
65         self._filepos = 0L
66         self.num_retries = num_retries
67         self._readline_cache = (None, None)
68
69     def __iter__(self):
70         while True:
71             data = self.readline()
72             if not data:
73                 break
74             yield data
75
76     def decompressed_name(self):
77         return re.sub('\.(bz2|gz)$', '', self.name)
78
79     @_FileLikeObjectBase._before_close
80     def seek(self, pos, whence=os.SEEK_SET):
81         if whence == os.SEEK_CUR:
82             pos += self._filepos
83         elif whence == os.SEEK_END:
84             pos += self.size()
85         self._filepos = min(max(pos, 0L), self.size())
86
87     def tell(self):
88         return self._filepos
89
90     @_FileLikeObjectBase._before_close
91     @retry_method
92     def readall(self, size=2**20, num_retries=None):
93         while True:
94             data = self.read(size, num_retries=num_retries)
95             if data == '':
96                 break
97             yield data
98
99     @_FileLikeObjectBase._before_close
100     @retry_method
101     def readline(self, size=float('inf'), num_retries=None):
102         cache_pos, cache_data = self._readline_cache
103         if self.tell() == cache_pos:
104             data = [cache_data]
105         else:
106             data = ['']
107         data_size = len(data[-1])
108         while (data_size < size) and ('\n' not in data[-1]):
109             next_read = self.read(2 ** 20, num_retries=num_retries)
110             if not next_read:
111                 break
112             data.append(next_read)
113             data_size += len(next_read)
114         data = ''.join(data)
115         try:
116             nextline_index = data.index('\n') + 1
117         except ValueError:
118             nextline_index = len(data)
119         nextline_index = min(nextline_index, size)
120         self._readline_cache = (self.tell(), data[nextline_index:])
121         return data[:nextline_index]
122
123     @_FileLikeObjectBase._before_close
124     @retry_method
125     def decompress(self, decompress, size, num_retries=None):
126         for segment in self.readall(size, num_retries):
127             data = decompress(segment)
128             if data:
129                 yield data
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall_decompressed(self, size=2**20, num_retries=None):
134         self.seek(0)
135         if self.name.endswith('.bz2'):
136             dc = bz2.BZ2Decompressor()
137             return self.decompress(dc.decompress, size,
138                                    num_retries=num_retries)
139         elif self.name.endswith('.gz'):
140             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
141             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
142                                    size, num_retries=num_retries)
143         else:
144             return self.readall(size, num_retries=num_retries)
145
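    # A short, illustrative use of readall_decompressed(); `reader` stands for
    # any instance of this class whose name ends in ".gz" or ".bz2", and `sink`
    # is a placeholder for wherever the decompressed chunks should go:
    #
    #     for chunk in reader.readall_decompressed():
    #         sink.write(chunk)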
146     @_FileLikeObjectBase._before_close
147     @retry_method
148     def readlines(self, sizehint=float('inf'), num_retries=None):
149         data = []
150         data_size = 0
151         for s in self.readall(num_retries=num_retries):
152             data.append(s)
153             data_size += len(s)
154             if data_size >= sizehint:
155                 break
156         return ''.join(data).splitlines(True)
157
158     def size(self):
159         raise NotImplementedError()
160
161     def read(self, size, num_retries=None):
162         raise NotImplementedError()
163
164     def readfrom(self, start, size, num_retries=None):
165         raise NotImplementedError()
166
167
168 class StreamFileReader(ArvadosFileReaderBase):
169     class _NameAttribute(str):
170         # The Python file API provides a plain .name attribute.
171         # Older SDK provided a name() method.
172         # This class provides both, for maximum compatibility.
173         def __call__(self):
174             return self
175
176     def __init__(self, stream, segments, name):
177         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
178         self._stream = stream
179         self.segments = segments
180
181     def stream_name(self):
182         return self._stream.name()
183
184     def size(self):
185         n = self.segments[-1]
186         return n.range_start + n.range_size
187
188     @_FileLikeObjectBase._before_close
189     @retry_method
190     def read(self, size, num_retries=None):
191         """Read up to 'size' bytes from the stream, starting at the current file position"""
192         if size == 0:
193             return ''
194
195         data = ''
196         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
197         if available_chunks:
198             lr = available_chunks[0]
199             data = self._stream.readfrom(lr.locator+lr.segment_offset,
200                                           lr.segment_size,
201                                           num_retries=num_retries)
202
203         self._filepos += len(data)
204         return data
205
206     @_FileLikeObjectBase._before_close
207     @retry_method
208     def readfrom(self, start, size, num_retries=None):
209         """Read up to 'size' bytes from the stream, starting at 'start'"""
210         if size == 0:
211             return ''
212
213         data = []
214         for lr in locators_and_ranges(self.segments, start, size):
215             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
216                                               num_retries=num_retries))
217         return ''.join(data)
218
219     def as_manifest(self):
220         segs = []
221         for r in self.segments:
222             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
223         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
224
225
226 def synchronized(orig_func):
227     @functools.wraps(orig_func)
228     def synchronized_wrapper(self, *args, **kwargs):
229         with self.lock:
230             return orig_func(self, *args, **kwargs)
231     return synchronized_wrapper
232
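# A minimal sketch of how @synchronized is used: the decorated method body runs
# while holding the instance's `self.lock`, so the owning class must provide
# one.  The Counter class here is hypothetical, purely for illustration:
#
#     class Counter(object):
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def increment(self):
#             self.n += 1   # executes with self.lock held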
233 class _BufferBlock(object):
234     """A stand-in for a Keep block that is in the process of being written.
235
236     Writers can append to it, get the size, and compute the Keep locator.
237     There are three valid states:
238
239     WRITABLE
240       Can append to block.
241
242     PENDING
243       Block is in the process of being uploaded to Keep, append is an error.
244
245     COMMITTED
246       The block has been written to Keep, its internal buffer has been
247       released, fetching the block will fetch it via keep client (since we
248       discarded the internal copy), and identifiers referring to the BufferBlock
249       can be replaced with the block locator.
250
251     """
252
253     WRITABLE = 0
254     PENDING = 1
255     COMMITTED = 2
256
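    # An illustrative walk through the state machine described above (the owner
    # is normally an ArvadosFile; None and the "locator+5" string below are
    # placeholders to keep the sketch self-contained):
    #
    #     bb = _BufferBlock("bufferblock0", starting_capacity=8, owner=None)
    #     bb.append("hello")                    # WRITABLE: buffer grows as needed
    #     bb.set_state(_BufferBlock.PENDING)    # upload started, appends now fail
    #     bb.set_state(_BufferBlock.COMMITTED, "locator+5")  # buffer released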
257     def __init__(self, blockid, starting_capacity, owner):
258         """
259         :blockid:
260           the identifier for this block
261
262         :starting_capacity:
263           the initial buffer capacity
264
265         :owner:
266           ArvadosFile that owns this block
267
268         """
269         self.blockid = blockid
270         self.buffer_block = bytearray(starting_capacity)
271         self.buffer_view = memoryview(self.buffer_block)
272         self.write_pointer = 0
273         self._state = _BufferBlock.WRITABLE
274         self._locator = None
275         self.owner = owner
276         self.lock = threading.Lock()
277
278     @synchronized
279     def append(self, data):
280         """Append some data to the buffer.
281
282         Only valid if the block is in WRITABLE state.  Implements an expanding
282         buffer, doubling capacity as needed to accommodate all the data.
284
285         """
286         if self._state == _BufferBlock.WRITABLE:
287             while (self.write_pointer+len(data)) > len(self.buffer_block):
288                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
289                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
290                 self.buffer_block = new_buffer_block
291                 self.buffer_view = memoryview(self.buffer_block)
292             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
293             self.write_pointer += len(data)
294             self._locator = None
295         else:
296             raise AssertionError("Buffer block is not writable")
297
298     @synchronized
299     def set_state(self, nextstate, loc=None):
300         if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
301             (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
302             self._state = nextstate
303             if self._state == _BufferBlock.COMMITTED:
304                 self._locator = loc
305                 self.buffer_view = None
306                 self.buffer_block = None
307         else:
308             raise AssertionError("Invalid state change from %s to %s" % (self._state, nextstate))
309
310     @synchronized
311     def state(self):
312         return self._state
313
314     def size(self):
315         """The amount of data written to the buffer."""
316         return self.write_pointer
317
318     @synchronized
319     def locator(self):
320         """The Keep locator for this buffer's contents."""
321         if self._locator is None:
322             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
323         return self._locator
324
325     @synchronized
326     def clone(self, new_blockid, owner):
327         if self._state == _BufferBlock.COMMITTED:
328             raise AssertionError("Can only duplicate a writable or pending buffer block")
329         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
330         bufferblock.append(self.buffer_view[0:self.size()])
331         return bufferblock
332
333
334 class NoopLock(object):
335     def __enter__(self):
336         return self
337
338     def __exit__(self, exc_type, exc_value, traceback):
339         pass
340
341     def acquire(self, blocking=False):
342         pass
343
344     def release(self):
345         pass
346
347
348 def must_be_writable(orig_func):
349     @functools.wraps(orig_func)
350     def must_be_writable_wrapper(self, *args, **kwargs):
351         if not self.writable():
352             raise IOError(errno.EROFS, "Collection must be writable.")
353         return orig_func(self, *args, **kwargs)
354     return must_be_writable_wrapper
355
356
357 class _BlockManager(object):
358     """BlockManager handles buffer blocks.
359
360     Also handles background block uploads, and background block prefetch for a
361     Collection of ArvadosFiles.
362
363     """
364     def __init__(self, keep):
365         """keep: KeepClient object to use"""
366         self._keep = keep
367         self._bufferblocks = {}
368         self._put_queue = None
369         self._put_errors = None
370         self._put_threads = None
371         self._prefetch_queue = None
372         self._prefetch_threads = None
373         self.lock = threading.Lock()
374         self.prefetch_enabled = True
375         self.num_put_threads = 2
376         self.num_get_threads = 2
377
378     @synchronized
379     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
380         """Allocate a new, empty bufferblock in WRITABLE state and return it.
381
382         :blockid:
383           optional block identifier, otherwise one will be automatically assigned
384
385         :starting_capacity:
386           optional capacity, otherwise will use default capacity
387
388         :owner:
389           ArvadosFile that owns this block
390
391         """
392         if blockid is None:
393             blockid = "bufferblock%i" % len(self._bufferblocks)
394         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
395         self._bufferblocks[bufferblock.blockid] = bufferblock
396         return bufferblock
397
398     @synchronized
399     def dup_block(self, block, owner):
400         """Create a new bufferblock initialized with the content of an existing bufferblock.
401
402         :block:
403           the buffer block to copy.
404
405         :owner:
406           ArvadosFile that owns the new block
407
408         """
409         new_blockid = "bufferblock%i" % len(self._bufferblocks)
410         bufferblock = block.clone(new_blockid, owner)
411         self._bufferblocks[bufferblock.blockid] = bufferblock
412         return bufferblock
413
414     @synchronized
415     def is_bufferblock(self, locator):
416         return locator in self._bufferblocks
417
418     @synchronized
419     def stop_threads(self):
420         """Shut down and wait for background upload and download threads to finish."""
421
422         if self._put_threads is not None:
423             for t in self._put_threads:
424                 self._put_queue.put(None)
425             for t in self._put_threads:
426                 t.join()
427         self._put_threads = None
428         self._put_queue = None
429         self._put_errors = None
430
431         if self._prefetch_threads is not None:
432             for t in self._prefetch_threads:
433                 self._prefetch_queue.put(None)
434             for t in self._prefetch_threads:
435                 t.join()
436         self._prefetch_threads = None
437         self._prefetch_queue = None
438
439     def commit_bufferblock(self, block):
440         """Initiate a background upload of a bufferblock.
441
442         This will block if the upload queue is at capacity; otherwise it will
443         return immediately.
444
445         """
446
447         def commit_bufferblock_worker(self):
448             """Background uploader thread."""
449
450             while True:
451                 try:
452                     bufferblock = self._put_queue.get()
453                     if bufferblock is None:
454                         return
455                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
456                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
457
458                 except Exception as e:
459                     self._put_errors.put((bufferblock.locator(), e))
460                 finally:
461                     if self._put_queue is not None:
462                         self._put_queue.task_done()
463
464         with self.lock:
465             if self._put_threads is None:
466                 # Start uploader threads.
467
468                 # If we don't limit the Queue size, the upload queue can quickly
469                 # grow to take up gigabytes of RAM if the writing process is
470                 # generating data more quickly than it can be sent to the Keep
471                 # servers.
472                 #
473                 # With two upload threads and a queue size of 2, this means up to 4
474                 # blocks pending.  If they are full 64 MiB blocks, that means up to
475                 # 256 MiB of internal buffering, which is the same size as the
476                 # default download block cache in KeepClient.
477                 self._put_queue = Queue.Queue(maxsize=2)
478                 self._put_errors = Queue.Queue()
479
480                 self._put_threads = []
481                 for i in xrange(0, self.num_put_threads):
482                     thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
483                     self._put_threads.append(thread)
484                     thread.daemon = True
485                     thread.start()
486
487         if block.state() == _BufferBlock.WRITABLE:
488             # Mark the block as PENDING so as to disallow any more appends.
489             block.set_state(_BufferBlock.PENDING)
490             self._put_queue.put(block)
491
492     @synchronized
493     def get_bufferblock(self, locator):
494         return self._bufferblocks.get(locator)
495
496     def get_block_contents(self, locator, num_retries, cache_only=False):
497         """Fetch a block.
498
499         First checks to see if the locator is a BufferBlock and returns its
500         contents if so; otherwise passes the request through to KeepClient.get().
501
502         """
503         with self.lock:
504             if locator in self._bufferblocks:
505                 bufferblock = self._bufferblocks[locator]
506                 if bufferblock.state() != _BufferBlock.COMMITTED:
507                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
508                 else:
509                     locator = bufferblock._locator
510         if cache_only:
511             return self._keep.get_from_cache(locator)
512         else:
513             return self._keep.get(locator, num_retries=num_retries)
514
515     def commit_all(self):
516         """Commit all outstanding buffer blocks.
517
518         Unlike commit_bufferblock(), this is a synchronous call, and will not
519         return until all buffer blocks are uploaded.  Raises
520         KeepWriteError() if any blocks failed to upload.
521
522         """
523         with self.lock:
524             items = self._bufferblocks.items()
525
526         for k,v in items:
527             v.owner.flush()
528
529         with self.lock:
530             if self._put_queue is not None:
531                 self._put_queue.join()
532
533                 if not self._put_errors.empty():
534                     err = []
535                     try:
536                         while True:
537                             err.append(self._put_errors.get(False))
538                     except Queue.Empty:
539                         pass
540                     raise KeepWriteError("Error writing some blocks", err, label="block")
541
542     def block_prefetch(self, locator):
543         """Initiate a background download of a block.
544
545         This assumes that the underlying KeepClient implements a block cache,
546         so repeated requests for the same block will not result in repeated
547         downloads (unless the block is evicted from the cache).  This method
548         does not block.
549
550         """
551
552         if not self.prefetch_enabled:
553             return
554
555         def block_prefetch_worker(self):
556             """The background downloader thread."""
557             while True:
558                 try:
559                     b = self._prefetch_queue.get()
560                     if b is None:
561                         return
562                     self._keep.get(b)
563                 except Exception:
564                     pass
565
566         with self.lock:
567             if locator in self._bufferblocks:
568                 return
569             if self._prefetch_threads is None:
570                 self._prefetch_queue = Queue.Queue()
571                 self._prefetch_threads = []
572                 for i in xrange(0, self.num_get_threads):
573                     thread = threading.Thread(target=block_prefetch_worker, args=(self,))
574                     self._prefetch_threads.append(thread)
575                     thread.daemon = True
576                     thread.start()
577         self._prefetch_queue.put(locator)
578
579
580 class ArvadosFile(object):
581     """Represent a file in a Collection.
582
583     ArvadosFile manages the underlying representation of a file in Keep as a
584     sequence of segments spanning a set of blocks, and implements random
585     read/write access.
586
587     This object may be accessed from multiple threads.
588
589     """
590
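    # An illustration of the segment model (the locators and sizes are made up):
    # a 15-byte file whose first 10 bytes live in block A and last 5 bytes in
    # block B would be represented roughly as
    #
    #     segments = [Range("A...+10", 0, 10, 0),
    #                 Range("B...+5", 10, 5, 0)]
    #
    # where the second and third fields (range_start, range_size) locate the
    # segment within the file and the last (segment_offset) locates it within
    # the referenced block.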
591     def __init__(self, parent, stream=[], segments=[]):
592         """
593         ArvadosFile constructor.
594
595         :stream:
596           a list of Range objects representing a block stream
597
598         :segments:
599           a list of Range objects representing segments
600         """
601         self.parent = parent
602         self._modified = True
603         self._segments = []
604         self.lock = parent.root_collection().lock
605         for s in segments:
606             self._add_segment(stream, s.locator, s.range_size)
607         self._current_bblock = None
608
609     def writable(self):
610         return self.parent.writable()
611
612     @synchronized
613     def segments(self):
614         return copy.copy(self._segments)
615
616     @synchronized
617     def clone(self, new_parent):
618         """Make a copy of this file."""
619         cp = ArvadosFile(new_parent)
620         cp.replace_contents(self)
621         return cp
622
623     @must_be_writable
624     @synchronized
625     def replace_contents(self, other):
626         """Replace segments of this file with segments from another `ArvadosFile` object."""
627
628         map_loc = {}
629         self._segments = []
630         for other_segment in other.segments():
631             new_loc = other_segment.locator
632             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
633                 if other_segment.locator not in map_loc:
634                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
635                     if bufferblock.state() != _BufferBlock.WRITABLE:
636                         map_loc[other_segment.locator] = bufferblock.locator()
637                     else:
638                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
639                 new_loc = map_loc[other_segment.locator]
640
641             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
642
643         self._modified = True
644
645     def __eq__(self, other):
646         if other is self:
647             return True
648         if not isinstance(other, ArvadosFile):
649             return False
650
651         othersegs = other.segments()
652         with self.lock:
653             if len(self._segments) != len(othersegs):
654                 return False
655             for i in xrange(0, len(othersegs)):
656                 seg1 = self._segments[i]
657                 seg2 = othersegs[i]
658                 loc1 = seg1.locator
659                 loc2 = seg2.locator
660
661                 if self.parent._my_block_manager().is_bufferblock(loc1):
662                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
663
664                 if other.parent._my_block_manager().is_bufferblock(loc2):
665                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
666
667                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
668                     seg1.range_start != seg2.range_start or
669                     seg1.range_size != seg2.range_size or
670                     seg1.segment_offset != seg2.segment_offset):
671                     return False
672
673         return True
674
675     def __ne__(self, other):
676         return not self.__eq__(other)
677
678     @synchronized
679     def set_unmodified(self):
680         """Clear the modified flag"""
681         self._modified = False
682
683     @synchronized
684     def modified(self):
685         """Test the modified flag"""
686         return self._modified
687
688     @must_be_writable
689     @synchronized
690     def truncate(self, size):
691         """Shrink the size of the file.
692
693         If `size` is less than the size of the file, the file contents after
694         `size` will be discarded.  If `size` is greater than the current size
695         of the file, an IOError will be raised.
696
697         """
698         if size < self.size():
699             new_segs = []
700             for r in self._segments:
701                 range_end = r.range_start+r.range_size
702                 if r.range_start >= size:
703                     # segment is past the truncate size, all done
704                     break
705                 elif size < range_end:
706                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
707                     nr.segment_offset = r.segment_offset
708                     new_segs.append(nr)
709                     break
710                 else:
711                     new_segs.append(r)
712
713             self._segments = new_segs
714             self._modified = True
715         elif size > self.size():
716             raise IOError("truncate() does not support extending the file size")
717
718
719     def readfrom(self, offset, size, num_retries, exact=False):
720         """Read up to `size` bytes from the file starting at `offset`.
721
722         :exact:
723          If False (default), return less data than requested if the read
724          crosses a block boundary and the next block isn't cached.  If True,
725          only return less data than requested when hitting EOF.
726         """
727
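        # For example (illustrative): a read that starts in a cached block but
        # ends in a block that has not been fetched yet may return only the
        # cached prefix when exact=False; with exact=True the same call waits
        # for the missing block and only returns short at end-of-file.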
728         with self.lock:
729             if size == 0 or offset >= self.size():
730                 return ''
731             prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
732             readsegs = locators_and_ranges(self._segments, offset, size)
733
734         for lr in prefetch:
735             self.parent._my_block_manager().block_prefetch(lr.locator)
736
737         data = []
738         for lr in readsegs:
739             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
740             if block:
741                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
742             else:
743                 break
744         return ''.join(data)
745
746     def _repack_writes(self, num_retries):
747         """Test if the buffer block has more data than actual segments.
748
749         This happens when a buffered write overwrites a file range written in
750         a previous buffered write.  Re-pack the buffer block for efficiency
751         and to avoid leaking information.
752
753         """
754         segs = self._segments
755
756         # Sum up the segments to get the total bytes of the file referencing
757         # into the buffer block.
758         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
759         write_total = sum([s.range_size for s in bufferblock_segs])
760
761         if write_total < self._current_bblock.size():
762             # There is more data in the buffer block than is actually accounted for by segments, so
763             # re-pack just the referenced data into a new buffer block.
764             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
765             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
766             for t in bufferblock_segs:
767                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
768                 t.segment_offset = new_bb.size() - t.range_size
769
770             self._current_bblock = new_bb
771
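    # Illustrative only: writing "aaaa" at offset 0 and then "bb" at offset 0
    # leaves a 6-byte buffer block ("aaaabb") even though the file is only 4
    # bytes long ("bbaa").  _repack_writes() copies just the 4 referenced bytes
    # into a fresh buffer block so the stale prefix never needs to be uploaded.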
772     @must_be_writable
773     @synchronized
774     def writeto(self, offset, data, num_retries):
775         """Write `data` to the file starting at `offset`.
776
777         This will update existing bytes and/or extend the size of the file as
778         necessary.
779
780         """
781         if len(data) == 0:
782             return
783
784         if offset > self.size():
785             raise ArgumentError("Offset is past the end of the file")
786
787         if len(data) > config.KEEP_BLOCK_SIZE:
788             # Chunk it up into smaller writes
789             n = 0
790             dataview = memoryview(data)
791             while n < len(data):
792                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
793                 n += config.KEEP_BLOCK_SIZE
794             return
795
796         self._modified = True
797
798         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
799             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
800
801         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
802             self._repack_writes(num_retries)
803             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
804                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
805                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
806
807         self._current_bblock.append(data)
808
809         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
810
811     @synchronized
812     def flush(self, num_retries=0):
813         if self._current_bblock:
814             self._repack_writes(num_retries)
815             self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
816
817     @must_be_writable
818     @synchronized
819     def add_segment(self, blocks, pos, size):
820         """Add a segment to the end of the file.
821
822         `pos` and `size` reference a section of the stream described by
823         `blocks` (a list of Range objects).
824
825         """
826         self._add_segment(blocks, pos, size)
827
828     def _add_segment(self, blocks, pos, size):
829         """Internal implementation of add_segment."""
830         self._modified = True
831         for lr in locators_and_ranges(blocks, pos, size):
832             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
833             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
834             self._segments.append(r)
835
836     @synchronized
837     def size(self):
838         """Get the file size."""
839         if self._segments:
840             n = self._segments[-1]
841             return n.range_start + n.range_size
842         else:
843             return 0
844
845     @synchronized
846     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
847         buf = ""
848         filestream = []
849         for segment in self._segments:
850             loc = segment.locator
851             if self.parent._my_block_manager().is_bufferblock(loc):
852                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
853             if portable_locators:
854                 loc = KeepLocator(loc).stripped()
855             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
856                                  segment.segment_offset, segment.range_size))
857         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
858         buf += "\n"
859         return buf
860
861
862 class ArvadosFileReader(ArvadosFileReaderBase):
863     """Wraps ArvadosFile in a file-like object supporting reading only.
864
865     Be aware that this class is NOT thread safe as there is no locking around
866     updating the file pointer.
867
868     """
869
870     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
871         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
872         self.arvadosfile = arvadosfile
873
874     def size(self):
875         return self.arvadosfile.size()
876
877     def stream_name(self):
878         return self.arvadosfile.parent.stream_name()
879
880     @_FileLikeObjectBase._before_close
881     @retry_method
882     def read(self, size, num_retries=None):
883         """Read up to `size` bytes from the stream, starting at the current file position."""
884         data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
885         self._filepos += len(data)
886         return data
887
888     @_FileLikeObjectBase._before_close
889     @retry_method
890     def readfrom(self, offset, size, num_retries=None):
891         """Read up to `size` bytes from the stream, starting at the current file position."""
892         return self.arvadosfile.readfrom(offset, size, num_retries)
893
894     def flush(self):
895         pass
896
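# A minimal read-side sketch.  ArvadosFileReader objects are normally obtained
# by opening a path within a collection rather than constructed by hand; the
# CollectionReader usage below is assumed from elsewhere in this SDK, and the
# UUID and process() call are placeholders:
#
#     import arvados.collection
#     c = arvados.collection.CollectionReader("zzzzz-4zz18-xxxxxxxxxxxxxxx")
#     with c.open("foo.txt") as f:
#         for line in f:
#             process(line)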
897
898 class ArvadosFileWriter(ArvadosFileReader):
899     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
900
901     Be aware that this class is NOT thread safe as there is no locking around
902     updating the file pointer.
903
904     """
905
906     def __init__(self, arvadosfile, name, mode, num_retries=None):
907         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
908
909     @_FileLikeObjectBase._before_close
910     @retry_method
911     def write(self, data, num_retries=None):
912         if self.mode[0] == "a":
913             self.arvadosfile.writeto(self.size(), data, num_retries)
914         else:
915             self.arvadosfile.writeto(self._filepos, data, num_retries)
916             self._filepos += len(data)
917
918     @_FileLikeObjectBase._before_close
919     @retry_method
920     def writelines(self, seq, num_retries=None):
921         for s in seq:
922             self.write(s, num_retries)
923
924     @_FileLikeObjectBase._before_close
925     def truncate(self, size=None):
926         if size is None:
927             size = self._filepos
928         self.arvadosfile.truncate(size)
929         if self._filepos > self.size():
930             self._filepos = self.size()
931
932     @_FileLikeObjectBase._before_close
933     def flush(self):
934         self.arvadosfile.flush()
935
936     def close(self):
937         if not self.closed:
938             self.flush()
939             super(ArvadosFileWriter, self).close()
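# A minimal write-side sketch, mirroring the reader example above.  As before,
# the Collection API is assumed from elsewhere in this SDK rather than defined
# in this file, and "out.txt" is just an example path:
#
#     import arvados.collection
#     c = arvados.collection.Collection()
#     with c.open("out.txt", "w") as f:   # f is an ArvadosFileWriter
#         f.write("hello\n")              # buffered in a _BufferBlock
#     manifest = c.manifest_text()        # manifest text including out.txt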