Merge branch '16826-unlogged-attrs-fix'
[arvados.git] / sdk / python / arvados / arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
# String constants defined at module level.
# NOTE(review): nothing in the part of the file reviewed here reads them;
# presumably they are tags consumed by other code -- confirm against callers.
MOD = "mod"
WRITE = "write"

# Module-level logger; the block prefetch worker reports failures through it.
_logger = logging.getLogger('arvados.arvfile')
38
def split(path):
    """Break a /-separated stream path into stream name and file name.

    The path is divided at its last '/': everything before it is the stream
    name, everything after it is the file name.  A path without any '/' is
    treated as a file in the '.' stream.

    Returns a tuple (stream_name, file_name).

    """
    head, slash, tail = path.rpartition('/')
    if not slash:
        # No / in string
        return '.', path
    return head, tail
52
53
class UnownedBlockError(Exception):
    """Raised when the BlockManager holds a writable block that has no owner."""
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
class ArvadosFileReaderBase(_FileLikeObjectBase):
    """Common read-side behavior for file-like reader objects.

    Keeps a file position and builds iteration, line-oriented reads and
    decompressing reads on top of three primitives: size(), read() and
    readfrom().  The versions of those primitives defined here raise
    IOError(ENOSYS); subclasses are expected to override them.

    """

    def __init__(self, name, mode, num_retries=None):
        """
        :name:
          the file name, stored as self.name

        :mode:
          the file mode, stored as self.mode

        :num_retries:
          stored as self.num_retries

        """
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        # (file position, bytes already fetched that start at that position),
        # left behind by the previous readline() call.
        self._readline_cache = (None, None)

    def __iter__(self):
        """Yield lines from readline() until it returns an empty result."""
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        """Return self.name with a trailing '.bz2' or '.gz' removed."""
        # Raw string: in a plain literal '\.' is an invalid escape sequence,
        # which newer Python versions warn about.
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        """Move the file position and return the new absolute position.

        `pos` is relative to the start of the file by default, to the current
        position for os.SEEK_CUR, or to size() for os.SEEK_END.

        Raises IOError(EINVAL) if the resulting position is negative.

        """
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        """Return the current file position."""
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        """Yield the results of successive read(size) calls until one is empty."""
        while True:
            data = self.read(size, num_retries=num_retries)
            if not data:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read one line and return it decoded as a str.

        Data is fetched with read() in chunks of up to 2**20 bytes until a
        newline has been seen, `size` bytes have been collected, or read()
        returns nothing.  At most `size` bytes are returned.  Bytes fetched
        beyond the end of the line are kept in self._readline_cache, and the
        file position is moved back to the end of the returned line.

        """
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            # The previous readline() already fetched data starting at this
            # position; reuse it and account for it in the file position.
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            # No newline found: the line is everything collected.
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Step the position back over the bytes that are not being returned
        # and remember them for the next call.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        """Yield decompress(segment) for each segment from readall(size).

        Empty results from the `decompress` callable are skipped.

        """
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        """Seek to the start and return an iterator over the file's contents.

        The decompressor is chosen from the file name: bz2 for names ending
        in '.bz2', zlib for names ending in '.gz', otherwise the data from
        readall() is returned unchanged.

        """
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        """Return a list of decoded lines, each keeping its line ending.

        Chunks are collected from readall() until it is exhausted or at least
        `sizehint` bytes have been gathered.

        """
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        """Not implemented in this base class; raises IOError(ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        """Not implemented in this base class; raises IOError(ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        """Not implemented in this base class; raises IOError(ENOSYS)."""
        raise IOError(errno.ENOSYS, "Not implemented")
206
207
class StreamFileReader(ArvadosFileReaderBase):
    """Read-only file object for one file within a stream.

    The file is described by a list of segments; the bytes themselves are
    fetched through the stream object's readfrom().

    """

    class _NameAttribute(str):
        # The Python file API exposes .name as a plain attribute, while the
        # older SDK exposed a name() method.  A str that returns itself when
        # called supports both spellings.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        """Return the name of the stream this file belongs to."""
        return self._stream.name()

    def size(self):
        """Return the end offset of the last segment."""
        last = self.segments[-1]
        return last.range_start + last.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        chunks = locators_and_ranges(self.segments, self._filepos, size)
        if not chunks:
            data = b''
        else:
            # Only the first matching range is read per call, so the result
            # may be shorter than 'size'.
            first = chunks[0]
            data = self._stream.readfrom(first.locator+first.segment_offset,
                                         first.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        return b''.join([
            self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                  num_retries=num_retries)
            for lr in locators_and_ranges(self.segments, start, size)])

    def as_manifest(self):
        """Return manifest text for this file alone, placed in the '.' stream."""
        segs = [lr
                for r in self.segments
                for lr in self._stream.locators_and_ranges(r.locator, r.range_size)]
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
264
265
def synchronized(orig_func):
    """Decorator that runs a method while holding the instance's self.lock."""
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            result = orig_func(self, *args, **kwargs)
        return result
    return synchronized_wrapper
272
273
class StateChangeError(Exception):
    """Error carrying the current state and the requested next state."""

    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state, self.nextstate = state, nextstate
279
class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    The block is always in one of these states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    ERROR
      The upload failed; the failure is stored in self.error.  The block can
      go back to PENDING from here.

    DELETED
      clear() has been called; the buffer and the owner have been released.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        # Cleared when the block becomes PENDING, set when it becomes
        # COMMITTED or ERROR.
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        `data` may be bytes, bytearray or memoryview; anything else is
        converted with its encode() method.

        Raises AssertionError if the block is not WRITABLE.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Buffer block is not writable")

        if not isinstance(data, (bytes, bytearray, memoryview)):
            data = data.encode()

        end = self.write_pointer + len(data)
        capacity = len(self.buffer_block)
        if end > capacity:
            # Double until the data fits.  Start from at least 1: a block
            # created with capacity 0 (for example the clone of an empty
            # block) could otherwise never grow, since 0 * 2 == 0.
            new_capacity = max(capacity, 1)
            while new_capacity < end:
                new_capacity *= 2
            new_buffer_block = bytearray(new_capacity)
            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
            self.buffer_block = new_buffer_block
            self.buffer_view = memoryview(self.buffer_block)

        self.buffer_view[self.write_pointer:end] = data
        self.write_pointer = end
        # The contents changed, so any cached locator is stale.
        self._locator = None

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        """Move the block to `nextstate`.

        :nextstate:
          the state to enter; (current state, nextstate) must be listed in
          STATE_TRANSITIONS

        :val:
          when entering COMMITTED, stored as the block's locator; when
          entering ERROR, stored as self.error

        Raises StateChangeError if the transition is not allowed.

        """
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            # Release the in-memory copy of the data.
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        """Return the current state."""
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            # Computed lazily as "<md5 hex digest>+<size>" and cached until
            # the next append().
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        """Return a new _BufferBlock holding a copy of this block's data.

        Raises AssertionError if this block is COMMITTED (its buffer has
        already been released).

        """
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        """Mark the block DELETED and drop its owner and buffer."""
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        Raises AssertionError if the block is not WRITABLE.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                # The segment's data now sits at the tail of the new buffer.
                t.segment_offset = new_bb.size() - t.range_size

            # Take over the repacked buffer, then discard the temporary block.
            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
448
449
class NoopLock(object):
    """An object with the interface of a lock that performs no locking."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        """Pretend to take the lock; always succeeds.

        Returns True, as threading.Lock.acquire() does on success, so that
        callers testing the result do not see a falsy None.

        """
        return True

    def release(self):
        pass
462
463
def must_be_writable(orig_func):
    """Decorator: raise IOError(EROFS) unless self.writable() is true."""
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")
    return must_be_writable_wrapper
471
472
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    Buffer blocks are kept in self._bufferblocks, keyed by block id.  Methods
    decorated with @synchronized run while holding self.lock.

    """

    # Number of uploader threads used when the constructor is not given
    # put_threads, and number of prefetch threads.
    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None, num_retries=None):
        """
        :keep:
          KeepClient object to use

        :copies:
          when not None, passed as `copies` to keep.put()

        :put_threads:
          number of uploader threads; DEFAULT_PUT_THREADS when not given

        :num_retries:
          passed as `num_retries` to keep.put()

        """
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        # Queues and thread lists are created lazily by start_put_threads()
        # and start_get_threads(), and reset to None by stop_threads().
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        # Running byte count used by repack_small_blocks() to decide when
        # there is enough data to pack a block.
        self._pending_write_size = 0
        # Guards creation of the uploader threads in start_put_threads().
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Same as alloc_bufferblock(), but does not take self.lock itself."""
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        """Return True if `locator` is the id of a bufferblock held by this manager."""
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                # None is the shutdown sentinel queued by stop_threads().
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries)
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                # Record the failure on the block rather than raising in the
                # worker thread.
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                # Runs for the sentinel too, so every get() is matched by a
                # task_done().
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        """Create the upload queue and start the uploader threads, if not already started."""
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                # None is the shutdown sentinel queued by stop_threads().
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                # Log the failure and keep the worker running.
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        """Create the prefetch queue and start the prefetch threads, if not already started."""
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()


    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            # One None sentinel per thread; each worker returns when it
            # receives one.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the with-block stops the background threads.
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading

        :force:
          pack whatever small blocks exist, even if together they are smaller
          than config.KEEP_BLOCK_SIZE

        :sync:
          passed to commit_bufferblock() for the packed block

        :closed_file_size:
          number of bytes to add to the pending write size counter

        Raises UnownedBlockError if a WRITABLE block has no owner.

        """

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        # The packed block is owned by a list of files rather than one file.
        new_bb.owner = []
        # (small block, offset of its data inside new_bb) pairs.
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Point each file's segments at the packed block, then drop the
        # small block it replaced.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        If the block cannot be moved to PENDING: returns without doing
        anything when it is already COMMITTED (or already PENDING and `sync`
        is False), re-raises the stored error when it is in ERROR state, and
        otherwise re-raises the StateChangeError.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                # An upload is already in progress for this block.
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries)
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        """Return the bufferblock with id `locator`, or None if there is none."""
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            # The freshly allocated buffer is zero-filled; moving the write
            # pointer to the end makes all of it count as block content.
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        """Clear the bufferblock with id `locator` and remove it from the manager."""
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        """Same as delete_bufferblock(), but does not take self.lock itself.

        Raises KeyError if there is no bufferblock with that id.

        """
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        When `cache_only` is true, KeepClient.get_from_cache() is used instead
        of KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    # The buffer was released on commit; fetch the data from
                    # Keep using the block's locator instead.
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        # Work on a snapshot of the bufferblocks taken under the lock.
        with self.lock:
            items = listitems(self._bufferblocks)

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                # Wait for every queued upload to be processed.
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        Nothing is queued when prefetch is disabled, when the block is already
        in the KeepClient cache, or when `locator` is a bufferblock id.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
