11308: Fix string/bytes confusion for Python 3
[arvados.git] sdk/python/arvados/arvfile.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import range
6 from builtins import object
7 import functools
8 import os
9 import zlib
10 import bz2
11 from . import config
12 import hashlib
13 import threading
14 import queue
15 import copy
16 import errno
17 import re
18 import logging
19 import collections
20 import uuid
21
22 from .errors import KeepWriteError, AssertionError, ArgumentError
23 from .keep import KeepLocator
24 from ._normalize_stream import normalize_stream
25 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
26 from .retry import retry_method
27
28 MOD = "mod"
29 WRITE = "write"
30
31 _logger = logging.getLogger('arvados.arvfile')
32
33 def split(path):
34     """split(path) -> streamname, filename
35
36     Separate the stream name and file name in a /-separated stream path and
37     return a tuple (stream_name, file_name).  If no stream name is available,
38     assume '.'.
39
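    For example::

        >>> split('sub/dir/file.txt')
        ('sub/dir', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')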
40     """
41     try:
42         stream_name, file_name = path.rsplit('/', 1)
43     except ValueError:  # No / in string
44         stream_name, file_name = '.', path
45     return stream_name, file_name
46
47
48 class UnownedBlockError(Exception):
49     """Raised when there's an writable block without an owner on the BlockManager."""
50     pass
51
52
53 class _FileLikeObjectBase(object):
54     def __init__(self, name, mode):
55         self.name = name
56         self.mode = mode
57         self.closed = False
58
59     @staticmethod
60     def _before_close(orig_func):
61         @functools.wraps(orig_func)
62         def before_close_wrapper(self, *args, **kwargs):
63             if self.closed:
64                 raise ValueError("I/O operation on closed stream file")
65             return orig_func(self, *args, **kwargs)
66         return before_close_wrapper
67
68     def __enter__(self):
69         return self
70
71     def __exit__(self, exc_type, exc_value, traceback):
72         try:
73             self.close()
74         except Exception:
75             if exc_type is None:
76                 raise
77
78     def close(self):
79         self.closed = True
80
81
82 class ArvadosFileReaderBase(_FileLikeObjectBase):
83     def __init__(self, name, mode, num_retries=None):
84         super(ArvadosFileReaderBase, self).__init__(name, mode)
85         self._filepos = 0
86         self.num_retries = num_retries
87         self._readline_cache = (None, None)
88
89     def __iter__(self):
90         while True:
91             data = self.readline()
92             if not data:
93                 break
94             yield data
95
96     def decompressed_name(self):
97         return re.sub(r'\.(bz2|gz)$', '', self.name)
98
99     @_FileLikeObjectBase._before_close
100     def seek(self, pos, whence=os.SEEK_SET):
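        """Set the current file position, clamping it to the range [0, size()]."""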
101         if whence == os.SEEK_CUR:
102             pos += self._filepos
103         elif whence == os.SEEK_END:
104             pos += self.size()
105         self._filepos = min(max(pos, 0), self.size())
106
107     def tell(self):
108         return self._filepos
109
110     @_FileLikeObjectBase._before_close
111     @retry_method
112     def readall(self, size=2**20, num_retries=None):
113         while True:
114             data = self.read(size, num_retries=num_retries)
115             if len(data) == 0:
116                 break
117             yield data
118
119     @_FileLikeObjectBase._before_close
120     @retry_method
121     def readline(self, size=float('inf'), num_retries=None):
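        """Read and return one line from the current file position, as a str.

        Reads at most `size` bytes; the returned text includes the trailing
        newline when one is found.
        """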
122         cache_pos, cache_data = self._readline_cache
123         if self.tell() == cache_pos:
124             data = [cache_data]
125             self._filepos += len(cache_data)
126         else:
127             data = [b'']
128         data_size = len(data[-1])
129         while (data_size < size) and (b'\n' not in data[-1]):
130             next_read = self.read(2 ** 20, num_retries=num_retries)
131             if not next_read:
132                 break
133             data.append(next_read)
134             data_size += len(next_read)
135         data = b''.join(data)
136         try:
137             nextline_index = data.index(b'\n') + 1
138         except ValueError:
139             nextline_index = len(data)
140         nextline_index = min(nextline_index, size)
141         self._filepos -= len(data) - nextline_index
142         self._readline_cache = (self.tell(), data[nextline_index:])
143         return data[:nextline_index].decode()
144
145     @_FileLikeObjectBase._before_close
146     @retry_method
147     def decompress(self, decompress, size, num_retries=None):
148         for segment in self.readall(size, num_retries=num_retries):
149             data = decompress(segment)
150             if data:
151                 yield data
152
153     @_FileLikeObjectBase._before_close
154     @retry_method
155     def readall_decompressed(self, size=2**20, num_retries=None):
156         self.seek(0)
157         if self.name.endswith('.bz2'):
158             dc = bz2.BZ2Decompressor()
159             return self.decompress(dc.decompress, size,
160                                    num_retries=num_retries)
161         elif self.name.endswith('.gz'):
162             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
163             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
164                                    size, num_retries=num_retries)
165         else:
166             return self.readall(size, num_retries=num_retries)
167
168     @_FileLikeObjectBase._before_close
169     @retry_method
170     def readlines(self, sizehint=float('inf'), num_retries=None):
171         data = []
172         data_size = 0
173         for s in self.readall(num_retries=num_retries):
174             data.append(s)
175             data_size += len(s)
176             if data_size >= sizehint:
177                 break
178         return b''.join(data).decode().splitlines(True)
179
180     def size(self):
181         raise NotImplementedError()
182
183     def read(self, size, num_retries=None):
184         raise NotImplementedError()
185
186     def readfrom(self, start, size, num_retries=None):
187         raise NotImplementedError()
188
189
190 class StreamFileReader(ArvadosFileReaderBase):
191     class _NameAttribute(str):
192         # The Python file API provides a plain .name attribute.
193         # Older SDK provided a name() method.
194         # This class provides both, for maximum compatibility.
195         def __call__(self):
196             return self
197
198     def __init__(self, stream, segments, name):
199         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
200         self._stream = stream
201         self.segments = segments
202
203     def stream_name(self):
204         return self._stream.name()
205
206     def size(self):
207         n = self.segments[-1]
208         return n.range_start + n.range_size
209
210     @_FileLikeObjectBase._before_close
211     @retry_method
212     def read(self, size, num_retries=None):
213         """Read up to 'size' bytes from the stream, starting at the current file position"""
214         if size == 0:
215             return b''
216
217         data = b''
218         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
219         if available_chunks:
220             lr = available_chunks[0]
221             data = self._stream.readfrom(lr.locator+lr.segment_offset,
222                                          lr.segment_size,
223                                          num_retries=num_retries)
224
225         self._filepos += len(data)
226         return data
227
228     @_FileLikeObjectBase._before_close
229     @retry_method
230     def readfrom(self, start, size, num_retries=None):
231         """Read up to 'size' bytes from the stream, starting at 'start'"""
232         if size == 0:
233             return b''
234
235         data = []
236         for lr in locators_and_ranges(self.segments, start, size):
237             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
238                                               num_retries=num_retries))
239         return b''.join(data)
240
241     def as_manifest(self):
242         segs = []
243         for r in self.segments:
244             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
245         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
246
247
248 def synchronized(orig_func):
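    """Decorator that serializes calls to the wrapped method by holding self.lock."""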
249     @functools.wraps(orig_func)
250     def synchronized_wrapper(self, *args, **kwargs):
251         with self.lock:
252             return orig_func(self, *args, **kwargs)
253     return synchronized_wrapper
254
255
256 class StateChangeError(Exception):
257     def __init__(self, message, state, nextstate):
258         super(StateChangeError, self).__init__(message)
259         self.state = state
260         self.nextstate = nextstate
261
262 class _BufferBlock(object):
263     """A stand-in for a Keep block that is in the process of being written.
264
265     Writers can append to it, get the size, and compute the Keep locator.
266     There are three valid states:
267
268     WRITABLE
269       Can append to block.
270
271     PENDING
272       Block is in the process of being uploaded to Keep, append is an error.
273
274     COMMITTED
275       The block has been written to Keep, its internal buffer has been
276       released, fetching the block will fetch it via keep client (since we
277       discarded the internal copy), and identifiers referring to the BufferBlock
278       can be replaced with the block locator.
279
280     """
281
282     WRITABLE = 0
283     PENDING = 1
284     COMMITTED = 2
285     ERROR = 3
286
287     def __init__(self, blockid, starting_capacity, owner):
288         """
289         :blockid:
290           the identifier for this block
291
292         :starting_capacity:
293           the initial buffer capacity
294
295         :owner:
296           ArvadosFile that owns this block
297
298         """
299         self.blockid = blockid
300         self.buffer_block = bytearray(starting_capacity)
301         self.buffer_view = memoryview(self.buffer_block)
302         self.write_pointer = 0
303         self._state = _BufferBlock.WRITABLE
304         self._locator = None
305         self.owner = owner
306         self.lock = threading.Lock()
307         self.wait_for_commit = threading.Event()
308         self.error = None
309
310     @synchronized
311     def append(self, data):
312         """Append some data to the buffer.
313
314         Only valid if the block is in WRITABLE state.  Implements an expanding
315         buffer, doubling capacity as needed to accommodate all the data.
316
317         """
318         if self._state == _BufferBlock.WRITABLE:
319             if not isinstance(data, bytes) and not isinstance(data, memoryview):
320                 data = data.encode()
321             while (self.write_pointer+len(data)) > len(self.buffer_block):
322                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
323                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
324                 self.buffer_block = new_buffer_block
325                 self.buffer_view = memoryview(self.buffer_block)
326             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
327             self.write_pointer += len(data)
328             self._locator = None
329         else:
330             raise AssertionError("Buffer block is not writable")
331
332     STATE_TRANSITIONS = frozenset([
333             (WRITABLE, PENDING),
334             (PENDING, COMMITTED),
335             (PENDING, ERROR),
336             (ERROR, PENDING)])
337
338     @synchronized
339     def set_state(self, nextstate, val=None):
340         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
341             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
342         self._state = nextstate
343
344         if self._state == _BufferBlock.PENDING:
345             self.wait_for_commit.clear()
346
347         if self._state == _BufferBlock.COMMITTED:
348             self._locator = val
349             self.buffer_view = None
350             self.buffer_block = None
351             self.wait_for_commit.set()
352
353         if self._state == _BufferBlock.ERROR:
354             self.error = val
355             self.wait_for_commit.set()
356
357     @synchronized
358     def state(self):
359         return self._state
360
361     def size(self):
362         """The amount of data written to the buffer."""
363         return self.write_pointer
364
365     @synchronized
366     def locator(self):
367         """The Keep locator for this buffer's contents."""
368         if self._locator is None:
369             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
370         return self._locator
371
372     @synchronized
373     def clone(self, new_blockid, owner):
374         if self._state == _BufferBlock.COMMITTED:
375             raise AssertionError("Cannot duplicate committed buffer block")
376         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
377         bufferblock.append(self.buffer_view[0:self.size()])
378         return bufferblock
379
380     @synchronized
381     def clear(self):
382         self.owner = None
383         self.buffer_block = None
384         self.buffer_view = None
385
386
387 class NoopLock(object):
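    """A do-nothing stand-in for a lock: every operation is a no-op."""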
388     def __enter__(self):
389         return self
390
391     def __exit__(self, exc_type, exc_value, traceback):
392         pass
393
394     def acquire(self, blocking=False):
395         pass
396
397     def release(self):
398         pass
399
400
401 def must_be_writable(orig_func):
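    """Decorator that raises IOError(EROFS) unless self.writable() returns True."""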
402     @functools.wraps(orig_func)
403     def must_be_writable_wrapper(self, *args, **kwargs):
404         if not self.writable():
405             raise IOError(errno.EROFS, "Collection is read-only.")
406         return orig_func(self, *args, **kwargs)
407     return must_be_writable_wrapper
408
409
410 class _BlockManager(object):
411     """BlockManager handles buffer blocks.
412
413     Also handles background block uploads, and background block prefetch for a
414     Collection of ArvadosFiles.
415
416     """
417
418     DEFAULT_PUT_THREADS = 2
419     DEFAULT_GET_THREADS = 2
420
421     def __init__(self, keep, copies=None, put_threads=None):
422         """keep: KeepClient object to use"""
423         self._keep = keep
424         self._bufferblocks = collections.OrderedDict()
425         self._put_queue = None
426         self._put_threads = None
427         self._prefetch_queue = None
428         self._prefetch_threads = None
429         self.lock = threading.Lock()
430         self.prefetch_enabled = True
431         if put_threads:
432             self.num_put_threads = put_threads
433         else:
434             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
435         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
436         self.copies = copies
437         self._pending_write_size = 0
438         self.threads_lock = threading.Lock()
439
440     @synchronized
441     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
442         """Allocate a new, empty bufferblock in WRITABLE state and return it.
443
444         :blockid:
445           optional block identifier, otherwise one will be automatically assigned
446
447         :starting_capacity:
448           optional capacity, otherwise will use default capacity
449
450         :owner:
451           ArvadosFile that owns this block
452
453         """
454         return self._alloc_bufferblock(blockid, starting_capacity, owner)
455
456     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
457         if blockid is None:
458             blockid = "%s" % uuid.uuid4()
459         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
460         self._bufferblocks[bufferblock.blockid] = bufferblock
461         return bufferblock
462
463     @synchronized
464     def dup_block(self, block, owner):
465         """Create a new bufferblock initialized with the content of an existing bufferblock.
466
467         :block:
468           the buffer block to copy.
469
470         :owner:
471           ArvadosFile that owns the new block
472
473         """
474         new_blockid = "bufferblock%i" % len(self._bufferblocks)
475         bufferblock = block.clone(new_blockid, owner)
476         self._bufferblocks[bufferblock.blockid] = bufferblock
477         return bufferblock
478
479     @synchronized
480     def is_bufferblock(self, locator):
481         return locator in self._bufferblocks
482
483     def _commit_bufferblock_worker(self):
484         """Background uploader thread."""
485
486         while True:
487             try:
488                 bufferblock = self._put_queue.get()
489                 if bufferblock is None:
490                     return
491
492                 if self.copies is None:
493                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
494                 else:
495                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
496                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
497
498             except Exception as e:
499                 bufferblock.set_state(_BufferBlock.ERROR, e)
500             finally:
501                 if self._put_queue is not None:
502                     self._put_queue.task_done()
503
504     def start_put_threads(self):
505         with self.threads_lock:
506             if self._put_threads is None:
507                 # Start uploader threads.
508
509                 # If we don't limit the Queue size, the upload queue can quickly
510                 # grow to take up gigabytes of RAM if the writing process is
511                 # generating data more quickly than it can be sent to the Keep
512                 # servers.
513                 #
514                 # With two upload threads and a queue size of 2, this means up to 4
515                 # blocks pending.  If they are full 64 MiB blocks, that means up to
516                 # 256 MiB of internal buffering, which is the same size as the
517                 # default download block cache in KeepClient.
518                 self._put_queue = queue.Queue(maxsize=2)
519
520                 self._put_threads = []
521                 for i in range(0, self.num_put_threads):
522                     thread = threading.Thread(target=self._commit_bufferblock_worker)
523                     self._put_threads.append(thread)
524                     thread.daemon = True
525                     thread.start()
526
527     def _block_prefetch_worker(self):
528         """The background downloader thread."""
529         while True:
530             try:
531                 b = self._prefetch_queue.get()
532                 if b is None:
533                     return
534                 self._keep.get(b)
535             except Exception:
536                 _logger.exception("Exception doing block prefetch")
537
538     @synchronized
539     def start_get_threads(self):
540         if self._prefetch_threads is None:
541             self._prefetch_queue = queue.Queue()
542             self._prefetch_threads = []
543             for i in range(0, self.num_get_threads):
544                 thread = threading.Thread(target=self._block_prefetch_worker)
545                 self._prefetch_threads.append(thread)
546                 thread.daemon = True
547                 thread.start()
548
549
550     @synchronized
551     def stop_threads(self):
552         """Shut down and wait for background upload and download threads to finish."""
553
554         if self._put_threads is not None:
555             for t in self._put_threads:
556                 self._put_queue.put(None)
557             for t in self._put_threads:
558                 t.join()
559         self._put_threads = None
560         self._put_queue = None
561
562         if self._prefetch_threads is not None:
563             for t in self._prefetch_threads:
564                 self._prefetch_queue.put(None)
565             for t in self._prefetch_threads:
566                 t.join()
567         self._prefetch_threads = None
568         self._prefetch_queue = None
569
570     def __enter__(self):
571         return self
572
573     def __exit__(self, exc_type, exc_value, traceback):
574         self.stop_threads()
575
576     @synchronized
577     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
578         """Packs small blocks together before uploading"""
579         self._pending_write_size += closed_file_size
580
581         # Check if there are enough small blocks to fill up one full-size block
582         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
583
584             # Search for blocks that are ready to be packed together before being committed to Keep.
585             # A WRITABLE block always has an owner.
586             # A WRITABLE block with its owner.closed() implies that its
587             # size is <= KEEP_BLOCK_SIZE/2.
588             try:
589                 small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
590             except AttributeError:
591                 # Writable blocks without owner shouldn't exist.
592                 raise UnownedBlockError()
593
594             if len(small_blocks) <= 1:
595                 # Not enough small blocks for repacking
596                 return
597
598             # Update the pending write size count with its true value, just in case
599             # some small file was opened, written and closed several times.
600             self._pending_write_size = sum([b.size() for b in small_blocks])
601             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
602                 return
603
604             new_bb = self._alloc_bufferblock()
605             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
606                 bb = small_blocks.pop(0)
607                 arvfile = bb.owner
608                 self._pending_write_size -= bb.size()
609                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
610                 arvfile.set_segments([Range(new_bb.blockid,
611                                             0,
612                                             bb.size(),
613                                             new_bb.write_pointer - bb.size())])
614                 self._delete_bufferblock(bb.blockid)
615             self.commit_bufferblock(new_bb, sync=sync)
616
617     def commit_bufferblock(self, block, sync):
618         """Initiate a background upload of a bufferblock.
619
620         :block:
621           The block object to upload
622
623         :sync:
624           If `sync` is True, upload the block synchronously.
625           If `sync` is False, upload the block asynchronously.  This will
626           return immediately unless the upload queue is at capacity, in
627           which case it will wait on an upload queue slot.
628
629         """
630         try:
631             # Mark the block as PENDING to disallow any more appends.
632             block.set_state(_BufferBlock.PENDING)
633         except StateChangeError as e:
634             if e.state == _BufferBlock.PENDING:
635                 if sync:
636                     block.wait_for_commit.wait()
637                 else:
638                     return
639             if block.state() == _BufferBlock.COMMITTED:
640                 return
641             elif block.state() == _BufferBlock.ERROR:
642                 raise block.error
643             else:
644                 raise
645
646         if sync:
647             try:
648                 if self.copies is None:
649                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
650                 else:
651                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
652                 block.set_state(_BufferBlock.COMMITTED, loc)
653             except Exception as e:
654                 block.set_state(_BufferBlock.ERROR, e)
655                 raise
656         else:
657             self.start_put_threads()
658             self._put_queue.put(block)
659
660     @synchronized
661     def get_bufferblock(self, locator):
662         return self._bufferblocks.get(locator)
663
664     @synchronized
665     def delete_bufferblock(self, locator):
666         self._delete_bufferblock(locator)
667
668     def _delete_bufferblock(self, locator):
669         bb = self._bufferblocks[locator]
670         bb.clear()
671         del self._bufferblocks[locator]
672
673     def get_block_contents(self, locator, num_retries, cache_only=False):
674         """Fetch a block.
675
676         First checks to see if the locator is a BufferBlock and returns its
677         contents if so; otherwise, passes the request through to KeepClient.get().
678
679         """
680         with self.lock:
681             if locator in self._bufferblocks:
682                 bufferblock = self._bufferblocks[locator]
683                 if bufferblock.state() != _BufferBlock.COMMITTED:
684                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
685                 else:
686                     locator = bufferblock._locator
687         if cache_only:
688             return self._keep.get_from_cache(locator)
689         else:
690             return self._keep.get(locator, num_retries=num_retries)
691
692     def commit_all(self):
693         """Commit all outstanding buffer blocks.
694
695         This is a synchronous call, and will not return until all buffer blocks
696         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
697
698         """
699         self.repack_small_blocks(force=True, sync=True)
700
701         with self.lock:
702             items = list(self._bufferblocks.items())
703
704         for k,v in items:
705             if v.state() != _BufferBlock.COMMITTED and v.owner:
706                 v.owner.flush(sync=False)
707
708         with self.lock:
709             if self._put_queue is not None:
710                 self._put_queue.join()
711
712                 err = []
713                 for k,v in items:
714                     if v.state() == _BufferBlock.ERROR:
715                         err.append((v.locator(), v.error))
716                 if err:
717                     raise KeepWriteError("Error writing some blocks", err, label="block")
718
719         for k,v in items:
720             # flush again with sync=True to remove committed bufferblocks from
721             # the segments.
722             if v.owner:
723                 v.owner.flush(sync=True)
724
725     def block_prefetch(self, locator):
726         """Initiate a background download of a block.
727
728         This assumes that the underlying KeepClient implements a block cache,
729         so repeated requests for the same block will not result in repeated
730         downloads (unless the block is evicted from the cache).  This method
731         does not block.
732
733         """
734
735         if not self.prefetch_enabled:
736             return
737
738         if self._keep.get_from_cache(locator) is not None:
739             return
740
741         with self.lock:
742             if locator in self._bufferblocks:
743                 return
744
745         self.start_get_threads()
746         self._prefetch_queue.put(locator)
747
748
749 class ArvadosFile(object):
750     """Represent a file in a Collection.
751
752     ArvadosFile manages the underlying representation of a file in Keep as a
753     sequence of segments spanning a set of blocks, and implements random
754     read/write access.
755
756     This object may be accessed from multiple threads.
757
758     """
759
760     def __init__(self, parent, name, stream=[], segments=[]):
761         """
762         ArvadosFile constructor.
763
764         :stream:
765           a list of Range objects representing a block stream
766
767         :segments:
768           a list of Range objects representing segments
769         """
770         self.parent = parent
771         self.name = name
772         self._writers = set()
773         self._committed = False
774         self._segments = []
775         self.lock = parent.root_collection().lock
776         for s in segments:
777             self._add_segment(stream, s.locator, s.range_size)
778         self._current_bblock = None
779
780     def writable(self):
781         return self.parent.writable()
782
783     @synchronized
784     def permission_expired(self, as_of_dt=None):
785         """Returns True if any of the segment's locators is expired"""
786         for r in self._segments:
787             if KeepLocator(r.locator).permission_expired(as_of_dt):
788                 return True
789         return False
790
791     @synchronized
792     def segments(self):
793         return copy.copy(self._segments)
794
795     @synchronized
796     def clone(self, new_parent, new_name):
797         """Make a copy of this file."""
798         cp = ArvadosFile(new_parent, new_name)
799         cp.replace_contents(self)
800         return cp
801
802     @must_be_writable
803     @synchronized
804     def replace_contents(self, other):
805         """Replace segments of this file with segments from another `ArvadosFile` object."""
806
807         map_loc = {}
808         self._segments = []
809         for other_segment in other.segments():
810             new_loc = other_segment.locator
811             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
812                 if other_segment.locator not in map_loc:
813                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
814                     if bufferblock.state() != _BufferBlock.WRITABLE:
815                         map_loc[other_segment.locator] = bufferblock.locator()
816                     else:
817                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
818                 new_loc = map_loc[other_segment.locator]
819
820             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
821
822         self.set_committed(False)
823
824     def __eq__(self, other):
825         if other is self:
826             return True
827         if not isinstance(other, ArvadosFile):
828             return False
829
830         othersegs = other.segments()
831         with self.lock:
832             if len(self._segments) != len(othersegs):
833                 return False
834             for i in range(0, len(othersegs)):
835                 seg1 = self._segments[i]
836                 seg2 = othersegs[i]
837                 loc1 = seg1.locator
838                 loc2 = seg2.locator
839
840                 if self.parent._my_block_manager().is_bufferblock(loc1):
841                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
842
843                 if other.parent._my_block_manager().is_bufferblock(loc2):
844                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
845
846                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
847                     seg1.range_start != seg2.range_start or
848                     seg1.range_size != seg2.range_size or
849                     seg1.segment_offset != seg2.segment_offset):
850                     return False
851
852         return True
853
854     def __ne__(self, other):
855         return not self.__eq__(other)
856
857     @synchronized
858     def set_segments(self, segs):
859         self._segments = segs
860
861     @synchronized
862     def set_committed(self, value=True):
863         """Set committed flag.
864
865         If value is True, set committed to be True.
866
867         If value is False, set committed to be False for this and all parents.
868         """
869         if value == self._committed:
870             return
871         self._committed = value
872         if self._committed is False and self.parent is not None:
873             self.parent.set_committed(False)
874
875     @synchronized
876     def committed(self):
877         """Get whether this is committed or not."""
878         return self._committed
879
880     @synchronized
881     def add_writer(self, writer):
882         """Add an ArvadosFileWriter reference to the list of writers"""
883         if isinstance(writer, ArvadosFileWriter):
884             self._writers.add(writer)
885
886     @synchronized
887     def remove_writer(self, writer, flush):
888         """
889         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
890         and do some block maintenance tasks.
891         """
892         self._writers.remove(writer)
893
894         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
895             # Flush was requested or the file is too big for repacking
896             self.flush()
897         elif self.closed():
898             # All writers closed and size is adequate for repacking
899             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
900
901     def closed(self):
902         """
903         Get whether this is closed or not. When the writers list is empty, the file
904         is considered closed.
905         """
906         return len(self._writers) == 0
907
908     @must_be_writable
909     @synchronized
910     def truncate(self, size):
911         """Shrink the size of the file.
912
913         If `size` is less than the size of the file, the file contents after
914         `size` will be discarded.  If `size` is greater than the current size
915         of the file, an IOError will be raised.
916
917         """
918         if size < self.size():
919             new_segs = []
920             for r in self._segments:
921                 range_end = r.range_start+r.range_size
922                 if r.range_start >= size:
923                     # segment is past the truncate size, all done
924                     break
925                 elif size < range_end:
926                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
927                     nr.segment_offset = r.segment_offset
928                     new_segs.append(nr)
929                     break
930                 else:
931                     new_segs.append(r)
932
933             self._segments = new_segs
934             self.set_committed(False)
935         elif size > self.size():
936             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
937
938     def readfrom(self, offset, size, num_retries, exact=False):
939         """Read up to `size` bytes from the file starting at `offset`.
940
941         :exact:
942          If False (default), return less data than requested if the read
943          crosses a block boundary and the next block isn't cached.  If True,
944          only return less data than requested when hitting EOF.
945         """
946
947         with self.lock:
948             if size == 0 or offset >= self.size():
949                 return b''
950             readsegs = locators_and_ranges(self._segments, offset, size)
951             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
952
953         locs = set()
954         data = []
955         for lr in readsegs:
956             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
957             if block:
958                 blockview = memoryview(block)
959                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
960                 locs.add(lr.locator)
961             else:
962                 break
963
964         for lr in prefetch:
965             if lr.locator not in locs:
966                 self.parent._my_block_manager().block_prefetch(lr.locator)
967                 locs.add(lr.locator)
968
969         return b''.join(data)
970
971     def _repack_writes(self, num_retries):
972         """Test if the buffer block has more data than actual segments.
973
974         This happens when a buffered write over-writes a file range written in
975         a previous buffered write.  Re-pack the buffer block for efficiency
976         and to avoid leaking information.
977
978         """
979         segs = self._segments
980
981         # Sum up the segments to get the total bytes of the file referencing
982         # into the buffer block.
983         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
984         write_total = sum([s.range_size for s in bufferblock_segs])
985
986         if write_total < self._current_bblock.size():
987             # There is more data in the buffer block than is actually accounted for by segments, so
988             # re-pack into a new buffer by copying over to a new buffer block.
989             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
990             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
991             for t in bufferblock_segs:
992                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
993                 t.segment_offset = new_bb.size() - t.range_size
994
995             self._current_bblock = new_bb
996
997     @must_be_writable
998     @synchronized
999     def writeto(self, offset, data, num_retries):
1000         """Write `data` to the file starting at `offset`.
1001
1002         This will update existing bytes and/or extend the size of the file as
1003         necessary.
1004
1005         """
1006         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1007             data = data.encode()
1008         if len(data) == 0:
1009             return
1010
1011         if offset > self.size():
1012             raise ArgumentError("Offset is past the end of the file")
1013
1014         if len(data) > config.KEEP_BLOCK_SIZE:
1015             # Chunk it up into smaller writes
1016             n = 0
1017             dataview = memoryview(data)
1018             while n < len(data):
1019                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1020                 n += config.KEEP_BLOCK_SIZE
1021             return
1022
1023         self.set_committed(False)
1024
1025         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1026             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1027
1028         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1029             self._repack_writes(num_retries)
1030             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1031                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1032                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1033
1034         self._current_bblock.append(data)
1035
1036         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1037
1038         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1039
1040         return len(data)
1041
1042     @synchronized
1043     def flush(self, sync=True, num_retries=0):
1044         """Flush the current bufferblock to Keep.
1045
1046         :sync:
1047           If True, commit block synchronously, wait until buffer block has been written.
1048           If False, commit block asynchronously, return immediately after putting block into
1049           the keep put queue.
1050         """
1051         if self.committed():
1052             return
1053
1054         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1055             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1056                 self._repack_writes(num_retries)
1057             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1058
1059         if sync:
1060             to_delete = set()
1061             for s in self._segments:
1062                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1063                 if bb:
1064                     if bb.state() != _BufferBlock.COMMITTED:
1065                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1066                     to_delete.add(s.locator)
1067                     s.locator = bb.locator()
1068             for s in to_delete:
1069                 self.parent._my_block_manager().delete_bufferblock(s)
1070
1071         self.parent.notify(MOD, self.parent, self.name, (self, self))
1072
1073     @must_be_writable
1074     @synchronized
1075     def add_segment(self, blocks, pos, size):
1076         """Add a segment to the end of the file.
1077
1078         `pos` and `size` reference a section of the stream described by
1079         `blocks` (a list of Range objects)
1080
1081         """
1082         self._add_segment(blocks, pos, size)
1083
1084     def _add_segment(self, blocks, pos, size):
1085         """Internal implementation of add_segment."""
1086         self.set_committed(False)
1087         for lr in locators_and_ranges(blocks, pos, size):
1088             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1089             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1090             self._segments.append(r)
1091
1092     @synchronized
1093     def size(self):
1094         """Get the file size."""
1095         if self._segments:
1096             n = self._segments[-1]
1097             return n.range_start + n.range_size
1098         else:
1099             return 0
1100
1101     @synchronized
1102     def manifest_text(self, stream_name=".", portable_locators=False,
1103                       normalize=False, only_committed=False):
1104         buf = ""
1105         filestream = []
1106         for segment in self._segments:
1107             loc = segment.locator
1108             if self.parent._my_block_manager().is_bufferblock(loc):
1109                 if only_committed:
1110                     continue
1111                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1112             if portable_locators:
1113                 loc = KeepLocator(loc).stripped()
1114             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1115                                  segment.segment_offset, segment.range_size))
1116         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1117         buf += "\n"
1118         return buf
1119
1120     @must_be_writable
1121     @synchronized
1122     def _reparent(self, newparent, newname):
1123         self.set_committed(False)
1124         self.flush(sync=True)
1125         self.parent.remove(self.name)
1126         self.parent = newparent
1127         self.name = newname
1128         self.lock = self.parent.root_collection().lock
1129
1130
1131 class ArvadosFileReader(ArvadosFileReaderBase):
1132     """Wraps ArvadosFile in a file-like object supporting reading only.
1133
1134     Be aware that this class is NOT thread safe as there is no locking around
1135     updating the file pointer.
1136
1137     """
1138
1139     def __init__(self, arvadosfile, num_retries=None):
1140         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1141         self.arvadosfile = arvadosfile
1142
1143     def size(self):
1144         return self.arvadosfile.size()
1145
1146     def stream_name(self):
1147         return self.arvadosfile.parent.stream_name()
1148
1149     @_FileLikeObjectBase._before_close
1150     @retry_method
1151     def read(self, size=None, num_retries=None):
1152         """Read up to `size` bytes from the file and return the result.
1153
1154         Starts at the current file position.  If `size` is None, read the
1155         entire remainder of the file.
1156         """
1157         if size is None:
1158             data = []
1159             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1160             while rd:
1161                 data.append(rd)
1162                 self._filepos += len(rd)
1163                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1164             return b''.join(data)
1165         else:
1166             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1167             self._filepos += len(data)
1168             return data
1169
1170     @_FileLikeObjectBase._before_close
1171     @retry_method
1172     def readfrom(self, offset, size, num_retries=None):
1173         """Read up to `size` bytes from the stream, starting at the specified file offset.
1174
1175         This method does not change the file position.
1176         """
1177         return self.arvadosfile.readfrom(offset, size, num_retries)
1178
1179     def flush(self):
1180         pass
1181
1182
1183 class ArvadosFileWriter(ArvadosFileReader):
1184     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1185
1186     Be aware that this class is NOT thread safe as there is no locking around
1187     updating the file pointer.
1188
1189     """
1190
1191     def __init__(self, arvadosfile, mode, num_retries=None):
1192         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1193         self.mode = mode
1194         self.arvadosfile.add_writer(self)
1195
1196     @_FileLikeObjectBase._before_close
1197     @retry_method
1198     def write(self, data, num_retries=None):
1199         if self.mode[0] == "a":
1200             self.arvadosfile.writeto(self.size(), data, num_retries)
1201         else:
1202             self.arvadosfile.writeto(self._filepos, data, num_retries)
1203             self._filepos += len(data)
1204         return len(data)
1205
1206     @_FileLikeObjectBase._before_close
1207     @retry_method
1208     def writelines(self, seq, num_retries=None):
1209         for s in seq:
1210             self.write(s, num_retries=num_retries)
1211
1212     @_FileLikeObjectBase._before_close
1213     def truncate(self, size=None):
1214         if size is None:
1215             size = self._filepos
1216         self.arvadosfile.truncate(size)
1217         if self._filepos > self.size():
1218             self._filepos = self.size()
1219
1220     @_FileLikeObjectBase._before_close
1221     def flush(self):
1222         self.arvadosfile.flush()
1223
1224     def close(self, flush=True):
1225         if not self.closed:
1226             self.arvadosfile.remove_writer(self, flush)
1227             super(ArvadosFileWriter, self).close()