[arvados.git] / sdk / python / arvados / arvfile.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import listitems, listvalues
9 standard_library.install_aliases()
10 from builtins import range
11 from builtins import object
12 import bz2
13 import collections
14 import copy
15 import errno
16 import functools
17 import hashlib
18 import logging
19 import os
20 import queue
21 import re
22 import sys
23 import threading
24 import uuid
25 import zlib
26
27 from . import config
28 from .errors import KeepWriteError, AssertionError, ArgumentError
29 from .keep import KeepLocator
30 from ._normalize_stream import normalize_stream
31 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
32 from .retry import retry_method
33
34 MOD = "mod"
35 WRITE = "write"
36
37 _logger = logging.getLogger('arvados.arvfile')
38
39 def split(path):
40     """split(path) -> streamname, filename
41
42     Separate the stream name and file name in a /-separated stream path and
43     return a tuple (stream_name, file_name).  If no stream name is available,
44     assume '.'.
45
46     """
47     try:
48         stream_name, file_name = path.rsplit('/', 1)
49     except ValueError:  # No / in string
50         stream_name, file_name = '.', path
51     return stream_name, file_name
52
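# Illustrative sketch (not part of the original module): how split() behaves on
# typical stream paths.  Wrapped in a function so importing this file stays
# side-effect free; the function name is only for demonstration.
def _example_split_usage():
    assert split("output/logs/stderr.txt") == ("output/logs", "stderr.txt")
    assert split("stderr.txt") == (".", "stderr.txt")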
53
54 class UnownedBlockError(Exception):
55     """Raised when there's a writable block without an owner on the BlockManager."""
56     pass
57
58
59 class _FileLikeObjectBase(object):
60     def __init__(self, name, mode):
61         self.name = name
62         self.mode = mode
63         self.closed = False
64
65     @staticmethod
66     def _before_close(orig_func):
67         @functools.wraps(orig_func)
68         def before_close_wrapper(self, *args, **kwargs):
69             if self.closed:
70                 raise ValueError("I/O operation on closed stream file")
71             return orig_func(self, *args, **kwargs)
72         return before_close_wrapper
73
74     def __enter__(self):
75         return self
76
77     def __exit__(self, exc_type, exc_value, traceback):
78         try:
79             self.close()
80         except Exception:
81             if exc_type is None:
82                 raise
83
84     def close(self):
85         self.closed = True
86
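# Illustrative sketch (not part of the original SDK): the _before_close decorator
# above makes methods of a file-like object raise once the object is closed.
# Dummy is a hypothetical subclass used only for demonstration.
def _example_before_close_usage():
    class Dummy(_FileLikeObjectBase):
        @_FileLikeObjectBase._before_close
        def ping(self):
            return "pong"

    d = Dummy("dummy", "rb")
    assert d.ping() == "pong"
    d.close()
    try:
        d.ping()
    except ValueError:
        pass  # ping() raises ValueError after close()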
87
88 class ArvadosFileReaderBase(_FileLikeObjectBase):
89     def __init__(self, name, mode, num_retries=None):
90         super(ArvadosFileReaderBase, self).__init__(name, mode)
91         self._binary = 'b' in mode
92         if sys.version_info >= (3, 0) and not self._binary:
93             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
94         self._filepos = 0
95         self.num_retries = num_retries
96         self._readline_cache = (None, None)
97
98     def __iter__(self):
99         while True:
100             data = self.readline()
101             if not data:
102                 break
103             yield data
104
105     def decompressed_name(self):
106         return re.sub(r'\.(bz2|gz)$', '', self.name)
107
108     @_FileLikeObjectBase._before_close
109     def seek(self, pos, whence=os.SEEK_SET):
110         if whence == os.SEEK_CUR:
111             pos += self._filepos
112         elif whence == os.SEEK_END:
113             pos += self.size()
114         if pos < 0:
115             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
116         self._filepos = pos
117         return self._filepos
118
119     def tell(self):
120         return self._filepos
121
122     def readable(self):
123         return True
124
125     def writable(self):
126         return False
127
128     def seekable(self):
129         return True
130
131     @_FileLikeObjectBase._before_close
132     @retry_method
133     def readall(self, size=2**20, num_retries=None):
134         while True:
135             data = self.read(size, num_retries=num_retries)
136             if len(data) == 0:
137                 break
138             yield data
139
140     @_FileLikeObjectBase._before_close
141     @retry_method
142     def readline(self, size=float('inf'), num_retries=None):
143         cache_pos, cache_data = self._readline_cache
144         if self.tell() == cache_pos:
145             data = [cache_data]
146             self._filepos += len(cache_data)
147         else:
148             data = [b'']
149         data_size = len(data[-1])
150         while (data_size < size) and (b'\n' not in data[-1]):
151             next_read = self.read(2 ** 20, num_retries=num_retries)
152             if not next_read:
153                 break
154             data.append(next_read)
155             data_size += len(next_read)
156         data = b''.join(data)
157         try:
158             nextline_index = data.index(b'\n') + 1
159         except ValueError:
160             nextline_index = len(data)
161         nextline_index = min(nextline_index, size)
162         self._filepos -= len(data) - nextline_index
163         self._readline_cache = (self.tell(), data[nextline_index:])
164         return data[:nextline_index].decode()
165
166     @_FileLikeObjectBase._before_close
167     @retry_method
168     def decompress(self, decompress, size, num_retries=None):
169         for segment in self.readall(size, num_retries=num_retries):
170             data = decompress(segment)
171             if data:
172                 yield data
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readall_decompressed(self, size=2**20, num_retries=None):
177         self.seek(0)
178         if self.name.endswith('.bz2'):
179             dc = bz2.BZ2Decompressor()
180             return self.decompress(dc.decompress, size,
181                                    num_retries=num_retries)
182         elif self.name.endswith('.gz'):
183             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
184             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
185                                    size, num_retries=num_retries)
186         else:
187             return self.readall(size, num_retries=num_retries)
188
189     @_FileLikeObjectBase._before_close
190     @retry_method
191     def readlines(self, sizehint=float('inf'), num_retries=None):
192         data = []
193         data_size = 0
194         for s in self.readall(num_retries=num_retries):
195             data.append(s)
196             data_size += len(s)
197             if data_size >= sizehint:
198                 break
199         return b''.join(data).decode().splitlines(True)
200
201     def size(self):
202         raise IOError(errno.ENOSYS, "Not implemented")
203
204     def read(self, size, num_retries=None):
205         raise IOError(errno.ENOSYS, "Not implemented")
206
207     def readfrom(self, start, size, num_retries=None):
208         raise IOError(errno.ENOSYS, "Not implemented")
209
210
211 class StreamFileReader(ArvadosFileReaderBase):
212     class _NameAttribute(str):
213         # The Python file API provides a plain .name attribute.
214         # Older SDK provided a name() method.
215         # This class provides both, for maximum compatibility.
216         def __call__(self):
217             return self
218
219     def __init__(self, stream, segments, name):
220         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
221         self._stream = stream
222         self.segments = segments
223
224     def stream_name(self):
225         return self._stream.name()
226
227     def size(self):
228         n = self.segments[-1]
229         return n.range_start + n.range_size
230
231     @_FileLikeObjectBase._before_close
232     @retry_method
233     def read(self, size, num_retries=None):
234         """Read up to 'size' bytes from the stream, starting at the current file position"""
235         if size == 0:
236             return b''
237
238         data = b''
239         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
240         if available_chunks:
241             lr = available_chunks[0]
242             data = self._stream.readfrom(lr.locator+lr.segment_offset,
243                                          lr.segment_size,
244                                          num_retries=num_retries)
245
246         self._filepos += len(data)
247         return data
248
249     @_FileLikeObjectBase._before_close
250     @retry_method
251     def readfrom(self, start, size, num_retries=None):
252         """Read up to 'size' bytes from the stream, starting at 'start'"""
253         if size == 0:
254             return b''
255
256         data = []
257         for lr in locators_and_ranges(self.segments, start, size):
258             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
259                                               num_retries=num_retries))
260         return b''.join(data)
261
262     def as_manifest(self):
263         segs = []
264         for r in self.segments:
265             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
266         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
267
268
269 def synchronized(orig_func):
270     @functools.wraps(orig_func)
271     def synchronized_wrapper(self, *args, **kwargs):
272         with self.lock:
273             return orig_func(self, *args, **kwargs)
274     return synchronized_wrapper
275
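# Illustrative sketch (not part of the original SDK): synchronized() expects the
# decorated method's instance to carry a `lock` attribute; each call then runs
# while holding that lock.  Counter is a hypothetical class for demonstration.
def _example_synchronized_usage():
    class Counter(object):
        def __init__(self):
            self.lock = threading.Lock()
            self.value = 0

        @synchronized
        def increment(self):
            self.value += 1

    c = Counter()
    c.increment()
    return c.value  # 1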
276
277 class StateChangeError(Exception):
278     def __init__(self, message, state, nextstate):
279         super(StateChangeError, self).__init__(message)
280         self.state = state
281         self.nextstate = nextstate
282
283 class _BufferBlock(object):
284     """A stand-in for a Keep block that is in the process of being written.
285
286     Writers can append to it, get the size, and compute the Keep locator.
287     There are three valid states:
288
289     WRITABLE
290       Can append to block.
291
292     PENDING
293       Block is in the process of being uploaded to Keep, append is an error.
294
295     COMMITTED
296       The block has been written to Keep, its internal buffer has been
297       released, fetching the block will fetch it via the Keep client (since we
298       discarded the internal copy), and identifiers referring to the BufferBlock
299       can be replaced with the block locator.
300
301     """
302
303     WRITABLE = 0
304     PENDING = 1
305     COMMITTED = 2
306     ERROR = 3
307     DELETED = 4
308
309     def __init__(self, blockid, starting_capacity, owner):
310         """
311         :blockid:
312           the identifier for this block
313
314         :starting_capacity:
315           the initial buffer capacity
316
317         :owner:
318           ArvadosFile that owns this block
319
320         """
321         self.blockid = blockid
322         self.buffer_block = bytearray(starting_capacity)
323         self.buffer_view = memoryview(self.buffer_block)
324         self.write_pointer = 0
325         self._state = _BufferBlock.WRITABLE
326         self._locator = None
327         self.owner = owner
328         self.lock = threading.Lock()
329         self.wait_for_commit = threading.Event()
330         self.error = None
331
332     @synchronized
333     def append(self, data):
334         """Append some data to the buffer.
335
336         Only valid if the block is in WRITABLE state.  Implements an expanding
337         buffer, doubling capacity as needed to accommodate all the data.
338
339         """
340         if self._state == _BufferBlock.WRITABLE:
341             if not isinstance(data, bytes) and not isinstance(data, memoryview):
342                 data = data.encode()
343             while (self.write_pointer+len(data)) > len(self.buffer_block):
344                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
345                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
346                 self.buffer_block = new_buffer_block
347                 self.buffer_view = memoryview(self.buffer_block)
348             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
349             self.write_pointer += len(data)
350             self._locator = None
351         else:
352             raise AssertionError("Buffer block is not writable")
353
354     STATE_TRANSITIONS = frozenset([
355             (WRITABLE, PENDING),
356             (PENDING, COMMITTED),
357             (PENDING, ERROR),
358             (ERROR, PENDING)])
359
360     @synchronized
361     def set_state(self, nextstate, val=None):
362         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
363             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
364         self._state = nextstate
365
366         if self._state == _BufferBlock.PENDING:
367             self.wait_for_commit.clear()
368
369         if self._state == _BufferBlock.COMMITTED:
370             self._locator = val
371             self.buffer_view = None
372             self.buffer_block = None
373             self.wait_for_commit.set()
374
375         if self._state == _BufferBlock.ERROR:
376             self.error = val
377             self.wait_for_commit.set()
378
379     @synchronized
380     def state(self):
381         return self._state
382
383     def size(self):
384         """The amount of data written to the buffer."""
385         return self.write_pointer
386
387     @synchronized
388     def locator(self):
389         """The Keep locator for this buffer's contents."""
390         if self._locator is None:
391             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
392         return self._locator
393
394     @synchronized
395     def clone(self, new_blockid, owner):
396         if self._state == _BufferBlock.COMMITTED:
397             raise AssertionError("Cannot duplicate committed buffer block")
398         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
399         bufferblock.append(self.buffer_view[0:self.size()])
400         return bufferblock
401
402     @synchronized
403     def clear(self):
404         self._state = _BufferBlock.DELETED
405         self.owner = None
406         self.buffer_block = None
407         self.buffer_view = None
408
409     @synchronized
410     def repack_writes(self):
411         """Optimize buffer block by repacking segments in file sequence.
412
413         When the client makes random writes, they appear in the buffer block in
414         the sequence they were written rather than the sequence they appear in
415         the file.  This makes for inefficient, fragmented manifests.  Attempt
416         to optimize by repacking writes in file sequence.
417
418         """
419         if self._state != _BufferBlock.WRITABLE:
420             raise AssertionError("Cannot repack non-writable block")
421
422         segs = self.owner.segments()
423
424         # Collect the segments that reference the buffer block.
425         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
426
427         # Collect total data referenced by segments (could be smaller than
428         # bufferblock size if a portion of the file was written and
429         # then overwritten).
430         write_total = sum([s.range_size for s in bufferblock_segs])
431
432         if write_total < self.size() or len(bufferblock_segs) > 1:
433             # If there's more than one segment referencing this block, it is
434             # due to out-of-order writes and will produce a fragmented
435             # manifest, so try to optimize by re-packing into a new buffer.
436             contents = self.buffer_view[0:self.write_pointer].tobytes()
437             new_bb = _BufferBlock(None, write_total, None)
438             for t in bufferblock_segs:
439                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
440                 t.segment_offset = new_bb.size() - t.range_size
441
442             self.buffer_block = new_bb.buffer_block
443             self.buffer_view = new_bb.buffer_view
444             self.write_pointer = new_bb.write_pointer
445             self._locator = None
446             new_bb.clear()
447             self.owner.set_segments(segs)
448
449     def __repr__(self):
450         return "<BufferBlock %s>" % (self.blockid)
451
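# Illustrative sketch (not part of the original SDK): the write/locator life cycle
# of a _BufferBlock, independent of any Keep traffic.  The block id and the lack
# of an owner are placeholders for demonstration only.
def _example_bufferblock_lifecycle():
    bb = _BufferBlock("example-block-id", starting_capacity=8, owner=None)
    bb.append(b"hello ")
    bb.append(b"world")           # buffer doubles as needed to fit the data
    assert bb.size() == 11
    assert bb.state() == _BufferBlock.WRITABLE
    loc = bb.locator()            # md5-of-contents "+" size, computed locally
    assert loc.endswith("+11")
    return loc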
452
453 class NoopLock(object):
454     def __enter__(self):
455         return self
456
457     def __exit__(self, exc_type, exc_value, traceback):
458         pass
459
460     def acquire(self, blocking=False):
461         pass
462
463     def release(self):
464         pass
465
466
467 def must_be_writable(orig_func):
468     @functools.wraps(orig_func)
469     def must_be_writable_wrapper(self, *args, **kwargs):
470         if not self.writable():
471             raise IOError(errno.EROFS, "Collection is read-only.")
472         return orig_func(self, *args, **kwargs)
473     return must_be_writable_wrapper
474
475
476 class _BlockManager(object):
477     """BlockManager handles buffer blocks.
478
479     Also handles background block uploads, and background block prefetch for a
480     Collection of ArvadosFiles.
481
482     """
483
484     DEFAULT_PUT_THREADS = 2
485     DEFAULT_GET_THREADS = 2
486
487     def __init__(self, keep, copies=None, put_threads=None):
488         """keep: KeepClient object to use"""
489         self._keep = keep
490         self._bufferblocks = collections.OrderedDict()
491         self._put_queue = None
492         self._put_threads = None
493         self._prefetch_queue = None
494         self._prefetch_threads = None
495         self.lock = threading.Lock()
496         self.prefetch_enabled = True
497         if put_threads:
498             self.num_put_threads = put_threads
499         else:
500             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
501         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
502         self.copies = copies
503         self._pending_write_size = 0
504         self.threads_lock = threading.Lock()
505         self.padding_block = None
506
507     @synchronized
508     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
509         """Allocate a new, empty bufferblock in WRITABLE state and return it.
510
511         :blockid:
512           optional block identifier, otherwise one will be automatically assigned
513
514         :starting_capacity:
515           optional capacity, otherwise will use default capacity
516
517         :owner:
518           ArvadosFile that owns this block
519
520         """
521         return self._alloc_bufferblock(blockid, starting_capacity, owner)
522
523     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
524         if blockid is None:
525             blockid = str(uuid.uuid4())
526         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
527         self._bufferblocks[bufferblock.blockid] = bufferblock
528         return bufferblock
529
530     @synchronized
531     def dup_block(self, block, owner):
532         """Create a new bufferblock initialized with the content of an existing bufferblock.
533
534         :block:
535           the buffer block to copy.
536
537         :owner:
538           ArvadosFile that owns the new block
539
540         """
541         new_blockid = str(uuid.uuid4())
542         bufferblock = block.clone(new_blockid, owner)
543         self._bufferblocks[bufferblock.blockid] = bufferblock
544         return bufferblock
545
546     @synchronized
547     def is_bufferblock(self, locator):
548         return locator in self._bufferblocks
549
550     def _commit_bufferblock_worker(self):
551         """Background uploader thread."""
552
553         while True:
554             try:
555                 bufferblock = self._put_queue.get()
556                 if bufferblock is None:
557                     return
558
559                 if self.copies is None:
560                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
561                 else:
562                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
563                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
564             except Exception as e:
565                 bufferblock.set_state(_BufferBlock.ERROR, e)
566             finally:
567                 if self._put_queue is not None:
568                     self._put_queue.task_done()
569
570     def start_put_threads(self):
571         with self.threads_lock:
572             if self._put_threads is None:
573                 # Start uploader threads.
574
575                 # If we don't limit the Queue size, the upload queue can quickly
576                 # grow to take up gigabytes of RAM if the writing process is
577                 # generating data more quickly than it can be sent to the Keep
578                 # servers.
579                 #
580                 # With two upload threads and a queue size of 2, this means up to 4
581                 # blocks pending.  If they are full 64 MiB blocks, that means up to
582                 # 256 MiB of internal buffering, which is the same size as the
583                 # default download block cache in KeepClient.
584                 self._put_queue = queue.Queue(maxsize=2)
585
586                 self._put_threads = []
587                 for i in range(0, self.num_put_threads):
588                     thread = threading.Thread(target=self._commit_bufferblock_worker)
589                     self._put_threads.append(thread)
590                     thread.daemon = True
591                     thread.start()
592
593     def _block_prefetch_worker(self):
594         """The background downloader thread."""
595         while True:
596             try:
597                 b = self._prefetch_queue.get()
598                 if b is None:
599                     return
600                 self._keep.get(b)
601             except Exception:
602                 _logger.exception("Exception doing block prefetch")
603
604     @synchronized
605     def start_get_threads(self):
606         if self._prefetch_threads is None:
607             self._prefetch_queue = queue.Queue()
608             self._prefetch_threads = []
609             for i in range(0, self.num_get_threads):
610                 thread = threading.Thread(target=self._block_prefetch_worker)
611                 self._prefetch_threads.append(thread)
612                 thread.daemon = True
613                 thread.start()
614
615
616     @synchronized
617     def stop_threads(self):
618         """Shut down and wait for background upload and download threads to finish."""
619
620         if self._put_threads is not None:
621             for t in self._put_threads:
622                 self._put_queue.put(None)
623             for t in self._put_threads:
624                 t.join()
625         self._put_threads = None
626         self._put_queue = None
627
628         if self._prefetch_threads is not None:
629             for t in self._prefetch_threads:
630                 self._prefetch_queue.put(None)
631             for t in self._prefetch_threads:
632                 t.join()
633         self._prefetch_threads = None
634         self._prefetch_queue = None
635
636     def __enter__(self):
637         return self
638
639     def __exit__(self, exc_type, exc_value, traceback):
640         self.stop_threads()
641
642     @synchronized
643     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
644         """Packs small blocks together before uploading"""
645
646         self._pending_write_size += closed_file_size
647
648         # Check if there are enough small blocks to fill up one full block
649         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
650             return
651
652         # Search for blocks that are ready to be packed together before being
653         # committed to Keep.
654         # A WRITABLE block always has an owner.
655         # A WRITABLE block with its owner.closed() implies that its
656         # size is <= KEEP_BLOCK_SIZE/2.
657         try:
658             small_blocks = [b for b in listvalues(self._bufferblocks)
659                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
660         except AttributeError:
661             # Writable blocks without owner shouldn't exist.
662             raise UnownedBlockError()
663
664         if len(small_blocks) <= 1:
665             # Not enough small blocks for repacking
666             return
667
668         for bb in small_blocks:
669             bb.repack_writes()
670
671         # Update the pending write size count with its true value, just in case
672         # some small file was opened, written and closed several times.
673         self._pending_write_size = sum([b.size() for b in small_blocks])
674
675         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
676             return
677
678         new_bb = self._alloc_bufferblock()
679         new_bb.owner = []
680         files = []
681         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
682             bb = small_blocks.pop(0)
683             new_bb.owner.append(bb.owner)
684             self._pending_write_size -= bb.size()
685             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
686             files.append((bb, new_bb.write_pointer - bb.size()))
687
688         self.commit_bufferblock(new_bb, sync=sync)
689
690         for bb, new_bb_segment_offset in files:
691             newsegs = bb.owner.segments()
692             for s in newsegs:
693                 if s.locator == bb.blockid:
694                     s.locator = new_bb.blockid
695                     s.segment_offset = new_bb_segment_offset+s.segment_offset
696             bb.owner.set_segments(newsegs)
697             self._delete_bufferblock(bb.blockid)
698
699     def commit_bufferblock(self, block, sync):
700         """Initiate a background upload of a bufferblock.
701
702         :block:
703           The block object to upload
704
705         :sync:
706           If `sync` is True, upload the block synchronously.
707           If `sync` is False, upload the block asynchronously.  This will
708           return immediately unless the upload queue is at capacity, in
709           which case it will wait on an upload queue slot.
710
711         """
712         try:
713             # Mark the block as PENDING so as to disallow any more appends.
714             block.set_state(_BufferBlock.PENDING)
715         except StateChangeError as e:
716             if e.state == _BufferBlock.PENDING:
717                 if sync:
718                     block.wait_for_commit.wait()
719                 else:
720                     return
721             if block.state() == _BufferBlock.COMMITTED:
722                 return
723             elif block.state() == _BufferBlock.ERROR:
724                 raise block.error
725             else:
726                 raise
727
728         if sync:
729             try:
730                 if self.copies is None:
731                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
732                 else:
733                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
734                 block.set_state(_BufferBlock.COMMITTED, loc)
735             except Exception as e:
736                 block.set_state(_BufferBlock.ERROR, e)
737                 raise
738         else:
739             self.start_put_threads()
740             self._put_queue.put(block)
741
742     @synchronized
743     def get_bufferblock(self, locator):
744         return self._bufferblocks.get(locator)
745
746     @synchronized
747     def get_padding_block(self):
748         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
749         when using truncate() to extend the size of a file.
750
751         For reference (and possible future optimization), the md5sum of the
752         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
753
754         """
755
756         if self.padding_block is None:
757             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
758             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
759             self.commit_bufferblock(self.padding_block, False)
760         return self.padding_block
761
762     @synchronized
763     def delete_bufferblock(self, locator):
764         self._delete_bufferblock(locator)
765
766     def _delete_bufferblock(self, locator):
767         bb = self._bufferblocks[locator]
768         bb.clear()
769         del self._bufferblocks[locator]
770
771     def get_block_contents(self, locator, num_retries, cache_only=False):
772         """Fetch a block.
773
774         First checks to see if the locator is a BufferBlock and returns that; if
775         not, passes the request through to KeepClient.get().
776
777         """
778         with self.lock:
779             if locator in self._bufferblocks:
780                 bufferblock = self._bufferblocks[locator]
781                 if bufferblock.state() != _BufferBlock.COMMITTED:
782                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
783                 else:
784                     locator = bufferblock._locator
785         if cache_only:
786             return self._keep.get_from_cache(locator)
787         else:
788             return self._keep.get(locator, num_retries=num_retries)
789
790     def commit_all(self):
791         """Commit all outstanding buffer blocks.
792
793         This is a synchronous call, and will not return until all buffer blocks
794         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
795
796         """
797         self.repack_small_blocks(force=True, sync=True)
798
799         with self.lock:
800             items = listitems(self._bufferblocks)
801
802         for k,v in items:
803             if v.state() != _BufferBlock.COMMITTED and v.owner:
804                 # Ignore blocks with a list of owners: if they're not in COMMITTED
805                 # state, they're already being committed asynchronously.
806                 if isinstance(v.owner, ArvadosFile):
807                     v.owner.flush(sync=False)
808
809         with self.lock:
810             if self._put_queue is not None:
811                 self._put_queue.join()
812
813                 err = []
814                 for k,v in items:
815                     if v.state() == _BufferBlock.ERROR:
816                         err.append((v.locator(), v.error))
817                 if err:
818                     raise KeepWriteError("Error writing some blocks", err, label="block")
819
820         for k,v in items:
821             # flush again with sync=True to remove committed bufferblocks from
822             # the segments.
823             if v.owner:
824                 if isinstance(v.owner, ArvadosFile):
825                     v.owner.flush(sync=True)
826                 elif isinstance(v.owner, list) and len(v.owner) > 0:
827                     # This bufferblock is referenced by many files as a result
828                     # of repacking small blocks, so don't delete it when flushing
829                     # its owners; delete it only after flushing them all.
830                     for owner in v.owner:
831                         owner.flush(sync=True)
832                     self.delete_bufferblock(k)
833
834     def block_prefetch(self, locator):
835         """Initiate a background download of a block.
836
837         This assumes that the underlying KeepClient implements a block cache,
838         so repeated requests for the same block will not result in repeated
839         downloads (unless the block is evicted from the cache).  This method
840         does not block.
841
842         """
843
844         if not self.prefetch_enabled:
845             return
846
847         if self._keep.get_from_cache(locator) is not None:
848             return
849
850         with self.lock:
851             if locator in self._bufferblocks:
852                 return
853
854         self.start_get_threads()
855         self._prefetch_queue.put(locator)
856
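# Illustrative sketch (not part of the original SDK): committing a buffer block
# through _BlockManager using a minimal stand-in for KeepClient.  _FakeKeep is
# hypothetical and only implements the calls this sketch exercises.
def _example_block_manager_usage():
    class _FakeKeep(object):
        def __init__(self):
            self.blocks = {}

        def put(self, data, copies=None):
            loc = "%s+%d" % (hashlib.md5(data).hexdigest(), len(data))
            self.blocks[loc] = data
            return loc

        def get(self, loc, num_retries=None):
            return self.blocks[loc]

        def get_from_cache(self, loc):
            return self.blocks.get(loc)

    with _BlockManager(_FakeKeep()) as bm:
        bb = bm.alloc_bufferblock()
        bb.append(b"hello world")
        bm.commit_bufferblock(bb, sync=True)   # synchronous put to the (fake) Keep
        assert bb.state() == _BufferBlock.COMMITTED
        return bm.get_block_contents(bb.locator(), num_retries=0)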
857
858 class ArvadosFile(object):
859     """Represent a file in a Collection.
860
861     ArvadosFile manages the underlying representation of a file in Keep as a
862     sequence of segments spanning a set of blocks, and implements random
863     read/write access.
864
865     This object may be accessed from multiple threads.
866
867     """
868
869     __slots__ = ('parent', 'name', '_writers', '_committed',
870                  '_segments', 'lock', '_current_bblock', 'fuse_entry')
871
872     def __init__(self, parent, name, stream=[], segments=[]):
873         """
874         ArvadosFile constructor.
875
876         :stream:
877           a list of Range objects representing a block stream
878
879         :segments:
880           a list of Range objects representing segments
881         """
882         self.parent = parent
883         self.name = name
884         self._writers = set()
885         self._committed = False
886         self._segments = []
887         self.lock = parent.root_collection().lock
888         for s in segments:
889             self._add_segment(stream, s.locator, s.range_size)
890         self._current_bblock = None
891
892     def writable(self):
893         return self.parent.writable()
894
895     @synchronized
896     def permission_expired(self, as_of_dt=None):
897         """Returns True if any of the segments' locators has expired"""
898         for r in self._segments:
899             if KeepLocator(r.locator).permission_expired(as_of_dt):
900                 return True
901         return False
902
903     @synchronized
904     def segments(self):
905         return copy.copy(self._segments)
906
907     @synchronized
908     def clone(self, new_parent, new_name):
909         """Make a copy of this file."""
910         cp = ArvadosFile(new_parent, new_name)
911         cp.replace_contents(self)
912         return cp
913
914     @must_be_writable
915     @synchronized
916     def replace_contents(self, other):
917         """Replace segments of this file with segments from another `ArvadosFile` object."""
918
919         map_loc = {}
920         self._segments = []
921         for other_segment in other.segments():
922             new_loc = other_segment.locator
923             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
924                 if other_segment.locator not in map_loc:
925                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
926                     if bufferblock.state() != _BufferBlock.WRITABLE:
927                         map_loc[other_segment.locator] = bufferblock.locator()
928                     else:
929                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
930                 new_loc = map_loc[other_segment.locator]
931
932             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
933
934         self.set_committed(False)
935
936     def __eq__(self, other):
937         if other is self:
938             return True
939         if not isinstance(other, ArvadosFile):
940             return False
941
942         othersegs = other.segments()
943         with self.lock:
944             if len(self._segments) != len(othersegs):
945                 return False
946             for i in range(0, len(othersegs)):
947                 seg1 = self._segments[i]
948                 seg2 = othersegs[i]
949                 loc1 = seg1.locator
950                 loc2 = seg2.locator
951
952                 if self.parent._my_block_manager().is_bufferblock(loc1):
953                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
954
955                 if other.parent._my_block_manager().is_bufferblock(loc2):
956                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
957
958                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
959                     seg1.range_start != seg2.range_start or
960                     seg1.range_size != seg2.range_size or
961                     seg1.segment_offset != seg2.segment_offset):
962                     return False
963
964         return True
965
966     def __ne__(self, other):
967         return not self.__eq__(other)
968
969     @synchronized
970     def set_segments(self, segs):
971         self._segments = segs
972
973     @synchronized
974     def set_committed(self, value=True):
975         """Set committed flag.
976
977         If value is True, set committed to be True.
978
979         If value is False, set committed to be False for this and all parents.
980         """
981         if value == self._committed:
982             return
983         self._committed = value
984         if self._committed is False and self.parent is not None:
985             self.parent.set_committed(False)
986
987     @synchronized
988     def committed(self):
989         """Get whether this is committed or not."""
990         return self._committed
991
992     @synchronized
993     def add_writer(self, writer):
994         """Add an ArvadosFileWriter reference to the list of writers"""
995         if isinstance(writer, ArvadosFileWriter):
996             self._writers.add(writer)
997
998     @synchronized
999     def remove_writer(self, writer, flush):
1000         """
1001         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1002         and do some block maintenance tasks.
1003         """
1004         self._writers.remove(writer)
1005
1006         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1007             # File writer closed, not small enough for repacking
1008             self.flush()
1009         elif self.closed():
1010             # All writers closed and size is adequate for repacking
1011             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1012
1013     def closed(self):
1014         """
1015         Get whether this is closed or not. When the writers list is empty, the file
1016         is considered closed.
1017         """
1018         return len(self._writers) == 0
1019
1020     @must_be_writable
1021     @synchronized
1022     def truncate(self, size):
1023         """Shrink or expand the size of the file.
1024
1025         If `size` is less than the size of the file, the file contents after
1026         `size` will be discarded.  If `size` is greater than the current size
1027         of the file, it will be filled with zero bytes.
1028
1029         """
1030         if size < self.size():
1031             new_segs = []
1032             for r in self._segments:
1033                 range_end = r.range_start+r.range_size
1034                 if r.range_start >= size:
1035                     # segment is past the truncate size, all done
1036                     break
1037                 elif size < range_end:
1038                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1039                     nr.segment_offset = r.segment_offset
1040                     new_segs.append(nr)
1041                     break
1042                 else:
1043                     new_segs.append(r)
1044
1045             self._segments = new_segs
1046             self.set_committed(False)
1047         elif size > self.size():
1048             padding = self.parent._my_block_manager().get_padding_block()
1049             diff = size - self.size()
1050             while diff > config.KEEP_BLOCK_SIZE:
1051                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1052                 diff -= config.KEEP_BLOCK_SIZE
1053             if diff > 0:
1054                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1055             self.set_committed(False)
1056         else:
1057             # size == self.size()
1058             pass
1059
1060     def readfrom(self, offset, size, num_retries, exact=False):
1061         """Read up to `size` bytes from the file starting at `offset`.
1062
1063         :exact:
1064          If False (default), return less data than requested if the read
1065          crosses a block boundary and the next block isn't cached.  If True,
1066          only return less data than requested when hitting EOF.
1067         """
1068
1069         with self.lock:
1070             if size == 0 or offset >= self.size():
1071                 return b''
1072             readsegs = locators_and_ranges(self._segments, offset, size)
1073             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1074
1075         locs = set()
1076         data = []
1077         for lr in readsegs:
1078             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1079             if block:
1080                 blockview = memoryview(block)
1081                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1082                 locs.add(lr.locator)
1083             else:
1084                 break
1085
1086         for lr in prefetch:
1087             if lr.locator not in locs:
1088                 self.parent._my_block_manager().block_prefetch(lr.locator)
1089                 locs.add(lr.locator)
1090
1091         return b''.join(data)
1092
1093     @must_be_writable
1094     @synchronized
1095     def writeto(self, offset, data, num_retries):
1096         """Write `data` to the file starting at `offset`.
1097
1098         This will update existing bytes and/or extend the size of the file as
1099         necessary.
1100
1101         """
1102         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1103             data = data.encode()
1104         if len(data) == 0:
1105             return
1106
1107         if offset > self.size():
1108             self.truncate(offset)
1109
1110         if len(data) > config.KEEP_BLOCK_SIZE:
1111             # Chunk it up into smaller writes
1112             n = 0
1113             dataview = memoryview(data)
1114             while n < len(data):
1115                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1116                 n += config.KEEP_BLOCK_SIZE
1117             return
1118
1119         self.set_committed(False)
1120
1121         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1122             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1123
1124         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1125             self._current_bblock.repack_writes()
1126             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1127                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1128                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1129
1130         self._current_bblock.append(data)
1131
1132         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1133
1134         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1135
1136         return len(data)
1137
1138     @synchronized
1139     def flush(self, sync=True, num_retries=0):
1140         """Flush the current bufferblock to Keep.
1141
1142         :sync:
1143           If True, commit block synchronously, wait until buffer block has been written.
1144           If False, commit block asynchronously, return immediately after putting block into
1145           the Keep put queue.
1146         """
1147         if self.committed():
1148             return
1149
1150         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1151             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1152                 self._current_bblock.repack_writes()
1153             if self._current_bblock.state() != _BufferBlock.DELETED:
1154                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1155
1156         if sync:
1157             to_delete = set()
1158             for s in self._segments:
1159                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1160                 if bb:
1161                     if bb.state() != _BufferBlock.COMMITTED:
1162                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1163                     to_delete.add(s.locator)
1164                     s.locator = bb.locator()
1165             for s in to_delete:
1166                 # Don't delete the bufferblock if it's owned by many files. It'll be
1167                 # deleted after all of its owners are flush()ed.
1168                 if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1169                     self.parent._my_block_manager().delete_bufferblock(s)
1170
1171         self.parent.notify(MOD, self.parent, self.name, (self, self))
1172
1173     @must_be_writable
1174     @synchronized
1175     def add_segment(self, blocks, pos, size):
1176         """Add a segment to the end of the file.
1177
1178         `pos` and `size` reference a section of the stream described by
1179         `blocks` (a list of Range objects)
1180
1181         """
1182         self._add_segment(blocks, pos, size)
1183
1184     def _add_segment(self, blocks, pos, size):
1185         """Internal implementation of add_segment."""
1186         self.set_committed(False)
1187         for lr in locators_and_ranges(blocks, pos, size):
1188             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1189             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1190             self._segments.append(r)
1191
1192     @synchronized
1193     def size(self):
1194         """Get the file size."""
1195         if self._segments:
1196             n = self._segments[-1]
1197             return n.range_start + n.range_size
1198         else:
1199             return 0
1200
1201     @synchronized
1202     def manifest_text(self, stream_name=".", portable_locators=False,
1203                       normalize=False, only_committed=False):
1204         buf = ""
1205         filestream = []
1206         for segment in self._segments:
1207             loc = segment.locator
1208             if self.parent._my_block_manager().is_bufferblock(loc):
1209                 if only_committed:
1210                     continue
1211                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1212             if portable_locators:
1213                 loc = KeepLocator(loc).stripped()
1214             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1215                                  segment.segment_offset, segment.range_size))
1216         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1217         buf += "\n"
1218         return buf
1219
1220     @must_be_writable
1221     @synchronized
1222     def _reparent(self, newparent, newname):
1223         self.set_committed(False)
1224         self.flush(sync=True)
1225         self.parent.remove(self.name)
1226         self.parent = newparent
1227         self.name = newname
1228         self.lock = self.parent.root_collection().lock
1229
1230
1231 class ArvadosFileReader(ArvadosFileReaderBase):
1232     """Wraps ArvadosFile in a file-like object supporting reading only.
1233
1234     Be aware that this class is NOT thread-safe as there is no locking around
1235     updating the file pointer.
1236
1237     """
1238
1239     def __init__(self, arvadosfile, mode="r", num_retries=None):
1240         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1241         self.arvadosfile = arvadosfile
1242
1243     def size(self):
1244         return self.arvadosfile.size()
1245
1246     def stream_name(self):
1247         return self.arvadosfile.parent.stream_name()
1248
1249     @_FileLikeObjectBase._before_close
1250     @retry_method
1251     def read(self, size=None, num_retries=None):
1252         """Read up to `size` bytes from the file and return the result.
1253
1254         Starts at the current file position.  If `size` is None, read the
1255         entire remainder of the file.
1256         """
1257         if size is None:
1258             data = []
1259             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1260             while rd:
1261                 data.append(rd)
1262                 self._filepos += len(rd)
1263                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1264             return b''.join(data)
1265         else:
1266             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1267             self._filepos += len(data)
1268             return data
1269
1270     @_FileLikeObjectBase._before_close
1271     @retry_method
1272     def readfrom(self, offset, size, num_retries=None):
1273         """Read up to `size` bytes from the stream, starting at the specified file offset.
1274
1275         This method does not change the file position.
1276         """
1277         return self.arvadosfile.readfrom(offset, size, num_retries)
1278
1279     def flush(self):
1280         pass
1281
1282
1283 class ArvadosFileWriter(ArvadosFileReader):
1284     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1285
1286     Be aware that this class is NOT thread-safe as there is no locking around
1287     updating the file pointer.
1288
1289     """
1290
1291     def __init__(self, arvadosfile, mode, num_retries=None):
1292         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1293         self.arvadosfile.add_writer(self)
1294
1295     def writable(self):
1296         return True
1297
1298     @_FileLikeObjectBase._before_close
1299     @retry_method
1300     def write(self, data, num_retries=None):
1301         if self.mode[0] == "a":
1302             self._filepos = self.size()
1303         self.arvadosfile.writeto(self._filepos, data, num_retries)
1304         self._filepos += len(data)
1305         return len(data)
1306
1307     @_FileLikeObjectBase._before_close
1308     @retry_method
1309     def writelines(self, seq, num_retries=None):
1310         for s in seq:
1311             self.write(s, num_retries=num_retries)
1312
1313     @_FileLikeObjectBase._before_close
1314     def truncate(self, size=None):
1315         if size is None:
1316             size = self._filepos
1317         self.arvadosfile.truncate(size)
1318
1319     @_FileLikeObjectBase._before_close
1320     def flush(self):
1321         self.arvadosfile.flush()
1322
1323     def close(self, flush=True):
1324         if not self.closed:
1325             self.arvadosfile.remove_writer(self, flush)
1326             super(ArvadosFileWriter, self).close()
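

# Illustrative sketch (not part of this module): ArvadosFileWriter and
# ArvadosFileReader are normally obtained through Collection.open() rather than
# constructed directly.  This assumes the arvados.collection.Collection API and
# writes only to an in-memory collection, so no Keep or API server should be
# needed for this round trip.
def _example_collection_roundtrip():
    import arvados.collection              # imported lazily to avoid a circular import

    c = arvados.collection.Collection()
    with c.open("hello.txt", "wb") as f:   # ArvadosFileWriter
        f.write(b"hello arvfile\n")
    with c.open("hello.txt", "rb") as f:   # ArvadosFileReader
        data = f.read(1024)
    return data, c.manifest_text()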