sdk/python/arvados/arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
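# A minimal usage sketch for split(): given a manifest-style stream path, the
# stream name is everything before the final '/', defaulting to '.' when the
# path has no directory component.
#
#     >>> split("output/logs/stderr.txt")
#     ('output/logs', 'stderr.txt')
#     >>> split("stderr.txt")
#     ('.', 'stderr.txt')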
52
53
54 class UnownedBlockError(Exception):
55     """Raised when there's an writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
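# Sketch of the guard provided by _FileLikeObjectBase._before_close: any
# decorated method raises once close() has been called.  Assuming a trivial
# subclass for illustration:
#
#     >>> class Example(_FileLikeObjectBase):
#     ...     @_FileLikeObjectBase._before_close
#     ...     def ping(self):
#     ...         return "ok"
#     >>> f = Example("example.txt", "rb")
#     >>> f.ping()
#     'ok'
#     >>> f.close()
#     >>> f.ping()
#     Traceback (most recent call last):
#     ...
#     ValueError: I/O operation on closed stream file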
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._binary = 'b' in mode
92         if sys.version_info >= (3, 0) and not self._binary:
93             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
94         self._filepos = 0
95         self.num_retries = num_retries
96         self._readline_cache = (None, None)
97
98     def __iter__(self):
99         while True:
100             data = self.readline()
101             if not data:
102                 break
103             yield data
104
105     def decompressed_name(self):
106         return re.sub(r'\.(bz2|gz)$', '', self.name)
107
108     @_FileLikeObjectBase._before_close
109     def seek(self, pos, whence=os.SEEK_SET):
110         if whence == os.SEEK_CUR:
111             pos += self._filepos
112         elif whence == os.SEEK_END:
113             pos += self.size()
114         if pos < 0:
115             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
116         self._filepos = pos
117         return self._filepos
118
119     def tell(self):
120         return self._filepos
121
122     def readable(self):
123         return True
124
125     def writable(self):
126         return False
127
128     def seekable(self):
129         return True
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall(self, size=2**20, num_retries=None):
134         while True:
135             data = self.read(size, num_retries=num_retries)
136             if len(data) == 0:
137                 break
138             yield data
139
140     @_FileLikeObjectBase._before_close
141     @retry_method
142     def readline(self, size=float('inf'), num_retries=None):
143         cache_pos, cache_data = self._readline_cache
144         if self.tell() == cache_pos:
145             data = [cache_data]
146             self._filepos += len(cache_data)
147         else:
148             data = [b'']
149         data_size = len(data[-1])
150         while (data_size < size) and (b'\n' not in data[-1]):
151             next_read = self.read(2 ** 20, num_retries=num_retries)
152             if not next_read:
153                 break
154             data.append(next_read)
155             data_size += len(next_read)
156         data = b''.join(data)
157         try:
158             nextline_index = data.index(b'\n') + 1
159         except ValueError:
160             nextline_index = len(data)
161         nextline_index = min(nextline_index, size)
162         self._filepos -= len(data) - nextline_index
163         self._readline_cache = (self.tell(), data[nextline_index:])
164         return data[:nextline_index].decode()
165
166     @_FileLikeObjectBase._before_close
167     @retry_method
168     def decompress(self, decompress, size, num_retries=None):
169         for segment in self.readall(size, num_retries=num_retries):
170             data = decompress(segment)
171             if data:
172                 yield data
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readall_decompressed(self, size=2**20, num_retries=None):
177         self.seek(0)
178         if self.name.endswith('.bz2'):
179             dc = bz2.BZ2Decompressor()
180             return self.decompress(dc.decompress, size,
181                                    num_retries=num_retries)
182         elif self.name.endswith('.gz'):
183             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
184             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
185                                    size, num_retries=num_retries)
186         else:
187             return self.readall(size, num_retries=num_retries)
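# A minimal sketch of transparent decompression, assuming `reader` is an
# ArvadosFileReaderBase subclass whose name ends in ".gz": the generator
# yields decompressed chunks, while any other file extension falls back to
# the raw readall() stream.
#
#     >>> chunks = reader.readall_decompressed(size=2**20)
#     >>> plaintext = b''.join(chunks)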
188
189     @_FileLikeObjectBase._before_close
190     @retry_method
191     def readlines(self, sizehint=float('inf'), num_retries=None):
192         data = []
193         data_size = 0
194         for s in self.readall(num_retries=num_retries):
195             data.append(s)
196             data_size += len(s)
197             if data_size >= sizehint:
198                 break
199         return b''.join(data).decode().splitlines(True)
200
201     def size(self):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def read(self, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207     def readfrom(self, start, size, num_retries=None):
208         raise IOError(errno.ENOSYS, "Not implemented")
209
210
211 class StreamFileReader(ArvadosFileReaderBase):
212     class _NameAttribute(str):
213         # The Python file API provides a plain .name attribute.
214         # Older SDK provided a name() method.
215         # This class provides both, for maximum compatibility.
216         def __call__(self):
217             return self
218
219     def __init__(self, stream, segments, name):
220         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
221         self._stream = stream
222         self.segments = segments
223
224     def stream_name(self):
225         return self._stream.name()
226
227     def size(self):
228         n = self.segments[-1]
229         return n.range_start + n.range_size
230
231     @_FileLikeObjectBase._before_close
232     @retry_method
233     def read(self, size, num_retries=None):
234         """Read up to 'size' bytes from the stream, starting at the current file position"""
235         if size == 0:
236             return b''
237
238         data = b''
239         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
240         if available_chunks:
241             lr = available_chunks[0]
242             data = self._stream.readfrom(lr.locator+lr.segment_offset,
243                                          lr.segment_size,
244                                          num_retries=num_retries)
245
246         self._filepos += len(data)
247         return data
248
249     @_FileLikeObjectBase._before_close
250     @retry_method
251     def readfrom(self, start, size, num_retries=None):
252         """Read up to 'size' bytes from the stream, starting at 'start'"""
253         if size == 0:
254             return b''
255
256         data = []
257         for lr in locators_and_ranges(self.segments, start, size):
258             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
259                                               num_retries=num_retries))
260         return b''.join(data)
261
262     def as_manifest(self):
263         segs = []
264         for r in self.segments:
265             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
266         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
267
268
269 def synchronized(orig_func):
270     @functools.wraps(orig_func)
271     def synchronized_wrapper(self, *args, **kwargs):
272         with self.lock:
273             return orig_func(self, *args, **kwargs)
274     return synchronized_wrapper
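# Usage sketch for the synchronized decorator: the decorated method runs while
# holding self.lock, so a class using it only needs to provide a `lock`
# attribute (a threading.Lock() here, or the NoopLock defined below for
# single-threaded use).
#
#     >>> class Counter(object):
#     ...     def __init__(self):
#     ...         self.lock = threading.Lock()
#     ...         self.value = 0
#     ...     @synchronized
#     ...     def increment(self):
#     ...         self.value += 1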
275
276
277 class StateChangeError(Exception):
278     def __init__(self, message, state, nextstate):
279         super(StateChangeError, self).__init__(message)
280         self.state = state
281         self.nextstate = nextstate
282
283 class _BufferBlock(object):
284     """A stand-in for a Keep block that is in the process of being written.
285
286     Writers can append to it, get the size, and compute the Keep locator.
287     There are three valid states:
288
289     WRITABLE
290       Can append to block.
291
292     PENDING
293       Block is in the process of being uploaded to Keep, append is an error.
294
295     COMMITTED
296       The block has been written to Keep, its internal buffer has been
297       released, fetching the block will fetch it via keep client (since we
298       discarded the internal copy), and identifiers referring to the BufferBlock
299       can be replaced with the block locator.
300
301     """
302
303     WRITABLE = 0
304     PENDING = 1
305     COMMITTED = 2
306     ERROR = 3
307     DELETED = 4
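    # Sketch of the usual lifecycle, assuming `bb` was allocated by a
    # _BlockManager and `keep` is a KeepClient:
    #
    #     >>> bb.append(b"some data")                # WRITABLE: appends allowed
    #     >>> bb.set_state(_BufferBlock.PENDING)     # no more appends
    #     >>> loc = keep.put(bb.buffer_view[0:bb.write_pointer].tobytes())
    #     >>> bb.set_state(_BufferBlock.COMMITTED, loc)
    #     >>> bb.locator() == loc                    # buffer released, locator kept
    #     True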
308
309     def __init__(self, blockid, starting_capacity, owner):
310         """
311         :blockid:
312           the identifier for this block
313
314         :starting_capacity:
315           the initial buffer capacity
316
317         :owner:
318           ArvadosFile that owns this block
319
320         """
321         self.blockid = blockid
322         self.buffer_block = bytearray(starting_capacity)
323         self.buffer_view = memoryview(self.buffer_block)
324         self.write_pointer = 0
325         self._state = _BufferBlock.WRITABLE
326         self._locator = None
327         self.owner = owner
328         self.lock = threading.Lock()
329         self.wait_for_commit = threading.Event()
330         self.error = None
331
332     @synchronized
333     def append(self, data):
334         """Append some data to the buffer.
335
336         Only valid if the block is in WRITABLE state.  Implements an expanding
337         buffer, doubling capacity as needed to accommodate all the data.
338
339         """
340         if self._state == _BufferBlock.WRITABLE:
341             if not isinstance(data, bytes) and not isinstance(data, memoryview):
342                 data = data.encode()
343             while (self.write_pointer+len(data)) > len(self.buffer_block):
344                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
345                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
346                 self.buffer_block = new_buffer_block
347                 self.buffer_view = memoryview(self.buffer_block)
348             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
349             self.write_pointer += len(data)
350             self._locator = None
351         else:
352             raise AssertionError("Buffer block is not writable")
353
354     STATE_TRANSITIONS = frozenset([
355             (WRITABLE, PENDING),
356             (PENDING, COMMITTED),
357             (PENDING, ERROR),
358             (ERROR, PENDING)])
359
360     @synchronized
361     def set_state(self, nextstate, val=None):
362         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
363             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
364         self._state = nextstate
365
366         if self._state == _BufferBlock.PENDING:
367             self.wait_for_commit.clear()
368
369         if self._state == _BufferBlock.COMMITTED:
370             self._locator = val
371             self.buffer_view = None
372             self.buffer_block = None
373             self.wait_for_commit.set()
374
375         if self._state == _BufferBlock.ERROR:
376             self.error = val
377             self.wait_for_commit.set()
378
379     @synchronized
380     def state(self):
381         return self._state
382
383     def size(self):
384         """The amount of data written to the buffer."""
385         return self.write_pointer
386
387     @synchronized
388     def locator(self):
389         """The Keep locator for this buffer's contents."""
390         if self._locator is None:
391             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
392         return self._locator
393
394     @synchronized
395     def clone(self, new_blockid, owner):
396         if self._state == _BufferBlock.COMMITTED:
397             raise AssertionError("Cannot duplicate committed buffer block")
398         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
399         bufferblock.append(self.buffer_view[0:self.size()])
400         return bufferblock
401
402     @synchronized
403     def clear(self):
404         self._state = _BufferBlock.DELETED
405         self.owner = None
406         self.buffer_block = None
407         self.buffer_view = None
408
409     @synchronized
410     def repack_writes(self):
411         """Optimize buffer block by repacking segments in file sequence.
412
413         When the client makes random writes, they appear in the buffer block in
414         the sequence they were written rather than the sequence they appear in
415         the file.  This makes for inefficient, fragmented manifests.  Attempt
416         to optimize by repacking writes in file sequence.
417
418         """
419         if self._state != _BufferBlock.WRITABLE:
420             raise AssertionError("Cannot repack non-writable block")
421
422         segs = self.owner.segments()
423
424         # Collect the segments that reference the buffer block.
425         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
426
427         # Collect total data referenced by segments (could be smaller than
428         # bufferblock size if a portion of the file was written and
429         # then overwritten).
430         write_total = sum([s.range_size for s in bufferblock_segs])
431
432         if write_total < self.size() or len(bufferblock_segs) > 1:
433             # If there's more than one segment referencing this block, it is
434             # due to out-of-order writes and will produce a fragmented
435             # manifest, so try to optimize by re-packing into a new buffer.
436             contents = self.buffer_view[0:self.write_pointer].tobytes()
437             new_bb = _BufferBlock(None, write_total, None)
438             for t in bufferblock_segs:
439                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
440                 t.segment_offset = new_bb.size() - t.range_size
441
442             self.buffer_block = new_bb.buffer_block
443             self.buffer_view = new_bb.buffer_view
444             self.write_pointer = new_bb.write_pointer
445             self._locator = None
446             new_bb.clear()
447             self.owner.set_segments(segs)
448
449     def __repr__(self):
450         return "<BufferBlock %s>" % (self.blockid)
451
452
453 class NoopLock(object):
454     def __enter__(self):
455         return self
456
457     def __exit__(self, exc_type, exc_value, traceback):
458         pass
459
460     def acquire(self, blocking=False):
461         pass
462
463     def release(self):
464         pass
465
466
467 def must_be_writable(orig_func):
468     @functools.wraps(orig_func)
469     def must_be_writable_wrapper(self, *args, **kwargs):
470         if not self.writable():
471             raise IOError(errno.EROFS, "Collection is read-only.")
472         return orig_func(self, *args, **kwargs)
473     return must_be_writable_wrapper
474
475
476 class _BlockManager(object):
477     """BlockManager handles buffer blocks.
478
479     Also handles background block uploads, and background block prefetch for a
480     Collection of ArvadosFiles.
481
482     """
483
484     DEFAULT_PUT_THREADS = 2
485     DEFAULT_GET_THREADS = 2
486
487     def __init__(self, keep, copies=None, put_threads=None):
488         """keep: KeepClient object to use"""
489         self._keep = keep
490         self._bufferblocks = collections.OrderedDict()
491         self._put_queue = None
492         self._put_threads = None
493         self._prefetch_queue = None
494         self._prefetch_threads = None
495         self.lock = threading.Lock()
496         self.prefetch_enabled = True
497         if put_threads:
498             self.num_put_threads = put_threads
499         else:
500             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
501         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
502         self.copies = copies
503         self._pending_write_size = 0
504         self.threads_lock = threading.Lock()
505         self.padding_block = None
506
507     @synchronized
508     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
509         """Allocate a new, empty bufferblock in WRITABLE state and return it.
510
511         :blockid:
512           optional block identifier, otherwise one will be automatically assigned
513
514         :starting_capacity:
515           optional capacity, otherwise will use default capacity
516
517         :owner:
518           ArvadosFile that owns this block
519
520         """
521         return self._alloc_bufferblock(blockid, starting_capacity, owner)
522
523     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
524         if blockid is None:
525             blockid = str(uuid.uuid4())
526         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
527         self._bufferblocks[bufferblock.blockid] = bufferblock
528         return bufferblock
529
530     @synchronized
531     def dup_block(self, block, owner):
532         """Create a new bufferblock initialized with the content of an existing bufferblock.
533
534         :block:
535           the buffer block to copy.
536
537         :owner:
538           ArvadosFile that owns the new block
539
540         """
541         new_blockid = str(uuid.uuid4())
542         bufferblock = block.clone(new_blockid, owner)
543         self._bufferblocks[bufferblock.blockid] = bufferblock
544         return bufferblock
545
546     @synchronized
547     def is_bufferblock(self, locator):
548         return locator in self._bufferblocks
549
550     def _commit_bufferblock_worker(self):
551         """Background uploader thread."""
552
553         while True:
554             try:
555                 bufferblock = self._put_queue.get()
556                 if bufferblock is None:
557                     return
558
559                 if self.copies is None:
560                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
561                 else:
562                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
563                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
564             except Exception as e:
565                 bufferblock.set_state(_BufferBlock.ERROR, e)
566             finally:
567                 if self._put_queue is not None:
568                     self._put_queue.task_done()
569
570     def start_put_threads(self):
571         with self.threads_lock:
572             if self._put_threads is None:
573                 # Start uploader threads.
574
575                 # If we don't limit the Queue size, the upload queue can quickly
576                 # grow to take up gigabytes of RAM if the writing process is
577                 # generating data more quickly than it can be sent to the Keep
578                 # servers.
579                 #
580                 # With two upload threads and a queue size of 2, this means up to 4
581                 # blocks pending.  If they are full 64 MiB blocks, that means up to
582                 # 256 MiB of internal buffering, which is the same size as the
583                 # default download block cache in KeepClient.
584                 self._put_queue = queue.Queue(maxsize=2)
585
586                 self._put_threads = []
587                 for i in range(0, self.num_put_threads):
588                     thread = threading.Thread(target=self._commit_bufferblock_worker)
589                     self._put_threads.append(thread)
590                     thread.daemon = True
591                     thread.start()
592
593     def _block_prefetch_worker(self):
594         """The background downloader thread."""
595         while True:
596             try:
597                 b = self._prefetch_queue.get()
598                 if b is None:
599                     return
600                 self._keep.get(b)
601             except Exception:
602                 _logger.exception("Exception doing block prefetch")
603
604     @synchronized
605     def start_get_threads(self):
606         if self._prefetch_threads is None:
607             self._prefetch_queue = queue.Queue()
608             self._prefetch_threads = []
609             for i in range(0, self.num_get_threads):
610                 thread = threading.Thread(target=self._block_prefetch_worker)
611                 self._prefetch_threads.append(thread)
612                 thread.daemon = True
613                 thread.start()
614
615
616     @synchronized
617     def stop_threads(self):
618         """Shut down and wait for background upload and download threads to finish."""
619
620         if self._put_threads is not None:
621             for t in self._put_threads:
622                 self._put_queue.put(None)
623             for t in self._put_threads:
624                 t.join()
625         self._put_threads = None
626         self._put_queue = None
627
628         if self._prefetch_threads is not None:
629             for t in self._prefetch_threads:
630                 self._prefetch_queue.put(None)
631             for t in self._prefetch_threads:
632                 t.join()
633         self._prefetch_threads = None
634         self._prefetch_queue = None
635
636     def __enter__(self):
637         return self
638
639     def __exit__(self, exc_type, exc_value, traceback):
640         self.stop_threads()
641
642     @synchronized
643     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
644         """Packs small blocks together before uploading"""
645
646         self._pending_write_size += closed_file_size
647
648         # Check whether the accumulated small blocks add up to at least one full block
649         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
650             return
651
652         # Search blocks ready for getting packed together before being
653         # committed to Keep.
654         # A WRITABLE block always has an owner.
655         # A WRITABLE block with its owner.closed() implies that its
656         # size is <= KEEP_BLOCK_SIZE/2.
657         try:
658             small_blocks = [b for b in listvalues(self._bufferblocks)
659                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
660         except AttributeError:
661             # Writable blocks without owner shouldn't exist.
662             raise UnownedBlockError()
663
664         if len(small_blocks) <= 1:
665             # Not enough small blocks for repacking
666             return
667
668         for bb in small_blocks:
669             bb.repack_writes()
670
671         # Update the pending write size count with its true value, just in case
672         # some small file was opened, written and closed several times.
673         self._pending_write_size = sum([b.size() for b in small_blocks])
674
675         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
676             return
677
678         new_bb = self._alloc_bufferblock()
679         new_bb.owner = []
680         files = []
681         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
682             bb = small_blocks.pop(0)
683             new_bb.owner.append(bb.owner)
684             self._pending_write_size -= bb.size()
685             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
686             files.append((bb, new_bb.write_pointer - bb.size()))
687
688         self.commit_bufferblock(new_bb, sync=sync)
689
690         for bb, new_bb_segment_offset in files:
691             newsegs = bb.owner.segments()
692             for s in newsegs:
693                 if s.locator == bb.blockid:
694                     s.locator = new_bb.blockid
695                     s.segment_offset = new_bb_segment_offset+s.segment_offset
696             bb.owner.set_segments(newsegs)
697             self._delete_bufferblock(bb.blockid)
698
699     def commit_bufferblock(self, block, sync):
700         """Initiate a background upload of a bufferblock.
701
702         :block:
703           The block object to upload
704
705         :sync:
706           If `sync` is True, upload the block synchronously.
707           If `sync` is False, upload the block asynchronously.  This will
708           return immediately unless the upload queue is at capacity, in
709           which case it will wait on an upload queue slot.
710
711         """
712         try:
713             # Mark the block as PENDING to disallow any more appends.
714             block.set_state(_BufferBlock.PENDING)
715         except StateChangeError as e:
716             if e.state == _BufferBlock.PENDING:
717                 if sync:
718                     block.wait_for_commit.wait()
719                 else:
720                     return
721             if block.state() == _BufferBlock.COMMITTED:
722                 return
723             elif block.state() == _BufferBlock.ERROR:
724                 raise block.error
725             else:
726                 raise
727
728         if sync:
729             try:
730                 if self.copies is None:
731                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
732                 else:
733                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
734                 block.set_state(_BufferBlock.COMMITTED, loc)
735             except Exception as e:
736                 block.set_state(_BufferBlock.ERROR, e)
737                 raise
738         else:
739             self.start_put_threads()
740             self._put_queue.put(block)
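        # Usage sketch (hypothetical `manager` and `block`): a synchronous
        # commit blocks until the Keep PUT finishes, while an asynchronous
        # commit only blocks when the bounded upload queue is full.
        #
        #     >>> manager.commit_bufferblock(block, sync=True)   # waits for the PUT
        #     >>> manager.commit_bufferblock(block, sync=False)  # queued for background upload
        #     >>> manager.commit_all()                           # later: wait for everything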
741
742     @synchronized
743     def get_bufferblock(self, locator):
744         return self._bufferblocks.get(locator)
745
746     @synchronized
747     def get_padding_block(self):
748         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
749         when using truncate() to extend the size of a file.
750
751         For reference (and possible future optimization), the md5sum of the
752         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
753
754         """
755
756         if self.padding_block is None:
757             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
758             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
759             self.commit_bufferblock(self.padding_block, False)
760         return self.padding_block
761
762     @synchronized
763     def delete_bufferblock(self, locator):
764         self._delete_bufferblock(locator)
765
766     def _delete_bufferblock(self, locator):
767         bb = self._bufferblocks[locator]
768         bb.clear()
769         del self._bufferblocks[locator]
770
771     def get_block_contents(self, locator, num_retries, cache_only=False):
772         """Fetch a block.
773
774         First checks whether the locator refers to a BufferBlock and returns its
775         contents if so; otherwise, passes the request through to KeepClient.get().
776
777         """
778         with self.lock:
779             if locator in self._bufferblocks:
780                 bufferblock = self._bufferblocks[locator]
781                 if bufferblock.state() != _BufferBlock.COMMITTED:
782                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
783                 else:
784                     locator = bufferblock._locator
785         if cache_only:
786             return self._keep.get_from_cache(locator)
787         else:
788             return self._keep.get(locator, num_retries=num_retries)
789
790     def commit_all(self):
791         """Commit all outstanding buffer blocks.
792
793         This is a synchronous call, and will not return until all buffer blocks
794         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
795
796         """
797         self.repack_small_blocks(force=True, sync=True)
798
799         with self.lock:
800             items = listitems(self._bufferblocks)
801
802         for k,v in items:
803             if v.state() != _BufferBlock.COMMITTED and v.owner:
804                 # Skip blocks owned by a list of files: if such a block is not yet
805                 # in COMMITTED state, it is already being committed asynchronously.
806                 if isinstance(v.owner, ArvadosFile):
807                     v.owner.flush(sync=False)
808
809         with self.lock:
810             if self._put_queue is not None:
811                 self._put_queue.join()
812
813                 err = []
814                 for k,v in items:
815                     if v.state() == _BufferBlock.ERROR:
816                         err.append((v.locator(), v.error))
817                 if err:
818                     raise KeepWriteError("Error writing some blocks", err, label="block")
819
820         for k,v in items:
821             # flush again with sync=True to remove committed bufferblocks from
822             # the segments.
823             if v.owner:
824                 if isinstance(v.owner, ArvadosFile):
825                     v.owner.flush(sync=True)
826                 elif isinstance(v.owner, list) and len(v.owner) > 0:
827                     # This bufferblock is referenced by many files as a result
828                     # of repacking small blocks, so don't delete it when flushing
829                     # its owners, just do it after flushing them all.
830                     for owner in v.owner:
831                         owner.flush(sync=True)
832                     self.delete_bufferblock(k)
833
834     def block_prefetch(self, locator):
835         """Initiate a background download of a block.
836
837         This assumes that the underlying KeepClient implements a block cache,
838         so repeated requests for the same block will not result in repeated
839         downloads (unless the block is evicted from the cache.)  This method
840         does not block.
841
842         """
843
844         if not self.prefetch_enabled:
845             return
846
847         if self._keep.get_from_cache(locator) is not None:
848             return
849
850         with self.lock:
851             if locator in self._bufferblocks:
852                 return
853
854         self.start_get_threads()
855         self._prefetch_queue.put(locator)
856
857
858 class ArvadosFile(object):
859     """Represent a file in a Collection.
860
861     ArvadosFile manages the underlying representation of a file in Keep as a
862     sequence of segments spanning a set of blocks, and implements random
863     read/write access.
864
865     This object may be accessed from multiple threads.
866
867     """
868
869     __slots__ = ('parent', 'name', '_writers', '_committed',
870                  '_segments', 'lock', '_current_bblock', 'fuse_entry')
871
872     def __init__(self, parent, name, stream=[], segments=[]):
873         """
874         ArvadosFile constructor.
875
876         :stream:
877           a list of Range objects representing a block stream
878
879         :segments:
880           a list of Range objects representing segments
881         """
882         self.parent = parent
883         self.name = name
884         self._writers = set()
885         self._committed = False
886         self._segments = []
887         self.lock = parent.root_collection().lock
888         for s in segments:
889             self._add_segment(stream, s.locator, s.range_size)
890         self._current_bblock = None
891
892     def writable(self):
893         return self.parent.writable()
894
895     @synchronized
896     def permission_expired(self, as_of_dt=None):
897         """Returns True if any of the segment's locators is expired"""
898         for r in self._segments:
899             if KeepLocator(r.locator).permission_expired(as_of_dt):
900                 return True
901         return False
902
903     @synchronized
904     def has_remote_blocks(self):
905         """Returns True if any of the segment's locators has a +R signature"""
906
907         for s in self._segments:
908             if '+R' in s.locator:
909                 return True
910         return False
911
912     @synchronized
913     def _copy_remote_blocks(self, remote_blocks={}):
914         """Ask Keep to copy remote blocks and point to their local copies.
915
916         This is called from the parent Collection.
917
918         :remote_blocks:
919             Shared cache of remote to local block mappings. This is used to avoid
920             doing extra work when blocks are shared by more than one file in
921             different subdirectories.
922         """
923
924         for s in self._segments:
925             if '+R' in s.locator:
926                 try:
927                     loc = remote_blocks[s.locator]
928                 except KeyError:
929                     loc = self.parent._my_keep().refresh_signature(s.locator)
930                     remote_blocks[s.locator] = loc
931                 s.locator = loc
932                 self.parent.set_committed(False)
933         return remote_blocks
934
935     @synchronized
936     def segments(self):
937         return copy.copy(self._segments)
938
939     @synchronized
940     def clone(self, new_parent, new_name):
941         """Make a copy of this file."""
942         cp = ArvadosFile(new_parent, new_name)
943         cp.replace_contents(self)
944         return cp
945
946     @must_be_writable
947     @synchronized
948     def replace_contents(self, other):
949         """Replace segments of this file with segments from another `ArvadosFile` object."""
950
951         map_loc = {}
952         self._segments = []
953         for other_segment in other.segments():
954             new_loc = other_segment.locator
955             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
956                 if other_segment.locator not in map_loc:
957                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
958                     if bufferblock.state() != _BufferBlock.WRITABLE:
959                         map_loc[other_segment.locator] = bufferblock.locator()
960                     else:
961                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
962                 new_loc = map_loc[other_segment.locator]
963
964             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
965
966         self.set_committed(False)
967
968     def __eq__(self, other):
969         if other is self:
970             return True
971         if not isinstance(other, ArvadosFile):
972             return False
973
974         othersegs = other.segments()
975         with self.lock:
976             if len(self._segments) != len(othersegs):
977                 return False
978             for i in range(0, len(othersegs)):
979                 seg1 = self._segments[i]
980                 seg2 = othersegs[i]
981                 loc1 = seg1.locator
982                 loc2 = seg2.locator
983
984                 if self.parent._my_block_manager().is_bufferblock(loc1):
985                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
986
987                 if other.parent._my_block_manager().is_bufferblock(loc2):
988                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
989
990                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
991                     seg1.range_start != seg2.range_start or
992                     seg1.range_size != seg2.range_size or
993                     seg1.segment_offset != seg2.segment_offset):
994                     return False
995
996         return True
997
998     def __ne__(self, other):
999         return not self.__eq__(other)
1000
1001     @synchronized
1002     def set_segments(self, segs):
1003         self._segments = segs
1004
1005     @synchronized
1006     def set_committed(self, value=True):
1007         """Set committed flag.
1008
1009         If value is True, set committed to be True.
1010
1011         If value is False, set committed to be False for this and all parents.
1012         """
1013         if value == self._committed:
1014             return
1015         self._committed = value
1016         if self._committed is False and self.parent is not None:
1017             self.parent.set_committed(False)
1018
1019     @synchronized
1020     def committed(self):
1021         """Get whether this is committed or not."""
1022         return self._committed
1023
1024     @synchronized
1025     def add_writer(self, writer):
1026         """Add an ArvadosFileWriter reference to the list of writers"""
1027         if isinstance(writer, ArvadosFileWriter):
1028             self._writers.add(writer)
1029
1030     @synchronized
1031     def remove_writer(self, writer, flush):
1032         """
1033         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1034         and do some block maintenance tasks.
1035         """
1036         self._writers.remove(writer)
1037
1038         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1039             # File writer closed, not small enough for repacking
1040             self.flush()
1041         elif self.closed():
1042             # All writers closed and size is adequate for repacking
1043             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1044
1045     def closed(self):
1046         """
1047         Get whether this is closed or not.  The file is considered closed when the
1048         writers list is empty.
1049         """
1050         return len(self._writers) == 0
1051
1052     @must_be_writable
1053     @synchronized
1054     def truncate(self, size):
1055         """Shrink or expand the size of the file.
1056
1057         If `size` is less than the size of the file, the file contents after
1058         `size` will be discarded.  If `size` is greater than the current size
1059         of the file, it will be filled with zero bytes.
1060
1061         """
1062         if size < self.size():
1063             new_segs = []
1064             for r in self._segments:
1065                 range_end = r.range_start+r.range_size
1066                 if r.range_start >= size:
1067                     # segment is past the truncate size, all done
1068                     break
1069                 elif size < range_end:
1070                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1071                     nr.segment_offset = r.segment_offset
1072                     new_segs.append(nr)
1073                     break
1074                 else:
1075                     new_segs.append(r)
1076
1077             self._segments = new_segs
1078             self.set_committed(False)
1079         elif size > self.size():
1080             padding = self.parent._my_block_manager().get_padding_block()
1081             diff = size - self.size()
1082             while diff > config.KEEP_BLOCK_SIZE:
1083                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1084                 diff -= config.KEEP_BLOCK_SIZE
1085             if diff > 0:
1086                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1087             self.set_committed(False)
1088         else:
1089             # size == self.size()
1090             pass
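        # Worked sketch of the growth path above, assuming `f` is an empty
        # ArvadosFile: extending it to 130 MiB with KEEP_BLOCK_SIZE = 64 MiB
        # appends two full 64 MiB references to the shared padding block plus
        # one 2 MiB tail, all pointing at the same zero-filled block.
        #
        #     >>> f.truncate(130 * 2**20)
        #     >>> [s.range_size for s in f.segments()]
        #     [67108864, 67108864, 2097152]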
1091
1092     def readfrom(self, offset, size, num_retries, exact=False):
1093         """Read up to `size` bytes from the file starting at `offset`.
1094
1095         :exact:
1096          If False (default), return less data than requested if the read
1097          crosses a block boundary and the next block isn't cached.  If True,
1098          only return less data than requested when hitting EOF.
1099         """
1100
1101         with self.lock:
1102             if size == 0 or offset >= self.size():
1103                 return b''
1104             readsegs = locators_and_ranges(self._segments, offset, size)
1105             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1106
1107         locs = set()
1108         data = []
1109         for lr in readsegs:
1110             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1111             if block:
1112                 blockview = memoryview(block)
1113                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1114                 locs.add(lr.locator)
1115             else:
1116                 break
1117
1118         for lr in prefetch:
1119             if lr.locator not in locs:
1120                 self.parent._my_block_manager().block_prefetch(lr.locator)
1121                 locs.add(lr.locator)
1122
1123         return b''.join(data)
1124
1125     @must_be_writable
1126     @synchronized
1127     def writeto(self, offset, data, num_retries):
1128         """Write `data` to the file starting at `offset`.
1129
1130         This will update existing bytes and/or extend the size of the file as
1131         necessary.
1132
1133         """
1134         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1135             data = data.encode()
1136         if len(data) == 0:
1137             return
1138
1139         if offset > self.size():
1140             self.truncate(offset)
1141
1142         if len(data) > config.KEEP_BLOCK_SIZE:
1143             # Chunk it up into smaller writes
1144             n = 0
1145             dataview = memoryview(data)
1146             while n < len(data):
1147                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1148                 n += config.KEEP_BLOCK_SIZE
1149             return
1150
1151         self.set_committed(False)
1152
1153         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1154             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1155
1156         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1157             self._current_bblock.repack_writes()
1158             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1159                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1160                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1161
1162         self._current_bblock.append(data)
1163
1164         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1165
1166         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1167
1168         return len(data)
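        # Sketch of the chunking behaviour above (hypothetical file `f`): a
        # single writeto() call larger than KEEP_BLOCK_SIZE is split into
        # block-sized writes, so each bufferblock stays within the 64 MiB Keep
        # block limit.
        #
        #     >>> payload = b'\0' * (150 * 2**20)        # 150 MiB
        #     >>> f.writeto(0, payload, num_retries=2)    # internally: 64 + 64 + 22 MiB writes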
1169
1170     @synchronized
1171     def flush(self, sync=True, num_retries=0):
1172         """Flush the current bufferblock to Keep.
1173
1174         :sync:
1175           If True, commit block synchronously, wait until buffer block has been written.
1176           If False, commit block asynchronously, return immediately after putting block into
1177           the keep put queue.
1178         """
1179         if self.committed():
1180             return
1181
1182         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1183             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1184                 self._current_bblock.repack_writes()
1185             if self._current_bblock.state() != _BufferBlock.DELETED:
1186                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1187
1188         if sync:
1189             to_delete = set()
1190             for s in self._segments:
1191                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1192                 if bb:
1193                     if bb.state() != _BufferBlock.COMMITTED:
1194                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1195                     to_delete.add(s.locator)
1196                     s.locator = bb.locator()
1197             for s in to_delete:
1198                 # Don't delete the bufferblock if it's owned by many files. It'll be
1199                 # deleted after all of its owners are flush()ed.
1200                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1201                     self.parent._my_block_manager().delete_bufferblock(s)
1202
1203         self.parent.notify(MOD, self.parent, self.name, (self, self))
1204
1205     @must_be_writable
1206     @synchronized
1207     def add_segment(self, blocks, pos, size):
1208         """Add a segment to the end of the file.
1209
1210         `pos` and `size` reference a section of the stream described by
1211         `blocks` (a list of Range objects)
1212
1213         """
1214         self._add_segment(blocks, pos, size)
1215
1216     def _add_segment(self, blocks, pos, size):
1217         """Internal implementation of add_segment."""
1218         self.set_committed(False)
1219         for lr in locators_and_ranges(blocks, pos, size):
1220             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1221             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1222             self._segments.append(r)
1223
1224     @synchronized
1225     def size(self):
1226         """Get the file size."""
1227         if self._segments:
1228             n = self._segments[-1]
1229             return n.range_start + n.range_size
1230         else:
1231             return 0
1232
1233     @synchronized
1234     def manifest_text(self, stream_name=".", portable_locators=False,
1235                       normalize=False, only_committed=False):
1236         buf = ""
1237         filestream = []
1238         for segment in self._segments:
1239             loc = segment.locator
1240             if self.parent._my_block_manager().is_bufferblock(loc):
1241                 if only_committed:
1242                     continue
1243                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1244             if portable_locators:
1245                 loc = KeepLocator(loc).stripped()
1246             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1247                                  segment.segment_offset, segment.range_size))
1248         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1249         buf += "\n"
1250         return buf
1251
1252     @must_be_writable
1253     @synchronized
1254     def _reparent(self, newparent, newname):
1255         self.set_committed(False)
1256         self.flush(sync=True)
1257         self.parent.remove(self.name)
1258         self.parent = newparent
1259         self.name = newname
1260         self.lock = self.parent.root_collection().lock
1261
1262
1263 class ArvadosFileReader(ArvadosFileReaderBase):
1264     """Wraps ArvadosFile in a file-like object supporting reading only.
1265
1266     Be aware that this class is NOT thread safe as there is no locking around
1267     updating the file pointer.
1268
1269     """
1270
1271     def __init__(self, arvadosfile, mode="r", num_retries=None):
1272         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1273         self.arvadosfile = arvadosfile
1274
1275     def size(self):
1276         return self.arvadosfile.size()
1277
1278     def stream_name(self):
1279         return self.arvadosfile.parent.stream_name()
1280
1281     @_FileLikeObjectBase._before_close
1282     @retry_method
1283     def read(self, size=None, num_retries=None):
1284         """Read up to `size` bytes from the file and return the result.
1285
1286         Starts at the current file position.  If `size` is None, read the
1287         entire remainder of the file.
1288         """
1289         if size is None:
1290             data = []
1291             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1292             while rd:
1293                 data.append(rd)
1294                 self._filepos += len(rd)
1295                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1296             return b''.join(data)
1297         else:
1298             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1299             self._filepos += len(data)
1300             return data
1301
1302     @_FileLikeObjectBase._before_close
1303     @retry_method
1304     def readfrom(self, offset, size, num_retries=None):
1305         """Read up to `size` bytes from the stream, starting at the specified file offset.
1306
1307         This method does not change the file position.
1308         """
1309         return self.arvadosfile.readfrom(offset, size, num_retries)
1310
1311     def flush(self):
1312         pass
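# A minimal read-side sketch.  Assuming `coll` is an arvados.collection
# Collection that already contains "input.txt", Collection.open() returns an
# ArvadosFileReader wired to the underlying ArvadosFile:
#
#     >>> with coll.open("input.txt", "rb") as reader:
#     ...     header = reader.read(1024)          # first KiB
#     ...     reader.seek(0)
#     ...     lines = reader.readlines()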
1313
1314
1315 class ArvadosFileWriter(ArvadosFileReader):
1316     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1317
1318     Be aware that this class is NOT thread safe as there is no locking around
1319     updating the file pointer.
1320
1321     """
1322
1323     def __init__(self, arvadosfile, mode, num_retries=None):
1324         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1325         self.arvadosfile.add_writer(self)
1326
1327     def writable(self):
1328         return True
1329
1330     @_FileLikeObjectBase._before_close
1331     @retry_method
1332     def write(self, data, num_retries=None):
1333         if self.mode[0] == "a":
1334             self._filepos = self.size()
1335         self.arvadosfile.writeto(self._filepos, data, num_retries)
1336         self._filepos += len(data)
1337         return len(data)
1338
1339     @_FileLikeObjectBase._before_close
1340     @retry_method
1341     def writelines(self, seq, num_retries=None):
1342         for s in seq:
1343             self.write(s, num_retries=num_retries)
1344
1345     @_FileLikeObjectBase._before_close
1346     def truncate(self, size=None):
1347         if size is None:
1348             size = self._filepos
1349         self.arvadosfile.truncate(size)
1350
1351     @_FileLikeObjectBase._before_close
1352     def flush(self):
1353         self.arvadosfile.flush()
1354
1355     def close(self, flush=True):
1356         if not self.closed:
1357             self.arvadosfile.remove_writer(self, flush)
1358             super(ArvadosFileWriter, self).close()
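# A minimal write-side sketch.  Assuming `coll` is a writable
# arvados.collection Collection, open() with a "w" or "a" mode returns an
# ArvadosFileWriter; data is buffered in _BufferBlock objects and flushed to
# Keep when the writer is closed or the collection is saved:
#
#     >>> with coll.open("output.txt", "wb") as writer:
#     ...     writer.write(b"hello\n")
#     ...     writer.write(b"world\n")
#     >>> coll.save_new("example collection")     # commits buffered blocks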