854
855
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=None, segments=None):
        """
        ArvadosFile constructor.

        :parent:
          the object containing this file; its root collection's lock is
          used as this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream
          (None, the default, means an empty stream)

        :segments:
          a list of Range objects representing segments
          (None, the default, means no segments)
        """
        # None stands in for "empty" so that no mutable default object is
        # shared between calls.
        if stream is None:
            stream = []
        if segments is None:
            segments = []

        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        # The lock must be in place before _add_segment() runs, because it
        # calls the synchronized set_committed().
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whether the parent reports itself as writable."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.  When omitted (None), a fresh mapping is
            created for this call.

        Returns the (possibly updated) mapping.
        """
        # A fresh dict per call: a mutable default would silently keep
        # mappings alive between unrelated calls.  An explicitly passed empty
        # dict is still used (and filled in) as-is.
        if remote_blocks is None:
            remote_blocks = {}

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps a bufferblock id used by `other` to the locator this file
        # should reference instead; each bufferblock is resolved only once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        # No longer writable: reference it by its locator.
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it so this file gets its
                        # own bufferblock.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments agrees on the stripped locator, range start, range
        size and segment offset.  Bufferblock ids are resolved to the
        bufferblock's locator before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        """Return the negation of __eq__()."""
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as given, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        # Anything that is not an ArvadosFileWriter is silently ignored.
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.

        :writer:
          the writer to remove; KeyError is raised if it was never added

        :flush:
          if True, flush the file regardless of its size
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # The new end falls inside this segment: keep only its
                    # leading part, then stop.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow the file by appending segments that all point at the block
            # manager's padding block, at most KEEP_BLOCK_SIZE bytes each.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        # Only the segment lookups happen under the lock; block contents are
        # fetched after it is released.
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data has been collected, a non-exact read only takes
            # further blocks from the cache.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        # Request prefetch of the blocks following the read, skipping
        # locators that were just read or already requested.
        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        `data` that is neither bytes nor a memoryview is converted with its
        encode() method first.

        Returns the number of bytes written (0 for empty `data`).

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return 0

        if offset > self.size():
            # Writing past the end: pad the gap first.
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            # Report the total, consistent with the single-chunk path below.
            return len(data)

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; if the data still does not fit, commit the
            # current bufferblock asynchronously and start a new one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        # The data just appended starts at write_pointer - len(data) inside
        # the bufferblock.
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          Accepted but not used by this method.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            # Point every segment that still references a bufferblock at that
            # bufferblock's locator, committing the bufferblock first if needed.
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new segment starts where the previous one ends; an all-zero
            # Range stands in for "previous" when the file is empty.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return the manifest text for this file, ending with a newline.

        :stream_name:
          stream name passed to normalize_stream()

        :portable_locators:
          if True, use the stripped form of each locator

        :normalize:
          accepted but not used by this method

        :only_committed:
          if True, leave out segments that still reference a bufferblock
        """
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
        return ' '.join(normalize_stream(stream_name, {self.name: filestream})) + "\n"

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current
        parent before it adopts the new parent's root collection lock.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
