1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import range
6 from builtins import object
7 import functools
8 import os
9 import zlib
10 import bz2
11 from . import config
12 import hashlib
13 import threading
14 import queue
15 import copy
16 import errno
17 import re
18 import logging
19 import collections
20 import uuid
21
22 from .errors import KeepWriteError, AssertionError, ArgumentError
23 from .keep import KeepLocator
24 from ._normalize_stream import normalize_stream
25 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
26 from .retry import retry_method
27
28 MOD = "mod"
29 WRITE = "write"
30
31 _logger = logging.getLogger('arvados.arvfile')
32
33 def split(path):
34     """split(path) -> streamname, filename
35
36     Separate the stream name and file name in a /-separated stream path and
37     return a tuple (stream_name, file_name).  If no stream name is available,
38     assume '.'.
39
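    Example (illustrative paths, doctest style):

        >>> split('output/logs/stderr.txt')
        ('output/logs', 'stderr.txt')
        >>> split('stderr.txt')
        ('.', 'stderr.txt')
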
40     """
41     try:
42         stream_name, file_name = path.rsplit('/', 1)
43     except ValueError:  # No / in string
44         stream_name, file_name = '.', path
45     return stream_name, file_name
46
47
48 class UnownedBlockError(Exception):
49     """Raised when there's a writable block without an owner on the BlockManager."""
50     pass
51
52
53 class _FileLikeObjectBase(object):
54     def __init__(self, name, mode):
55         self.name = name
56         self.mode = mode
57         self.closed = False
58
59     @staticmethod
60     def _before_close(orig_func):
61         @functools.wraps(orig_func)
62         def before_close_wrapper(self, *args, **kwargs):
63             if self.closed:
64                 raise ValueError("I/O operation on closed stream file")
65             return orig_func(self, *args, **kwargs)
66         return before_close_wrapper
67
68     def __enter__(self):
69         return self
70
71     def __exit__(self, exc_type, exc_value, traceback):
72         try:
73             self.close()
74         except Exception:
75             if exc_type is None:
76                 raise
77
78     def close(self):
79         self.closed = True
80
81
82 class ArvadosFileReaderBase(_FileLikeObjectBase):
83     def __init__(self, name, mode, num_retries=None):
84         super(ArvadosFileReaderBase, self).__init__(name, mode)
85         self._filepos = 0
86         self.num_retries = num_retries
87         self._readline_cache = (None, None)
88
89     def __iter__(self):
90         while True:
91             data = self.readline()
92             if not data:
93                 break
94             yield data
95
96     def decompressed_name(self):
97         return re.sub(r'\.(bz2|gz)$', '', self.name)
98
99     @_FileLikeObjectBase._before_close
100     def seek(self, pos, whence=os.SEEK_SET):
101         if whence == os.SEEK_CUR:
102             pos += self._filepos
103         elif whence == os.SEEK_END:
104             pos += self.size()
105         self._filepos = min(max(pos, 0), self.size())
106
107     def tell(self):
108         return self._filepos
109
110     @_FileLikeObjectBase._before_close
111     @retry_method
112     def readall(self, size=2**20, num_retries=None):
113         while True:
114             data = self.read(size, num_retries=num_retries)
115             if data == '':
116                 break
117             yield data
118
119     @_FileLikeObjectBase._before_close
120     @retry_method
121     def readline(self, size=float('inf'), num_retries=None):
122         cache_pos, cache_data = self._readline_cache
123         if self.tell() == cache_pos:
124             data = [cache_data]
125             self._filepos += len(cache_data)
126         else:
127             data = ['']
128         data_size = len(data[-1])
129         while (data_size < size) and ('\n' not in data[-1]):
130             next_read = self.read(2 ** 20, num_retries=num_retries)
131             if not next_read:
132                 break
133             data.append(next_read)
134             data_size += len(next_read)
135         data = ''.join(data)
136         try:
137             nextline_index = data.index('\n') + 1
138         except ValueError:
139             nextline_index = len(data)
140         nextline_index = min(nextline_index, size)
141         self._filepos -= len(data) - nextline_index
142         self._readline_cache = (self.tell(), data[nextline_index:])
143         return data[:nextline_index]
144
145     @_FileLikeObjectBase._before_close
146     @retry_method
147     def decompress(self, decompress, size, num_retries=None):
148         for segment in self.readall(size, num_retries=num_retries):
149             data = decompress(segment)
150             if data:
151                 yield data
152
153     @_FileLikeObjectBase._before_close
154     @retry_method
155     def readall_decompressed(self, size=2**20, num_retries=None):
156         self.seek(0)
157         if self.name.endswith('.bz2'):
158             dc = bz2.BZ2Decompressor()
159             return self.decompress(dc.decompress, size,
160                                    num_retries=num_retries)
161         elif self.name.endswith('.gz'):
162             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
163             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
164                                    size, num_retries=num_retries)
165         else:
166             return self.readall(size, num_retries=num_retries)
167
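    # Illustrative use of readall_decompressed() (sketch only; ``reader`` and
    # ``process`` are hypothetical names): for a file named e.g. 'log.gz' the
    # chunks yielded below are decompressed bytes, while names without a .gz or
    # .bz2 suffix fall through to readall() unchanged.
    #
    #   for chunk in reader.readall_decompressed():
    #       process(chunk)
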
168     @_FileLikeObjectBase._before_close
169     @retry_method
170     def readlines(self, sizehint=float('inf'), num_retries=None):
171         data = []
172         data_size = 0
173         for s in self.readall(num_retries=num_retries):
174             data.append(s)
175             data_size += len(s)
176             if data_size >= sizehint:
177                 break
178         return ''.join(data).splitlines(True)
179
180     def size(self):
181         raise NotImplementedError()
182
183     def read(self, size, num_retries=None):
184         raise NotImplementedError()
185
186     def readfrom(self, start, size, num_retries=None):
187         raise NotImplementedError()
188
189
190 class StreamFileReader(ArvadosFileReaderBase):
191     class _NameAttribute(str):
192         # The Python file API provides a plain .name attribute.
193         # Older SDK provided a name() method.
194         # This class provides both, for maximum compatibility.
195         def __call__(self):
196             return self
197
198     def __init__(self, stream, segments, name):
199         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
200         self._stream = stream
201         self.segments = segments
202
203     def stream_name(self):
204         return self._stream.name()
205
206     def size(self):
207         n = self.segments[-1]
208         return n.range_start + n.range_size
209
210     @_FileLikeObjectBase._before_close
211     @retry_method
212     def read(self, size, num_retries=None):
213         """Read up to 'size' bytes from the stream, starting at the current file position"""
214         if size == 0:
215             return ''
216
217         data = ''
218         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
219         if available_chunks:
220             lr = available_chunks[0]
221             data = self._stream.readfrom(lr.locator+lr.segment_offset,
222                                          lr.segment_size,
223                                          num_retries=num_retries)
224
225         self._filepos += len(data)
226         return data
227
228     @_FileLikeObjectBase._before_close
229     @retry_method
230     def readfrom(self, start, size, num_retries=None):
231         """Read up to 'size' bytes from the stream, starting at 'start'"""
232         if size == 0:
233             return ''
234
235         data = []
236         for lr in locators_and_ranges(self.segments, start, size):
237             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
238                                               num_retries=num_retries))
239         return ''.join(data)
240
241     def as_manifest(self):
242         segs = []
243         for r in self.segments:
244             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
245         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
246
247
248 def synchronized(orig_func):
249     @functools.wraps(orig_func)
250     def synchronized_wrapper(self, *args, **kwargs):
251         with self.lock:
252             return orig_func(self, *args, **kwargs)
253     return synchronized_wrapper
254
255
256 class StateChangeError(Exception):
257     def __init__(self, message, state, nextstate):
258         super(StateChangeError, self).__init__(message)
259         self.state = state
260         self.nextstate = nextstate
261
262 class _BufferBlock(object):
263     """A stand-in for a Keep block that is in the process of being written.
264
265     Writers can append to it, get the size, and compute the Keep locator.
266     There are three valid states:
267
268     WRITABLE
269       Can append to block.
270
271     PENDING
272       Block is in the process of being uploaded to Keep, append is an error.
273
274     COMMITTED
275       The block has been written to Keep, its internal buffer has been
276       released, fetching the block will fetch it via keep client (since we
277       discarded the internal copy), and identifiers referring to the BufferBlock
278       can be replaced with the block locator.
279
280     """
281
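    # Typical lifecycle, as a sketch only (``data`` and ``loc`` are hypothetical
    # values; in practice _BlockManager drives these calls):
    #
    #   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
    #   bb.append(b"data")                          # WRITABLE: buffer grows as needed
    #   bb.set_state(_BufferBlock.PENDING)          # upload started, appends now raise
    #   bb.set_state(_BufferBlock.COMMITTED, loc)   # loc = Keep locator from keep.put()
    #   bb.locator()                                # returns loc; internal buffer released
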
282     WRITABLE = 0
283     PENDING = 1
284     COMMITTED = 2
285     ERROR = 3
286
287     def __init__(self, blockid, starting_capacity, owner):
288         """
289         :blockid:
290           the identifier for this block
291
292         :starting_capacity:
293           the initial buffer capacity
294
295         :owner:
296           ArvadosFile that owns this block
297
298         """
299         self.blockid = blockid
300         self.buffer_block = bytearray(starting_capacity)
301         self.buffer_view = memoryview(self.buffer_block)
302         self.write_pointer = 0
303         self._state = _BufferBlock.WRITABLE
304         self._locator = None
305         self.owner = owner
306         self.lock = threading.Lock()
307         self.wait_for_commit = threading.Event()
308         self.error = None
309
310     @synchronized
311     def append(self, data):
312         """Append some data to the buffer.
313
314         Only valid if the block is in WRITABLE state.  Implements an expanding
315         buffer, doubling capacity as needed to accommodate all the data.
316
317         """
318         if self._state == _BufferBlock.WRITABLE:
319             while (self.write_pointer+len(data)) > len(self.buffer_block):
320                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
321                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
322                 self.buffer_block = new_buffer_block
323                 self.buffer_view = memoryview(self.buffer_block)
324             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
325             self.write_pointer += len(data)
326             self._locator = None
327         else:
328             raise AssertionError("Buffer block is not writable")
329
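    # Sizing note (assuming the default 2**14-byte starting capacity used by
    # _BlockManager.alloc_bufferblock): appending a total of 100 KiB doubles the
    # buffer 16 KiB -> 32 KiB -> 64 KiB -> 128 KiB, so once the data outgrows the
    # initial buffer the allocation stays within 2x of the bytes actually written.
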
330     STATE_TRANSITIONS = frozenset([
331             (WRITABLE, PENDING),
332             (PENDING, COMMITTED),
333             (PENDING, ERROR),
334             (ERROR, PENDING)])
335
336     @synchronized
337     def set_state(self, nextstate, val=None):
338         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
339             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
340         self._state = nextstate
341
342         if self._state == _BufferBlock.PENDING:
343             self.wait_for_commit.clear()
344
345         if self._state == _BufferBlock.COMMITTED:
346             self._locator = val
347             self.buffer_view = None
348             self.buffer_block = None
349             self.wait_for_commit.set()
350
351         if self._state == _BufferBlock.ERROR:
352             self.error = val
353             self.wait_for_commit.set()
354
355     @synchronized
356     def state(self):
357         return self._state
358
359     def size(self):
360         """The amount of data written to the buffer."""
361         return self.write_pointer
362
363     @synchronized
364     def locator(self):
365         """The Keep locator for this buffer's contents."""
366         if self._locator is None:
367             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
368         return self._locator
369
370     @synchronized
371     def clone(self, new_blockid, owner):
372         if self._state == _BufferBlock.COMMITTED:
373             raise AssertionError("Cannot duplicate committed buffer block")
374         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
375         bufferblock.append(self.buffer_view[0:self.size()])
376         return bufferblock
377
378     @synchronized
379     def clear(self):
380         self.owner = None
381         self.buffer_block = None
382         self.buffer_view = None
383
384
385 class NoopLock(object):
386     def __enter__(self):
387         return self
388
389     def __exit__(self, exc_type, exc_value, traceback):
390         pass
391
392     def acquire(self, blocking=False):
393         pass
394
395     def release(self):
396         pass
397
398
399 def must_be_writable(orig_func):
400     @functools.wraps(orig_func)
401     def must_be_writable_wrapper(self, *args, **kwargs):
402         if not self.writable():
403             raise IOError(errno.EROFS, "Collection is read-only.")
404         return orig_func(self, *args, **kwargs)
405     return must_be_writable_wrapper
406
407
408 class _BlockManager(object):
409     """BlockManager handles buffer blocks.
410
411     Also handles background block uploads, and background block prefetch for a
412     Collection of ArvadosFiles.
413
414     """
415
416     DEFAULT_PUT_THREADS = 2
417     DEFAULT_GET_THREADS = 2
418
419     def __init__(self, keep, copies=None, put_threads=None):
420         """keep: KeepClient object to use"""
421         self._keep = keep
422         self._bufferblocks = collections.OrderedDict()
423         self._put_queue = None
424         self._put_threads = None
425         self._prefetch_queue = None
426         self._prefetch_threads = None
427         self.lock = threading.Lock()
428         self.prefetch_enabled = True
429         if put_threads:
430             self.num_put_threads = put_threads
431         else:
432             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
433         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
434         self.copies = copies
435         self._pending_write_size = 0
436         self.threads_lock = threading.Lock()
437
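    # Typical flow, as an illustrative sketch only (``keep_client`` and
    # ``some_arvados_file`` are hypothetical; ArvadosFile normally drives these
    # calls rather than user code):
    #
    #   bm = _BlockManager(keep_client)
    #   bb = bm.alloc_bufferblock(owner=some_arvados_file)
    #   bb.append(b"...")                        # buffered locally, nothing sent yet
    #   bm.commit_bufferblock(bb, sync=False)    # hand off to the background uploader
    #   bm.commit_all()                          # block until every buffer block is in Keep
    #   bm.stop_threads()                        # shut down uploader/prefetch threads
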
438     @synchronized
439     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
440         """Allocate a new, empty bufferblock in WRITABLE state and return it.
441
442         :blockid:
443           optional block identifier, otherwise one will be automatically assigned
444
445         :starting_capacity:
446           optional capacity, otherwise will use default capacity
447
448         :owner:
449           ArvadosFile that owns this block
450
451         """
452         return self._alloc_bufferblock(blockid, starting_capacity, owner)
453
454     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
455         if blockid is None:
456             blockid = "%s" % uuid.uuid4()
457         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
458         self._bufferblocks[bufferblock.blockid] = bufferblock
459         return bufferblock
460
461     @synchronized
462     def dup_block(self, block, owner):
463         """Create a new bufferblock initialized with the content of an existing bufferblock.
464
465         :block:
466           the buffer block to copy.
467
468         :owner:
469           ArvadosFile that owns the new block
470
471         """
472         new_blockid = "bufferblock%i" % len(self._bufferblocks)
473         bufferblock = block.clone(new_blockid, owner)
474         self._bufferblocks[bufferblock.blockid] = bufferblock
475         return bufferblock
476
477     @synchronized
478     def is_bufferblock(self, locator):
479         return locator in self._bufferblocks
480
481     def _commit_bufferblock_worker(self):
482         """Background uploader thread."""
483
484         while True:
485             try:
486                 bufferblock = self._put_queue.get()
487                 if bufferblock is None:
488                     return
489
490                 if self.copies is None:
491                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
492                 else:
493                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
494                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
495
496             except Exception as e:
497                 bufferblock.set_state(_BufferBlock.ERROR, e)
498             finally:
499                 if self._put_queue is not None:
500                     self._put_queue.task_done()
501
502     def start_put_threads(self):
503         with self.threads_lock:
504             if self._put_threads is None:
505                 # Start uploader threads.
506
507                 # If we don't limit the Queue size, the upload queue can quickly
508                 # grow to take up gigabytes of RAM if the writing process is
509                 # generating data more quickly than it can be sent to the Keep
510                 # servers.
511                 #
512                 # With two upload threads and a queue size of 2, this means up to 4
513                 # blocks pending.  If they are full 64 MiB blocks, that means up to
514                 # 256 MiB of internal buffering, which is the same size as the
515                 # default download block cache in KeepClient.
516                 self._put_queue = queue.Queue(maxsize=2)
517
518                 self._put_threads = []
519                 for i in range(0, self.num_put_threads):
520                     thread = threading.Thread(target=self._commit_bufferblock_worker)
521                     self._put_threads.append(thread)
522                     thread.daemon = True
523                     thread.start()
524
525     def _block_prefetch_worker(self):
526         """The background downloader thread."""
527         while True:
528             try:
529                 b = self._prefetch_queue.get()
530                 if b is None:
531                     return
532                 self._keep.get(b)
533             except Exception:
534                 _logger.exception("Exception doing block prefetch")
535
536     @synchronized
537     def start_get_threads(self):
538         if self._prefetch_threads is None:
539             self._prefetch_queue = queue.Queue()
540             self._prefetch_threads = []
541             for i in range(0, self.num_get_threads):
542                 thread = threading.Thread(target=self._block_prefetch_worker)
543                 self._prefetch_threads.append(thread)
544                 thread.daemon = True
545                 thread.start()
546
547
548     @synchronized
549     def stop_threads(self):
550         """Shut down and wait for background upload and download threads to finish."""
551
552         if self._put_threads is not None:
553             for t in self._put_threads:
554                 self._put_queue.put(None)
555             for t in self._put_threads:
556                 t.join()
557         self._put_threads = None
558         self._put_queue = None
559
560         if self._prefetch_threads is not None:
561             for t in self._prefetch_threads:
562                 self._prefetch_queue.put(None)
563             for t in self._prefetch_threads:
564                 t.join()
565         self._prefetch_threads = None
566         self._prefetch_queue = None
567
568     def __enter__(self):
569         return self
570
571     def __exit__(self, exc_type, exc_value, traceback):
572         self.stop_threads()
573
574     @synchronized
575     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
576         """Packs small blocks together before uploading"""
577         self._pending_write_size += closed_file_size
578
579         # Check if there are enough small blocks to fill up one full block
580         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
581
582             # Search for blocks that are ready to be packed together before being committed to Keep.
583             # A WRITABLE block always has an owner.
584             # A WRITABLE block with its owner.closed() implies that its
585             # size is <= KEEP_BLOCK_SIZE/2.
586             try:
587                 small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
588             except AttributeError:
589                 # Writable blocks without an owner shouldn't exist.
590                 raise UnownedBlockError()
591
592             if len(small_blocks) <= 1:
593                 # Not enough small blocks for repacking
594                 return
595
596             # Update the pending write size count with its true value, just in case
597             # some small file was opened, written and closed several times.
598             self._pending_write_size = sum([b.size() for b in small_blocks])
599             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
600                 return
601
602             new_bb = self._alloc_bufferblock()
603             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
604                 bb = small_blocks.pop(0)
605                 arvfile = bb.owner
606                 self._pending_write_size -= bb.size()
607                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
608                 arvfile.set_segments([Range(new_bb.blockid,
609                                             0,
610                                             bb.size(),
611                                             new_bb.write_pointer - bb.size())])
612                 self._delete_bufferblock(bb.blockid)
613             self.commit_bufferblock(new_bb, sync=sync)
614
615     def commit_bufferblock(self, block, sync):
616         """Initiate a background upload of a bufferblock.
617
618         :block:
619           The block object to upload
620
621         :sync:
622           If `sync` is True, upload the block synchronously.
623           If `sync` is False, upload the block asynchronously.  This will
624           return immediately unless the upload queue is at capacity, in
625           which case it will wait on an upload queue slot.
626
627         """
628         try:
629             # Mark the block as PENDING so as to disallow any more appends.
630             block.set_state(_BufferBlock.PENDING)
631         except StateChangeError as e:
632             if e.state == _BufferBlock.PENDING:
633                 if sync:
634                     block.wait_for_commit.wait()
635                 else:
636                     return
637             if block.state() == _BufferBlock.COMMITTED:
638                 return
639             elif block.state() == _BufferBlock.ERROR:
640                 raise block.error
641             else:
642                 raise
643
644         if sync:
645             try:
646                 if self.copies is None:
647                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
648                 else:
649                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
650                 block.set_state(_BufferBlock.COMMITTED, loc)
651             except Exception as e:
652                 block.set_state(_BufferBlock.ERROR, e)
653                 raise
654         else:
655             self.start_put_threads()
656             self._put_queue.put(block)
657
658     @synchronized
659     def get_bufferblock(self, locator):
660         return self._bufferblocks.get(locator)
661
662     @synchronized
663     def delete_bufferblock(self, locator):
664         self._delete_bufferblock(locator)
665
666     def _delete_bufferblock(self, locator):
667         bb = self._bufferblocks[locator]
668         bb.clear()
669         del self._bufferblocks[locator]
670
671     def get_block_contents(self, locator, num_retries, cache_only=False):
672         """Fetch a block.
673
674         First checks to see if the locator is a BufferBlock and returns its
675         contents; if not, passes the request through to KeepClient.get().
676
677         """
678         with self.lock:
679             if locator in self._bufferblocks:
680                 bufferblock = self._bufferblocks[locator]
681                 if bufferblock.state() != _BufferBlock.COMMITTED:
682                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
683                 else:
684                     locator = bufferblock._locator
685         if cache_only:
686             return self._keep.get_from_cache(locator)
687         else:
688             return self._keep.get(locator, num_retries=num_retries)
689
690     def commit_all(self):
691         """Commit all outstanding buffer blocks.
692
693         This is a synchronous call, and will not return until all buffer blocks
694         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
695
696         """
697         self.repack_small_blocks(force=True, sync=True)
698
699         with self.lock:
700             items = list(self._bufferblocks.items())
701
702         for k,v in items:
703             if v.state() != _BufferBlock.COMMITTED and v.owner:
704                 v.owner.flush(sync=False)
705
706         with self.lock:
707             if self._put_queue is not None:
708                 self._put_queue.join()
709
710                 err = []
711                 for k,v in items:
712                     if v.state() == _BufferBlock.ERROR:
713                         err.append((v.locator(), v.error))
714                 if err:
715                     raise KeepWriteError("Error writing some blocks", err, label="block")
716
717         for k,v in items:
718             # flush again with sync=True to remove committed bufferblocks from
719             # the segments.
720             if v.owner:
721                 v.owner.flush(sync=True)
722
723     def block_prefetch(self, locator):
724         """Initiate a background download of a block.
725
726         This assumes that the underlying KeepClient implements a block cache,
727         so repeated requests for the same block will not result in repeated
728         downloads (unless the block is evicted from the cache.)  This method
729         does not block.
730
731         """
732
733         if not self.prefetch_enabled:
734             return
735
736         if self._keep.get_from_cache(locator) is not None:
737             return
738
739         with self.lock:
740             if locator in self._bufferblocks:
741                 return
742
743         self.start_get_threads()
744         self._prefetch_queue.put(locator)
745
746
747 class ArvadosFile(object):
748     """Represent a file in a Collection.
749
750     ArvadosFile manages the underlying representation of a file in Keep as a
751     sequence of segments spanning a set of blocks, and implements random
752     read/write access.
753
754     This object may be accessed from multiple threads.
755
756     """
757
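    # How segments map file ranges onto blocks (illustrative sketch; the block
    # locators below are made up).  Range fields are (locator, range_start,
    # range_size, segment_offset), so a 5-byte file assembled from two Keep
    # blocks could carry the segments
    #
    #   [Range('aaaa...+10', 0, 3, 0),   # file bytes 0-2 from the start of block 1
    #    Range('bbbb...+8',  3, 2, 6)]   # file bytes 3-4 from offset 6 in block 2
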
758     def __init__(self, parent, name, stream=[], segments=[]):
759         """
760         ArvadosFile constructor.
761
762         :stream:
763           a list of Range objects representing a block stream
764
765         :segments:
766           a list of Range objects representing segments
767         """
768         self.parent = parent
769         self.name = name
770         self._writers = set()
771         self._committed = False
772         self._segments = []
773         self.lock = parent.root_collection().lock
774         for s in segments:
775             self._add_segment(stream, s.locator, s.range_size)
776         self._current_bblock = None
777
778     def writable(self):
779         return self.parent.writable()
780
781     @synchronized
782     def permission_expired(self, as_of_dt=None):
783         """Returns True if any of the segments' locators is expired"""
784         for r in self._segments:
785             if KeepLocator(r.locator).permission_expired(as_of_dt):
786                 return True
787         return False
788
789     @synchronized
790     def segments(self):
791         return copy.copy(self._segments)
792
793     @synchronized
794     def clone(self, new_parent, new_name):
795         """Make a copy of this file."""
796         cp = ArvadosFile(new_parent, new_name)
797         cp.replace_contents(self)
798         return cp
799
800     @must_be_writable
801     @synchronized
802     def replace_contents(self, other):
803         """Replace segments of this file with segments from another `ArvadosFile` object."""
804
805         map_loc = {}
806         self._segments = []
807         for other_segment in other.segments():
808             new_loc = other_segment.locator
809             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
810                 if other_segment.locator not in map_loc:
811                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
812                     if bufferblock.state() != _BufferBlock.WRITABLE:
813                         map_loc[other_segment.locator] = bufferblock.locator()
814                     else:
815                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
816                 new_loc = map_loc[other_segment.locator]
817
818             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
819
820         self.set_committed(False)
821
822     def __eq__(self, other):
823         if other is self:
824             return True
825         if not isinstance(other, ArvadosFile):
826             return False
827
828         othersegs = other.segments()
829         with self.lock:
830             if len(self._segments) != len(othersegs):
831                 return False
832             for i in range(0, len(othersegs)):
833                 seg1 = self._segments[i]
834                 seg2 = othersegs[i]
835                 loc1 = seg1.locator
836                 loc2 = seg2.locator
837
838                 if self.parent._my_block_manager().is_bufferblock(loc1):
839                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
840
841                 if other.parent._my_block_manager().is_bufferblock(loc2):
842                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
843
844                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
845                     seg1.range_start != seg2.range_start or
846                     seg1.range_size != seg2.range_size or
847                     seg1.segment_offset != seg2.segment_offset):
848                     return False
849
850         return True
851
852     def __ne__(self, other):
853         return not self.__eq__(other)
854
855     @synchronized
856     def set_segments(self, segs):
857         self._segments = segs
858
859     @synchronized
860     def set_committed(self, value=True):
861         """Set committed flag.
862
863         If value is True, set committed to be True.
864
865         If value is False, set committed to be False for this and all parents.
866         """
867         if value == self._committed:
868             return
869         self._committed = value
870         if self._committed is False and self.parent is not None:
871             self.parent.set_committed(False)
872
873     @synchronized
874     def committed(self):
875         """Get whether this is committed or not."""
876         return self._committed
877
878     @synchronized
879     def add_writer(self, writer):
880         """Add an ArvadosFileWriter reference to the list of writers"""
881         if isinstance(writer, ArvadosFileWriter):
882             self._writers.add(writer)
883
884     @synchronized
885     def remove_writer(self, writer, flush):
886         """
887         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
888         and do some block maintenance tasks.
889         """
890         self._writers.remove(writer)
891
892         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
893             # File writer closed, not small enough for repacking
894             self.flush()
895         elif self.closed():
896             # All writers closed and size is adequate for repacking
897             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
898
899     def closed(self):
900         """
901         Get whether this is closed or not. The file is considered closed when
902         its writers list is empty.
903         """
904         return len(self._writers) == 0
905
906     @must_be_writable
907     @synchronized
908     def truncate(self, size):
909         """Shrink the size of the file.
910
911         If `size` is less than the size of the file, the file contents after
912         `size` will be discarded.  If `size` is greater than the current size
913         of the file, an IOError will be raised.
914
915         """
916         if size < self.size():
917             new_segs = []
918             for r in self._segments:
919                 range_end = r.range_start+r.range_size
920                 if r.range_start >= size:
921                     # segment is past the truncate size, all done
922                     break
923                 elif size < range_end:
924                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
925                     nr.segment_offset = r.segment_offset
926                     new_segs.append(nr)
927                     break
928                 else:
929                     new_segs.append(r)
930
931             self._segments = new_segs
932             self.set_committed(False)
933         elif size > self.size():
934             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
935
936     def readfrom(self, offset, size, num_retries, exact=False):
937         """Read up to `size` bytes from the file starting at `offset`.
938
939         :exact:
940          If False (default), return less data than requested if the read
941          crosses a block boundary and the next block isn't cached.  If True,
942          only return less data than requested when hitting EOF.
943         """
944
945         with self.lock:
946             if size == 0 or offset >= self.size():
947                 return ''
948             readsegs = locators_and_ranges(self._segments, offset, size)
949             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
950
951         locs = set()
952         data = []
953         for lr in readsegs:
954             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
955             if block:
956                 blockview = memoryview(block)
957                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
958                 locs.add(lr.locator)
959             else:
960                 break
961
962         for lr in prefetch:
963             if lr.locator not in locs:
964                 self.parent._my_block_manager().block_prefetch(lr.locator)
965                 locs.add(lr.locator)
966
967         return ''.join(data)
968
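    # Illustrative call (sketch only; ``arvfile`` stands for an ArvadosFile
    # instance and the argument values are arbitrary): a non-exact read may
    # return fewer bytes than requested when the next block is not yet cached,
    # so callers that need exactly `size` bytes pass exact=True, as
    # ArvadosFileReader.read() does:
    #
    #   data = arvfile.readfrom(0, 1024, num_retries=2, exact=True)
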
969     def _repack_writes(self, num_retries):
970         """Test if the buffer block has more data than actual segments.
971
972         This happens when a buffered write over-writes a file range written in
973         a previous buffered write.  Re-pack the buffer block for efficiency
974         and to avoid leaking information.
975
976         """
977         segs = self._segments
978
979         # Sum up the segments to get the total bytes of the file referencing
980         # into the buffer block.
981         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
982         write_total = sum([s.range_size for s in bufferblock_segs])
983
984         if write_total < self._current_bblock.size():
985             # There is more data in the buffer block than is actually accounted for by segments, so
986             # re-pack into a new buffer by copying over to a new buffer block.
987             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
988             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
989             for t in bufferblock_segs:
990                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
991                 t.segment_offset = new_bb.size() - t.range_size
992
993             self._current_bblock = new_bb
994
995     @must_be_writable
996     @synchronized
997     def writeto(self, offset, data, num_retries):
998         """Write `data` to the file starting at `offset`.
999
1000         This will update existing bytes and/or extend the size of the file as
1001         necessary.
1002
1003         """
1004         if len(data) == 0:
1005             return
1006
1007         if offset > self.size():
1008             raise ArgumentError("Offset is past the end of the file")
1009
1010         if len(data) > config.KEEP_BLOCK_SIZE:
1011             # Chunk it up into smaller writes
1012             n = 0
1013             dataview = memoryview(data)
1014             while n < len(data):
1015                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1016                 n += config.KEEP_BLOCK_SIZE
1017             return
1018
1019         self.set_committed(False)
1020
1021         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1022             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1023
1024         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1025             self._repack_writes(num_retries)
1026             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1027                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1028                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1029
1030         self._current_bblock.append(data)
1031
1032         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1033
1034         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1035
1036         return len(data)
1037
1038     @synchronized
1039     def flush(self, sync=True, num_retries=0):
1040         """Flush the current bufferblock to Keep.
1041
1042         :sync:
1043           If True, commit block synchronously, wait until buffer block has been written.
1044           If False, commit block asynchronously, return immediately after putting block into
1045           the keep put queue.
1046         """
1047         if self.committed():
1048             return
1049
1050         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1051             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1052                 self._repack_writes(num_retries)
1053             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1054
1055         if sync:
1056             to_delete = set()
1057             for s in self._segments:
1058                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1059                 if bb:
1060                     if bb.state() != _BufferBlock.COMMITTED:
1061                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1062                     to_delete.add(s.locator)
1063                     s.locator = bb.locator()
1064             for s in to_delete:
1065                self.parent._my_block_manager().delete_bufferblock(s)
1066
1067         self.parent.notify(MOD, self.parent, self.name, (self, self))
1068
1069     @must_be_writable
1070     @synchronized
1071     def add_segment(self, blocks, pos, size):
1072         """Add a segment to the end of the file.
1073
1074         `pos` and `size` reference a section of the stream described by
1075         `blocks` (a list of Range objects).
1076
1077         """
1078         self._add_segment(blocks, pos, size)
1079
1080     def _add_segment(self, blocks, pos, size):
1081         """Internal implementation of add_segment."""
1082         self.set_committed(False)
1083         for lr in locators_and_ranges(blocks, pos, size):
1084             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1085             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1086             self._segments.append(r)
1087
1088     @synchronized
1089     def size(self):
1090         """Get the file size."""
1091         if self._segments:
1092             n = self._segments[-1]
1093             return n.range_start + n.range_size
1094         else:
1095             return 0
1096
1097     @synchronized
1098     def manifest_text(self, stream_name=".", portable_locators=False,
1099                       normalize=False, only_committed=False):
1100         buf = ""
1101         filestream = []
1102         for segment in self._segments:
1103             loc = segment.locator
1104             if self.parent._my_block_manager().is_bufferblock(loc):
1105                 if only_committed:
1106                     continue
1107                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1108             if portable_locators:
1109                 loc = KeepLocator(loc).stripped()
1110             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1111                                  segment.segment_offset, segment.range_size))
1112         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1113         buf += "\n"
1114         return buf
1115
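    # Example of the output format (illustrative locator and file name): for a
    # 3-byte file named foo.txt stored in a single committed block,
    # manifest_text() returns a single stream line such as
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n
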
1116     @must_be_writable
1117     @synchronized
1118     def _reparent(self, newparent, newname):
1119         self.set_committed(False)
1120         self.flush(sync=True)
1121         self.parent.remove(self.name)
1122         self.parent = newparent
1123         self.name = newname
1124         self.lock = self.parent.root_collection().lock
1125
1126
1127 class ArvadosFileReader(ArvadosFileReaderBase):
1128     """Wraps ArvadosFile in a file-like object supporting reading only.
1129
1130     Be aware that this class is NOT thread safe as there is no locking around
1131     updating the file pointer.
1132
1133     """
1134
1135     def __init__(self, arvadosfile, num_retries=None):
1136         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1137         self.arvadosfile = arvadosfile
1138
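    # Instances are normally obtained from a Collection rather than constructed
    # directly (illustrative sketch; the collection UUID and file name are made
    # up):
    #
    #   import arvados.collection
    #   c = arvados.collection.Collection("zzzzz-4zz18-xxxxxxxxxxxxxxx")
    #   with c.open("input.fastq") as f:     # an ArvadosFileReader
    #       chunk = f.read(2**20)            # up to 1 MiB from the current position
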
1139     def size(self):
1140         return self.arvadosfile.size()
1141
1142     def stream_name(self):
1143         return self.arvadosfile.parent.stream_name()
1144
1145     @_FileLikeObjectBase._before_close
1146     @retry_method
1147     def read(self, size=None, num_retries=None):
1148         """Read up to `size` bytes from the file and return the result.
1149
1150         Starts at the current file position.  If `size` is None, read the
1151         entire remainder of the file.
1152         """
1153         if size is None:
1154             data = []
1155             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1156             while rd:
1157                 data.append(rd)
1158                 self._filepos += len(rd)
1159                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1160             return ''.join(data)
1161         else:
1162             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1163             self._filepos += len(data)
1164             return data
1165
1166     @_FileLikeObjectBase._before_close
1167     @retry_method
1168     def readfrom(self, offset, size, num_retries=None):
1169         """Read up to `size` bytes from the stream, starting at the specified file offset.
1170
1171         This method does not change the file position.
1172         """
1173         return self.arvadosfile.readfrom(offset, size, num_retries)
1174
1175     def flush(self):
1176         pass
1177
1178
1179 class ArvadosFileWriter(ArvadosFileReader):
1180     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1181
1182     Be aware that this class is NOT thread safe as there is no locking around
1183     updating file pointer.
1184     updating the file pointer.
1185     """
1186
1187     def __init__(self, arvadosfile, mode, num_retries=None):
1188         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1189         self.mode = mode
1190         self.arvadosfile.add_writer(self)
1191
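    # Illustrative round trip through a Collection (the file name is
    # hypothetical; Collection.open() constructs this class for write modes):
    #
    #   c = arvados.collection.Collection()
    #   with c.open("notes.txt", "w") as f:  # ArvadosFileWriter
    #       f.write("hello\n")
    #   with c.open("notes.txt", "a") as f:  # append mode: writes land at EOF
    #       f.write("world\n")
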
1192     @_FileLikeObjectBase._before_close
1193     @retry_method
1194     def write(self, data, num_retries=None):
1195         if self.mode[0] == "a":
1196             self.arvadosfile.writeto(self.size(), data, num_retries)
1197         else:
1198             self.arvadosfile.writeto(self._filepos, data, num_retries)
1199             self._filepos += len(data)
1200         return len(data)
1201
1202     @_FileLikeObjectBase._before_close
1203     @retry_method
1204     def writelines(self, seq, num_retries=None):
1205         for s in seq:
1206             self.write(s, num_retries=num_retries)
1207
1208     @_FileLikeObjectBase._before_close
1209     def truncate(self, size=None):
1210         if size is None:
1211             size = self._filepos
1212         self.arvadosfile.truncate(size)
1213         if self._filepos > self.size():
1214             self._filepos = self.size()
1215
1216     @_FileLikeObjectBase._before_close
1217     def flush(self):
1218         self.arvadosfile.flush()
1219
1220     def close(self, flush=True):
1221         if not self.closed:
1222             self.arvadosfile.remove_writer(self, flush)
1223             super(ArvadosFileWriter, self).close()