1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
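    For illustration:

        >>> split('data/subdir/reads.fastq')
        ('data/subdir', 'reads.fastq')
        >>> split('reads.fastq')
        ('.', 'reads.fastq')
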
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
52
53
54 class UnownedBlockError(Exception):
55     """Raised when there's a writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._binary = 'b' in mode
92         if sys.version_info >= (3, 0) and not self._binary:
93             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
94         self._filepos = 0
95         self.num_retries = num_retries
96         self._readline_cache = (None, None)
97
98     def __iter__(self):
99         while True:
100             data = self.readline()
101             if not data:
102                 break
103             yield data
104
105     def decompressed_name(self):
106         return re.sub(r'\.(bz2|gz)$', '', self.name)
107
108     @_FileLikeObjectBase._before_close
109     def seek(self, pos, whence=os.SEEK_SET):
110         if whence == os.SEEK_CUR:
111             pos += self._filepos
112         elif whence == os.SEEK_END:
113             pos += self.size()
114         if pos < 0:
115             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
116         self._filepos = pos
117         return self._filepos
118
119     def tell(self):
120         return self._filepos
121
122     def readable(self):
123         return True
124
125     def writable(self):
126         return False
127
128     def seekable(self):
129         return True
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall(self, size=2**20, num_retries=None):
134         while True:
135             data = self.read(size, num_retries=num_retries)
136             if len(data) == 0:
137                 break
138             yield data
139
140     @_FileLikeObjectBase._before_close
141     @retry_method
142     def readline(self, size=float('inf'), num_retries=None):
143         cache_pos, cache_data = self._readline_cache
144         if self.tell() == cache_pos:
145             data = [cache_data]
146             self._filepos += len(cache_data)
147         else:
148             data = [b'']
149         data_size = len(data[-1])
150         while (data_size < size) and (b'\n' not in data[-1]):
151             next_read = self.read(2 ** 20, num_retries=num_retries)
152             if not next_read:
153                 break
154             data.append(next_read)
155             data_size += len(next_read)
156         data = b''.join(data)
157         try:
158             nextline_index = data.index(b'\n') + 1
159         except ValueError:
160             nextline_index = len(data)
161         nextline_index = min(nextline_index, size)
162         self._filepos -= len(data) - nextline_index
163         self._readline_cache = (self.tell(), data[nextline_index:])
164         return data[:nextline_index].decode()
165
166     @_FileLikeObjectBase._before_close
167     @retry_method
168     def decompress(self, decompress, size, num_retries=None):
169         for segment in self.readall(size, num_retries=num_retries):
170             data = decompress(segment)
171             if data:
172                 yield data
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readall_decompressed(self, size=2**20, num_retries=None):
177         self.seek(0)
178         if self.name.endswith('.bz2'):
179             dc = bz2.BZ2Decompressor()
180             return self.decompress(dc.decompress, size,
181                                    num_retries=num_retries)
182         elif self.name.endswith('.gz'):
183             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
184             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
185                                    size, num_retries=num_retries)
186         else:
187             return self.readall(size, num_retries=num_retries)
188
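    # Example use of readall_decompressed() (an illustrative sketch, assuming
    # `reader` is an open reader for a '.gz' or '.bz2' file in a collection):
    #
    #     with open('/tmp/decompressed.out', 'wb') as out:
    #         for chunk in reader.readall_decompressed(num_retries=3):
    #             out.write(chunk)
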
189     @_FileLikeObjectBase._before_close
190     @retry_method
191     def readlines(self, sizehint=float('inf'), num_retries=None):
192         data = []
193         data_size = 0
194         for s in self.readall(num_retries=num_retries):
195             data.append(s)
196             data_size += len(s)
197             if data_size >= sizehint:
198                 break
199         return b''.join(data).decode().splitlines(True)
200
201     def size(self):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def read(self, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207     def readfrom(self, start, size, num_retries=None):
208         raise IOError(errno.ENOSYS, "Not implemented")
209
210
211 class StreamFileReader(ArvadosFileReaderBase):
212     class _NameAttribute(str):
213         # The Python file API provides a plain .name attribute.
214         # Older SDK provided a name() method.
215         # This class provides both, for maximum compatibility.
216         def __call__(self):
217             return self
218
219     def __init__(self, stream, segments, name):
220         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
221         self._stream = stream
222         self.segments = segments
223
224     def stream_name(self):
225         return self._stream.name()
226
227     def size(self):
228         n = self.segments[-1]
229         return n.range_start + n.range_size
230
231     @_FileLikeObjectBase._before_close
232     @retry_method
233     def read(self, size, num_retries=None):
234         """Read up to 'size' bytes from the stream, starting at the current file position"""
235         if size == 0:
236             return b''
237
238         data = b''
239         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
240         if available_chunks:
241             lr = available_chunks[0]
242             data = self._stream.readfrom(lr.locator+lr.segment_offset,
243                                          lr.segment_size,
244                                          num_retries=num_retries)
245
246         self._filepos += len(data)
247         return data
248
249     @_FileLikeObjectBase._before_close
250     @retry_method
251     def readfrom(self, start, size, num_retries=None):
252         """Read up to 'size' bytes from the stream, starting at 'start'"""
253         if size == 0:
254             return b''
255
256         data = []
257         for lr in locators_and_ranges(self.segments, start, size):
258             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
259                                               num_retries=num_retries))
260         return b''.join(data)
261
262     def as_manifest(self):
263         segs = []
264         for r in self.segments:
265             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
266         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
267
268
269 def synchronized(orig_func):
270     @functools.wraps(orig_func)
271     def synchronized_wrapper(self, *args, **kwargs):
272         with self.lock:
273             return orig_func(self, *args, **kwargs)
274     return synchronized_wrapper
275
276
277 class StateChangeError(Exception):
278     def __init__(self, message, state, nextstate):
279         super(StateChangeError, self).__init__(message)
280         self.state = state
281         self.nextstate = nextstate
282
283 class _BufferBlock(object):
284     """A stand-in for a Keep block that is in the process of being written.
285
286     Writers can append to it, get the size, and compute the Keep locator.
287     There are three states in the normal write lifecycle (plus ERROR and DELETED):
288
289     WRITABLE
290       Can append to block.
291
292     PENDING
293       Block is in the process of being uploaded to Keep, append is an error.
294
295     COMMITTED
296       The block has been written to Keep, its internal buffer has been
297       released, fetching the block will fetch it via keep client (since we
298       discarded the internal copy), and identifiers referring to the BufferBlock
299       can be replaced with the block locator.
300
301     """
302
303     WRITABLE = 0
304     PENDING = 1
305     COMMITTED = 2
306     ERROR = 3
307     DELETED = 4
308
309     def __init__(self, blockid, starting_capacity, owner):
310         """
311         :blockid:
312           the identifier for this block
313
314         :starting_capacity:
315           the initial buffer capacity
316
317         :owner:
318           ArvadosFile that owns this block
319
320         """
321         self.blockid = blockid
322         self.buffer_block = bytearray(starting_capacity)
323         self.buffer_view = memoryview(self.buffer_block)
324         self.write_pointer = 0
325         self._state = _BufferBlock.WRITABLE
326         self._locator = None
327         self.owner = owner
328         self.lock = threading.Lock()
329         self.wait_for_commit = threading.Event()
330         self.error = None
331
332     @synchronized
333     def append(self, data):
334         """Append some data to the buffer.
335
336         Only valid if the block is in WRITABLE state.  Implements an expanding
337         buffer, doubling capacity as needed to accommodate all the data.
338
339         """
340         if self._state == _BufferBlock.WRITABLE:
341             if not isinstance(data, bytes) and not isinstance(data, memoryview):
342                 data = data.encode()
343             while (self.write_pointer+len(data)) > len(self.buffer_block):
344                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
345                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
346                 self.buffer_block = new_buffer_block
347                 self.buffer_view = memoryview(self.buffer_block)
348             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
349             self.write_pointer += len(data)
350             self._locator = None
351         else:
352             raise AssertionError("Buffer block is not writable")
353
354     STATE_TRANSITIONS = frozenset([
355             (WRITABLE, PENDING),
356             (PENDING, COMMITTED),
357             (PENDING, ERROR),
358             (ERROR, PENDING)])
359
360     @synchronized
361     def set_state(self, nextstate, val=None):
362         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
363             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
364         self._state = nextstate
365
366         if self._state == _BufferBlock.PENDING:
367             self.wait_for_commit.clear()
368
369         if self._state == _BufferBlock.COMMITTED:
370             self._locator = val
371             self.buffer_view = None
372             self.buffer_block = None
373             self.wait_for_commit.set()
374
375         if self._state == _BufferBlock.ERROR:
376             self.error = val
377             self.wait_for_commit.set()
378
379     @synchronized
380     def state(self):
381         return self._state
382
383     def size(self):
384         """The amount of data written to the buffer."""
385         return self.write_pointer
386
387     @synchronized
388     def locator(self):
389         """The Keep locator for this buffer's contents."""
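        # For illustration: the result has the form "<md5 hexdigest>+<size in
        # bytes>"; an empty buffer block would yield
        # "d41d8cd98f00b204e9800998ecf8427e+0" (the MD5 of zero bytes, size 0).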
390         if self._locator is None:
391             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
392         return self._locator
393
394     @synchronized
395     def clone(self, new_blockid, owner):
396         if self._state == _BufferBlock.COMMITTED:
397             raise AssertionError("Cannot duplicate committed buffer block")
398         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
399         bufferblock.append(self.buffer_view[0:self.size()])
400         return bufferblock
401
402     @synchronized
403     def clear(self):
404         self._state = _BufferBlock.DELETED
405         self.owner = None
406         self.buffer_block = None
407         self.buffer_view = None
408
409     @synchronized
410     def repack_writes(self):
411         """Optimize buffer block by repacking segments in file sequence.
412
413         When the client makes random writes, they appear in the buffer block in
414         the sequence they were written rather than the sequence they appear in
415         the file.  This makes for inefficient, fragmented manifests.  Attempt
416         to optimize by repacking writes in file sequence.
417
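        Worked example (illustrative): if a client writes 5 bytes at file
        offset 10 and then 10 bytes at offset 0, the buffer holds the 5-byte
        chunk first.  Repacking rebuilds the buffer in file order (the
        10-byte chunk first, then the 5-byte chunk) and updates the owner's
        segment offsets to match.
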
418         """
419         if self._state != _BufferBlock.WRITABLE:
420             raise AssertionError("Cannot repack non-writable block")
421
422         segs = self.owner.segments()
423
424         # Collect the segments that reference the buffer block.
425         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
426
427         # Collect total data referenced by segments (could be smaller than
428         # bufferblock size if a portion of the file was written and
429         # then overwritten).
430         write_total = sum([s.range_size for s in bufferblock_segs])
431
432         if write_total < self.size() or len(bufferblock_segs) > 1:
433             # If there's more than one segment referencing this block, it is
434             # due to out-of-order writes and will produce a fragmented
435             # manifest, so try to optimize by re-packing into a new buffer.
436             contents = self.buffer_view[0:self.write_pointer].tobytes()
437             new_bb = _BufferBlock(None, write_total, None)
438             for t in bufferblock_segs:
439                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
440                 t.segment_offset = new_bb.size() - t.range_size
441
442             self.buffer_block = new_bb.buffer_block
443             self.buffer_view = new_bb.buffer_view
444             self.write_pointer = new_bb.write_pointer
445             self._locator = None
446             new_bb.clear()
447             self.owner.set_segments(segs)
448
449     def __repr__(self):
450         return "<BufferBlock %s>" % (self.blockid)
451
452
453 class NoopLock(object):
454     def __enter__(self):
455         return self
456
457     def __exit__(self, exc_type, exc_value, traceback):
458         pass
459
460     def acquire(self, blocking=False):
461         pass
462
463     def release(self):
464         pass
465
466
467 def must_be_writable(orig_func):
468     @functools.wraps(orig_func)
469     def must_be_writable_wrapper(self, *args, **kwargs):
470         if not self.writable():
471             raise IOError(errno.EROFS, "Collection is read-only.")
472         return orig_func(self, *args, **kwargs)
473     return must_be_writable_wrapper
474
475
476 class _BlockManager(object):
477     """BlockManager handles buffer blocks.
478
479     Also handles background block uploads, and background block prefetch for a
480     Collection of ArvadosFiles.
481
482     """
483
484     DEFAULT_PUT_THREADS = 2
485     DEFAULT_GET_THREADS = 2
486
487     def __init__(self, keep, copies=None, put_threads=None):
488         """keep: KeepClient object to use"""
489         self._keep = keep
490         self._bufferblocks = collections.OrderedDict()
491         self._put_queue = None
492         self._put_threads = None
493         self._prefetch_queue = None
494         self._prefetch_threads = None
495         self.lock = threading.Lock()
496         self.prefetch_enabled = True
497         if put_threads:
498             self.num_put_threads = put_threads
499         else:
500             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
501         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
502         self.copies = copies
503         self._pending_write_size = 0
504         self.threads_lock = threading.Lock()
505         self.padding_block = None
506
507     @synchronized
508     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
509         """Allocate a new, empty bufferblock in WRITABLE state and return it.
510
511         :blockid:
512           optional block identifier, otherwise one will be automatically assigned
513
514         :starting_capacity:
515           optional capacity, otherwise will use default capacity
516
517         :owner:
518           ArvadosFile that owns this block
519
520         """
521         return self._alloc_bufferblock(blockid, starting_capacity, owner)
522
523     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
524         if blockid is None:
525             blockid = str(uuid.uuid4())
526         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
527         self._bufferblocks[bufferblock.blockid] = bufferblock
528         return bufferblock
529
530     @synchronized
531     def dup_block(self, block, owner):
532         """Create a new bufferblock initialized with the content of an existing bufferblock.
533
534         :block:
535           the buffer block to copy.
536
537         :owner:
538           ArvadosFile that owns the new block
539
540         """
541         new_blockid = str(uuid.uuid4())
542         bufferblock = block.clone(new_blockid, owner)
543         self._bufferblocks[bufferblock.blockid] = bufferblock
544         return bufferblock
545
546     @synchronized
547     def is_bufferblock(self, locator):
548         return locator in self._bufferblocks
549
550     def _commit_bufferblock_worker(self):
551         """Background uploader thread."""
552
553         while True:
554             try:
555                 bufferblock = self._put_queue.get()
556                 if bufferblock is None:
557                     return
558
559                 if self.copies is None:
560                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
561                 else:
562                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
563                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
564             except Exception as e:
565                 bufferblock.set_state(_BufferBlock.ERROR, e)
566             finally:
567                 if self._put_queue is not None:
568                     self._put_queue.task_done()
569
570     def start_put_threads(self):
571         with self.threads_lock:
572             if self._put_threads is None:
573                 # Start uploader threads.
574
575                 # If we don't limit the Queue size, the upload queue can quickly
576                 # grow to take up gigabytes of RAM if the writing process is
577                 # generating data more quickly than it can be sent to the Keep
578                 # servers.
579                 #
580                 # With two upload threads and a queue size of 2, this means up to 4
581                 # blocks pending.  If they are full 64 MiB blocks, that means up to
582                 # 256 MiB of internal buffering, which is the same size as the
583                 # default download block cache in KeepClient.
584                 self._put_queue = queue.Queue(maxsize=2)
585
586                 self._put_threads = []
587                 for i in range(0, self.num_put_threads):
588                     thread = threading.Thread(target=self._commit_bufferblock_worker)
589                     self._put_threads.append(thread)
590                     thread.daemon = True
591                     thread.start()
592
593     def _block_prefetch_worker(self):
594         """The background downloader thread."""
595         while True:
596             try:
597                 b = self._prefetch_queue.get()
598                 if b is None:
599                     return
600                 self._keep.get(b)
601             except Exception:
602                 _logger.exception("Exception doing block prefetch")
603
604     @synchronized
605     def start_get_threads(self):
606         if self._prefetch_threads is None:
607             self._prefetch_queue = queue.Queue()
608             self._prefetch_threads = []
609             for i in range(0, self.num_get_threads):
610                 thread = threading.Thread(target=self._block_prefetch_worker)
611                 self._prefetch_threads.append(thread)
612                 thread.daemon = True
613                 thread.start()
614
615
616     @synchronized
617     def stop_threads(self):
618         """Shut down and wait for background upload and download threads to finish."""
619
620         if self._put_threads is not None:
621             for t in self._put_threads:
622                 self._put_queue.put(None)
623             for t in self._put_threads:
624                 t.join()
625         self._put_threads = None
626         self._put_queue = None
627
628         if self._prefetch_threads is not None:
629             for t in self._prefetch_threads:
630                 self._prefetch_queue.put(None)
631             for t in self._prefetch_threads:
632                 t.join()
633         self._prefetch_threads = None
634         self._prefetch_queue = None
635
636     def __enter__(self):
637         return self
638
639     def __exit__(self, exc_type, exc_value, traceback):
640         self.stop_threads()
641
642     @synchronized
643     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
644         """Packs small blocks together before uploading"""
645
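        # Illustrative scenario: three files of ~20 MiB each are written and
        # closed.  Each leaves a WRITABLE buffer block smaller than
        # KEEP_BLOCK_SIZE/2, so this method copies all three into one new
        # ~60 MiB block, commits it, re-points each file's segments at the
        # new block, and deletes the three small blocks.
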
646         self._pending_write_size += closed_file_size
647
648         # Check whether there are enough small blocks to fill up a whole block
649         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
650             return
651
652         # Collect blocks that are ready to be packed together before being
653         # committed to Keep.
654         # A WRITABLE block always has an owner.
655         # A WRITABLE block with its owner.closed() implies that its
656         # size is <= KEEP_BLOCK_SIZE/2.
657         try:
658             small_blocks = [b for b in listvalues(self._bufferblocks)
659                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
660         except AttributeError:
661             # Writable blocks without owner shouldn't exist.
662             raise UnownedBlockError()
663
664         if len(small_blocks) <= 1:
665             # Not enough small blocks for repacking
666             return
667
668         for bb in small_blocks:
669             bb.repack_writes()
670
671         # Update the pending write size count with its true value, just in case
672         # some small file was opened, written and closed several times.
673         self._pending_write_size = sum([b.size() for b in small_blocks])
674
675         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
676             return
677
678         new_bb = self._alloc_bufferblock()
679         new_bb.owner = []
680         files = []
681         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
682             bb = small_blocks.pop(0)
683             new_bb.owner.append(bb.owner)
684             self._pending_write_size -= bb.size()
685             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
686             files.append((bb, new_bb.write_pointer - bb.size()))
687
688         self.commit_bufferblock(new_bb, sync=sync)
689
690         for bb, new_bb_segment_offset in files:
691             newsegs = bb.owner.segments()
692             for s in newsegs:
693                 if s.locator == bb.blockid:
694                     s.locator = new_bb.blockid
695                     s.segment_offset = new_bb_segment_offset+s.segment_offset
696             bb.owner.set_segments(newsegs)
697             self._delete_bufferblock(bb.blockid)
698
699     def commit_bufferblock(self, block, sync):
700         """Initiate a background upload of a bufferblock.
701
702         :block:
703           The block object to upload
704
705         :sync:
706           If `sync` is True, upload the block synchronously.
707           If `sync` is False, upload the block asynchronously.  This will
708           return immediately unless the upload queue is at capacity, in
709           which case it will wait on an upload queue slot.
710
711         """
712         try:
713             # Mark the block as PENDING to disallow any more appends.
714             block.set_state(_BufferBlock.PENDING)
715         except StateChangeError as e:
716             if e.state == _BufferBlock.PENDING:
717                 if sync:
718                     block.wait_for_commit.wait()
719                 else:
720                     return
721             if block.state() == _BufferBlock.COMMITTED:
722                 return
723             elif block.state() == _BufferBlock.ERROR:
724                 raise block.error
725             else:
726                 raise
727
728         if sync:
729             try:
730                 if self.copies is None:
731                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
732                 else:
733                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
734                 block.set_state(_BufferBlock.COMMITTED, loc)
735             except Exception as e:
736                 block.set_state(_BufferBlock.ERROR, e)
737                 raise
738         else:
739             self.start_put_threads()
740             self._put_queue.put(block)
741
742     @synchronized
743     def get_bufferblock(self, locator):
744         return self._bufferblocks.get(locator)
745
746     @synchronized
747     def get_padding_block(self):
748         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
749         when using truncate() to extend the size of a file.
750
751         For reference (and possible future optimization), the md5sum of the
752         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
753
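        That value can be reproduced with a snippet like this (illustrative
        only; it allocates 64 MiB of zero bytes):

            import hashlib
            from arvados import config
            hashlib.md5(bytes(config.KEEP_BLOCK_SIZE)).hexdigest()
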
754         """
755
756         if self.padding_block is None:
757             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
758             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
759             self.commit_bufferblock(self.padding_block, False)
760         return self.padding_block
761
762     @synchronized
763     def delete_bufferblock(self, locator):
764         self._delete_bufferblock(locator)
765
766     def _delete_bufferblock(self, locator):
767         bb = self._bufferblocks[locator]
768         bb.clear()
769         del self._bufferblocks[locator]
770
771     def get_block_contents(self, locator, num_retries, cache_only=False):
772         """Fetch a block.
773
774         First checks to see if the locator is a BufferBlock and returns its
775         contents if so; otherwise, passes the request through to KeepClient.get().
776
777         """
778         with self.lock:
779             if locator in self._bufferblocks:
780                 bufferblock = self._bufferblocks[locator]
781                 if bufferblock.state() != _BufferBlock.COMMITTED:
782                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
783                 else:
784                     locator = bufferblock._locator
785         if cache_only:
786             return self._keep.get_from_cache(locator)
787         else:
788             return self._keep.get(locator, num_retries=num_retries)
789
790     def commit_all(self):
791         """Commit all outstanding buffer blocks.
792
793         This is a synchronous call, and will not return until all buffer blocks
794         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
795
796         """
797         self.repack_small_blocks(force=True, sync=True)
798
799         with self.lock:
800             items = listitems(self._bufferblocks)
801
802         for k,v in items:
803             if v.state() != _BufferBlock.COMMITTED and v.owner:
804                 # Skip blocks owned by a list of files: if they're not in COMMITTED
805                 # state, they're already being committed asynchronously.
806                 if isinstance(v.owner, ArvadosFile):
807                     v.owner.flush(sync=False)
808
809         with self.lock:
810             if self._put_queue is not None:
811                 self._put_queue.join()
812
813                 err = []
814                 for k,v in items:
815                     if v.state() == _BufferBlock.ERROR:
816                         err.append((v.locator(), v.error))
817                 if err:
818                     raise KeepWriteError("Error writing some blocks", err, label="block")
819
820         for k,v in items:
821             # flush again with sync=True to remove committed bufferblocks from
822             # the segments.
823             if v.owner:
824                 if isinstance(v.owner, ArvadosFile):
825                     v.owner.flush(sync=True)
826                 elif isinstance(v.owner, list) and len(v.owner) > 0:
827                     # This bufferblock is referenced by many files as a result
828                     # of repacking small blocks, so don't delete it when flushing
829                     # its owners, just do it after flushing them all.
830                     for owner in v.owner:
831                         owner.flush(sync=True)
832                     self.delete_bufferblock(k)
833
834     def block_prefetch(self, locator):
835         """Initiate a background download of a block.
836
837         This assumes that the underlying KeepClient implements a block cache,
838         so repeated requests for the same block will not result in repeated
839         downloads (unless the block is evicted from the cache.)  This method
840         does not block.
841
842         """
843
844         if not self.prefetch_enabled:
845             return
846
847         if self._keep.get_from_cache(locator) is not None:
848             return
849
850         with self.lock:
851             if locator in self._bufferblocks:
852                 return
853
854         self.start_get_threads()
855         self._prefetch_queue.put(locator)
856
857
858 class ArvadosFile(object):
859     """Represent a file in a Collection.
860
861     ArvadosFile manages the underlying representation of a file in Keep as a
862     sequence of segments spanning a set of blocks, and implements random
863     read/write access.
864
865     This object may be accessed from multiple threads.
866
867     """
868
869     def __init__(self, parent, name, stream=[], segments=[]):
870         """
871         ArvadosFile constructor.
872
873         :stream:
874           a list of Range objects representing a block stream
875
876         :segments:
877           a list of Range objects representing segments
878         """
879         self.parent = parent
880         self.name = name
881         self._writers = set()
882         self._committed = False
883         self._segments = []
884         self.lock = parent.root_collection().lock
885         for s in segments:
886             self._add_segment(stream, s.locator, s.range_size)
887         self._current_bblock = None
888
889     def writable(self):
890         return self.parent.writable()
891
892     @synchronized
893     def permission_expired(self, as_of_dt=None):
894         """Returns True if any of the segments' locators has expired"""
895         for r in self._segments:
896             if KeepLocator(r.locator).permission_expired(as_of_dt):
897                 return True
898         return False
899
900     @synchronized
901     def segments(self):
902         return copy.copy(self._segments)
903
904     @synchronized
905     def clone(self, new_parent, new_name):
906         """Make a copy of this file."""
907         cp = ArvadosFile(new_parent, new_name)
908         cp.replace_contents(self)
909         return cp
910
911     @must_be_writable
912     @synchronized
913     def replace_contents(self, other):
914         """Replace segments of this file with segments from another `ArvadosFile` object."""
915
916         map_loc = {}
917         self._segments = []
918         for other_segment in other.segments():
919             new_loc = other_segment.locator
920             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
921                 if other_segment.locator not in map_loc:
922                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
923                     if bufferblock.state() != _BufferBlock.WRITABLE:
924                         map_loc[other_segment.locator] = bufferblock.locator()
925                     else:
926                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
927                 new_loc = map_loc[other_segment.locator]
928
929             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
930
931         self.set_committed(False)
932
933     def __eq__(self, other):
934         if other is self:
935             return True
936         if not isinstance(other, ArvadosFile):
937             return False
938
939         othersegs = other.segments()
940         with self.lock:
941             if len(self._segments) != len(othersegs):
942                 return False
943             for i in range(0, len(othersegs)):
944                 seg1 = self._segments[i]
945                 seg2 = othersegs[i]
946                 loc1 = seg1.locator
947                 loc2 = seg2.locator
948
949                 if self.parent._my_block_manager().is_bufferblock(loc1):
950                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
951
952                 if other.parent._my_block_manager().is_bufferblock(loc2):
953                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
954
955                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
956                     seg1.range_start != seg2.range_start or
957                     seg1.range_size != seg2.range_size or
958                     seg1.segment_offset != seg2.segment_offset):
959                     return False
960
961         return True
962
963     def __ne__(self, other):
964         return not self.__eq__(other)
965
966     @synchronized
967     def set_segments(self, segs):
968         self._segments = segs
969
970     @synchronized
971     def set_committed(self, value=True):
972         """Set committed flag.
973
974         If value is True, set committed to be True.
975
976         If value is False, set committed to be False for this and all parents.
977         """
978         if value == self._committed:
979             return
980         self._committed = value
981         if self._committed is False and self.parent is not None:
982             self.parent.set_committed(False)
983
984     @synchronized
985     def committed(self):
986         """Get whether this is committed or not."""
987         return self._committed
988
989     @synchronized
990     def add_writer(self, writer):
991         """Add an ArvadosFileWriter reference to the list of writers"""
992         if isinstance(writer, ArvadosFileWriter):
993             self._writers.add(writer)
994
995     @synchronized
996     def remove_writer(self, writer, flush):
997         """
998         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
999         and do some block maintenance tasks.
1000         """
1001         self._writers.remove(writer)
1002
1003         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1004             # File writer closed, not small enough for repacking
1005             self.flush()
1006         elif self.closed():
1007             # All writers closed and size is adequate for repacking
1008             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1009
1010     def closed(self):
1011         """
1012         Get whether this is closed or not. When the writers list is empty, the file
1013         is considered closed.
1014         """
1015         return len(self._writers) == 0
1016
1017     @must_be_writable
1018     @synchronized
1019     def truncate(self, size):
1020         """Shrink or expand the size of the file.
1021
1022         If `size` is less than the size of the file, the file contents after
1023         `size` will be discarded.  If `size` is greater than the current size
1024         of the file, it will be filled with zero bytes.
1025
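        Illustrative arithmetic for growing a file: truncating a 10-byte file
        up to 70 MiB appends one full 64 MiB segment referencing the shared
        zero-padding block, plus one ~6 MiB segment referencing the same
        block.
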
1026         """
1027         if size < self.size():
1028             new_segs = []
1029             for r in self._segments:
1030                 range_end = r.range_start+r.range_size
1031                 if r.range_start >= size:
1032                     # segment is past the truncate size, all done
1033                     break
1034                 elif size < range_end:
1035                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1036                     nr.segment_offset = r.segment_offset
1037                     new_segs.append(nr)
1038                     break
1039                 else:
1040                     new_segs.append(r)
1041
1042             self._segments = new_segs
1043             self.set_committed(False)
1044         elif size > self.size():
1045             padding = self.parent._my_block_manager().get_padding_block()
1046             diff = size - self.size()
1047             while diff > config.KEEP_BLOCK_SIZE:
1048                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1049                 diff -= config.KEEP_BLOCK_SIZE
1050             if diff > 0:
1051                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1052             self.set_committed(False)
1053         else:
1054             # size == self.size()
1055             pass
1056
1057     def readfrom(self, offset, size, num_retries, exact=False):
1058         """Read up to `size` bytes from the file starting at `offset`.
1059
1060         :exact:
1061          If False (default), return less data than requested if the read
1062          crosses a block boundary and the next block isn't cached.  If True,
1063          only return less data than requested when hitting EOF.
1064         """
1065
1066         with self.lock:
1067             if size == 0 or offset >= self.size():
1068                 return b''
1069             readsegs = locators_and_ranges(self._segments, offset, size)
1070             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1071
1072         locs = set()
1073         data = []
1074         for lr in readsegs:
1075             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1076             if block:
1077                 blockview = memoryview(block)
1078                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1079                 locs.add(lr.locator)
1080             else:
1081                 break
1082
1083         for lr in prefetch:
1084             if lr.locator not in locs:
1085                 self.parent._my_block_manager().block_prefetch(lr.locator)
1086                 locs.add(lr.locator)
1087
1088         return b''.join(data)
1089
1090     @must_be_writable
1091     @synchronized
1092     def writeto(self, offset, data, num_retries):
1093         """Write `data` to the file starting at `offset`.
1094
1095         This will update existing bytes and/or extend the size of the file as
1096         necessary.
1097
1098         """
1099         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1100             data = data.encode()
1101         if len(data) == 0:
1102             return
1103
1104         if offset > self.size():
1105             self.truncate(offset)
1106
1107         if len(data) > config.KEEP_BLOCK_SIZE:
1108             # Chunk it up into smaller writes
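            # For illustration: with the default 64 MiB KEEP_BLOCK_SIZE, a
            # single 150 MiB write becomes recursive writeto() calls of
            # 64 MiB, 64 MiB and 22 MiB.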
1109             n = 0
1110             dataview = memoryview(data)
1111             while n < len(data):
1112                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1113                 n += config.KEEP_BLOCK_SIZE
1114             return
1115
1116         self.set_committed(False)
1117
1118         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1119             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1120
1121         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1122             self._current_bblock.repack_writes()
1123             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1124                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1125                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1126
1127         self._current_bblock.append(data)
1128
1129         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1130
1131         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1132
1133         return len(data)
1134
1135     @synchronized
1136     def flush(self, sync=True, num_retries=0):
1137         """Flush the current bufferblock to Keep.
1138
1139         :sync:
1140           If True, commit block synchronously, wait until buffer block has been written.
1141           If False, commit block asynchronously, return immediately after putting block into
1142           the keep put queue.
1143         """
1144         if self.committed():
1145             return
1146
1147         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1148             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1149                 self._current_bblock.repack_writes()
1150             if self._current_bblock.state() != _BufferBlock.DELETED:
1151                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1152
1153         if sync:
1154             to_delete = set()
1155             for s in self._segments:
1156                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1157                 if bb:
1158                     if bb.state() != _BufferBlock.COMMITTED:
1159                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1160                     to_delete.add(s.locator)
1161                     s.locator = bb.locator()
1162             for s in to_delete:
1163                 # Don't delete the bufferblock if it's owned by many files. It'll be
1164                 # deleted after all of its owners are flush()ed.
1165                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1166                     self.parent._my_block_manager().delete_bufferblock(s)
1167
1168         self.parent.notify(MOD, self.parent, self.name, (self, self))
1169
1170     @must_be_writable
1171     @synchronized
1172     def add_segment(self, blocks, pos, size):
1173         """Add a segment to the end of the file.
1174
1175         `pos` and `size` reference a section of the stream described by
1176         `blocks` (a list of Range objects)
1177
1178         """
1179         self._add_segment(blocks, pos, size)
1180
1181     def _add_segment(self, blocks, pos, size):
1182         """Internal implementation of add_segment."""
1183         self.set_committed(False)
1184         for lr in locators_and_ranges(blocks, pos, size):
1185             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1186             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1187             self._segments.append(r)
1188
1189     @synchronized
1190     def size(self):
1191         """Get the file size."""
1192         if self._segments:
1193             n = self._segments[-1]
1194             return n.range_start + n.range_size
1195         else:
1196             return 0
1197
1198     @synchronized
1199     def manifest_text(self, stream_name=".", portable_locators=False,
1200                       normalize=False, only_committed=False):
1201         buf = ""
1202         filestream = []
1203         for segment in self._segments:
1204             loc = segment.locator
1205             if self.parent._my_block_manager().is_bufferblock(loc):
1206                 if only_committed:
1207                     continue
1208                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1209             if portable_locators:
1210                 loc = KeepLocator(loc).stripped()
1211             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1212                                  segment.segment_offset, segment.range_size))
1213         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1214         buf += "\n"
1215         return buf
1216
1217     @must_be_writable
1218     @synchronized
1219     def _reparent(self, newparent, newname):
1220         self.set_committed(False)
1221         self.flush(sync=True)
1222         self.parent.remove(self.name)
1223         self.parent = newparent
1224         self.name = newname
1225         self.lock = self.parent.root_collection().lock
1226
1227
1228 class ArvadosFileReader(ArvadosFileReaderBase):
1229     """Wraps ArvadosFile in a file-like object supporting reading only.
1230
1231     Be aware that this class is NOT thread safe as there is no locking around
1232     updating file pointer.
1233
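    Illustrative use (a sketch, assuming the arvados.collection API and an
    existing collection that contains 'foo.txt'):

        import arvados.collection
        c = arvados.collection.CollectionReader('<collection UUID or portable data hash>')
        with c.open('foo.txt', 'rb') as reader:
            first_kb = reader.read(1024)
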
1234     """
1235
1236     def __init__(self, arvadosfile, mode="r", num_retries=None):
1237         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1238         self.arvadosfile = arvadosfile
1239
1240     def size(self):
1241         return self.arvadosfile.size()
1242
1243     def stream_name(self):
1244         return self.arvadosfile.parent.stream_name()
1245
1246     @_FileLikeObjectBase._before_close
1247     @retry_method
1248     def read(self, size=None, num_retries=None):
1249         """Read up to `size` bytes from the file and return the result.
1250
1251         Starts at the current file position.  If `size` is None, read the
1252         entire remainder of the file.
1253         """
1254         if size is None:
1255             data = []
1256             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1257             while rd:
1258                 data.append(rd)
1259                 self._filepos += len(rd)
1260                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1261             return b''.join(data)
1262         else:
1263             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1264             self._filepos += len(data)
1265             return data
1266
1267     @_FileLikeObjectBase._before_close
1268     @retry_method
1269     def readfrom(self, offset, size, num_retries=None):
1270         """Read up to `size` bytes from the stream, starting at the specified file offset.
1271
1272         This method does not change the file position.
1273         """
1274         return self.arvadosfile.readfrom(offset, size, num_retries)
1275
1276     def flush(self):
1277         pass
1278
1279
1280 class ArvadosFileWriter(ArvadosFileReader):
1281     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1282
1283     Be aware that this class is NOT thread safe as there is no locking around
1284     updating file pointer.
1285
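    Illustrative use (a sketch, assuming the arvados.collection API):

        import arvados.collection
        c = arvados.collection.Collection()
        with c.open('hello.txt', 'wb') as writer:
            writer.write(b'hello world')
        c.save_new()
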
1286     """
1287
1288     def __init__(self, arvadosfile, mode, num_retries=None):
1289         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1290         self.arvadosfile.add_writer(self)
1291
1292     def writable(self):
1293         return True
1294
1295     @_FileLikeObjectBase._before_close
1296     @retry_method
1297     def write(self, data, num_retries=None):
1298         if self.mode[0] == "a":
1299             self._filepos = self.size()
1300         self.arvadosfile.writeto(self._filepos, data, num_retries)
1301         self._filepos += len(data)
1302         return len(data)
1303
1304     @_FileLikeObjectBase._before_close
1305     @retry_method
1306     def writelines(self, seq, num_retries=None):
1307         for s in seq:
1308             self.write(s, num_retries=num_retries)
1309
1310     @_FileLikeObjectBase._before_close
1311     def truncate(self, size=None):
1312         if size is None:
1313             size = self._filepos
1314         self.arvadosfile.truncate(size)
1315
1316     @_FileLikeObjectBase._before_close
1317     def flush(self):
1318         self.arvadosfile.flush()
1319
1320     def close(self, flush=True):
1321         if not self.closed:
1322             self.arvadosfile.remove_writer(self, flush)
1323             super(ArvadosFileWriter, self).close()