1259
1260
class ArvadosFileReader(ArvadosFileReaderBase):
    """Read-only file-like object backed by an ArvadosFile.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        """Wrap `arvadosfile`; the reader takes its name from the file."""
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        """Return the size reported by the underlying ArvadosFile."""
        return self.arvadosfile.size()

    def stream_name(self):
        """Return the stream name of the underlying file's parent."""
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        """Read up to len(b) bytes into `b` and return how many were stored."""
        chunk = self.read(len(b))
        count = len(chunk)
        b[:count] = chunk
        return count

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is not None:
            # Bounded read: a single exact request.
            chunk = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(chunk)
            return chunk

        # Unbounded read: keep asking for up to one Keep block at a time until
        # an empty result comes back.
        pieces = []
        while True:
            chunk = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            if not chunk:
                break
            pieces.append(chunk)
            self._filepos += len(chunk)
        return b''.join(pieces)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        """Do nothing: a reader has nothing to flush."""
        pass
1316
1317
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        """Wrap `arvadosfile` and register this object as one of its writers."""
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        """Always True for a writer."""
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        """Write `data` at the current file position and advance the position.

        In append mode (mode starting with "a") the position is moved to the
        end of the file first.  `data` that is neither bytes nor a memoryview
        is converted with its encode() method.

        Returns the number of bytes written.
        """
        # Encode here, before measuring: the underlying file stores the
        # encoded bytes, so the position must advance by the encoded length
        # rather than by the character count of a str.
        if not isinstance(data, (bytes, memoryview)):
            data = data.encode()
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        """Write every item of `seq` in order using write()."""
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        """Resize the file to `size`, or to the current file position if None."""
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Flush the underlying ArvadosFile."""
        self.arvadosfile.flush()

    def close(self, flush=True):
        """Unregister from the underlying file and close this writer.

        `flush` is passed on to ArvadosFile.remove_writer().  Calling close()
        on a writer that is already closed does nothing.
        """
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
1362
1363
class WrappableFile(object):
    """Adapter exposing an Arvados file through the interface io wrappers use.

    Every method forwards to the wrapped object `f`; only the `closed` flag
    is tracked here.

    """

    def __init__(self, f):
        self.f = f
        self.closed = False

    def close(self):
        """Mark this wrapper closed, then close the wrapped file."""
        self.closed = True
        return self.f.close()

    def flush(self):
        """Forward to f.flush()."""
        return self.f.flush()

    def read(self, *a, **kw):
        """Forward to f.read()."""
        return self.f.read(*a, **kw)

    def readable(self):
        """Forward to f.readable()."""
        return self.f.readable()

    def readinto(self, *a, **kw):
        """Forward to f.readinto()."""
        return self.f.readinto(*a, **kw)

    def seek(self, *a, **kw):
        """Forward to f.seek()."""
        return self.f.seek(*a, **kw)

    def seekable(self):
        """Forward to f.seekable()."""
        return self.f.seekable()

    def tell(self):
        """Forward to f.tell()."""
        return self.f.tell()

    def writable(self):
        """Forward to f.writable()."""
        return self.f.writable()

    def write(self, *a, **kw):
        """Forward to f.write()."""
        return self.f.write(*a, **kw)