11684: When packing small blocks into one, save references of the files
[arvados.git] / sdk / python / arvados / arvfile.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import listitems, listvalues
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
8 import bz2
9 import collections
10 import copy
11 import errno
12 import functools
13 import hashlib
14 import logging
15 import os
16 import queue
17 import re
18 import sys
19 import threading
20 import uuid
21 import zlib
22
23 from . import config
24 from .errors import KeepWriteError, AssertionError, ArgumentError
25 from .keep import KeepLocator
26 from ._normalize_stream import normalize_stream
27 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
28 from .retry import retry_method
29
30 MOD = "mod"
31 WRITE = "write"
32
33 _logger = logging.getLogger('arvados.arvfile')
34
35 def split(path):
36     """split(path) -> streamname, filename
37
38     Separate the stream name and file name in a /-separated stream path and
39     return a tuple (stream_name, file_name).  If no stream name is available,
40     assume '.'.
41
42     """
43     try:
44         stream_name, file_name = path.rsplit('/', 1)
45     except ValueError:  # No / in string
46         stream_name, file_name = '.', path
47     return stream_name, file_name
48
49
50 class UnownedBlockError(Exception):
51     """Raised when there's an writable block without an owner on the BlockManager."""
52     pass
53
54
55 class _FileLikeObjectBase(object):
56     def __init__(self, name, mode):
57         self.name = name
58         self.mode = mode
59         self.closed = False
60
61     @staticmethod
62     def _before_close(orig_func):
63         @functools.wraps(orig_func)
64         def before_close_wrapper(self, *args, **kwargs):
65             if self.closed:
66                 raise ValueError("I/O operation on closed stream file")
67             return orig_func(self, *args, **kwargs)
68         return before_close_wrapper
69
70     def __enter__(self):
71         return self
72
73     def __exit__(self, exc_type, exc_value, traceback):
74         try:
75             self.close()
76         except Exception:
77             if exc_type is None:
78                 raise
79
80     def close(self):
81         self.closed = True
82
83
84 class ArvadosFileReaderBase(_FileLikeObjectBase):
85     def __init__(self, name, mode, num_retries=None):
86         super(ArvadosFileReaderBase, self).__init__(name, mode)
87         self._binary = 'b' in mode
88         if sys.version_info >= (3, 0) and not self._binary:
89             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
90         self._filepos = 0
91         self.num_retries = num_retries
92         self._readline_cache = (None, None)
93
94     def __iter__(self):
95         while True:
96             data = self.readline()
97             if not data:
98                 break
99             yield data
100
101     def decompressed_name(self):
102         return re.sub('\.(bz2|gz)$', '', self.name)
103
104     @_FileLikeObjectBase._before_close
105     def seek(self, pos, whence=os.SEEK_SET):
106         if whence == os.SEEK_CUR:
107             pos += self._filepos
108         elif whence == os.SEEK_END:
109             pos += self.size()
110         if pos < 0:
111             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
112         self._filepos = pos
113         return self._filepos
114
115     def tell(self):
116         return self._filepos
117
118     def readable(self):
119         return True
120
121     def writable(self):
122         return False
123
124     def seekable(self):
125         return True
126
127     @_FileLikeObjectBase._before_close
128     @retry_method
129     def readall(self, size=2**20, num_retries=None):
130         while True:
131             data = self.read(size, num_retries=num_retries)
132             if len(data) == 0:
133                 break
134             yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readline(self, size=float('inf'), num_retries=None):
139         cache_pos, cache_data = self._readline_cache
140         if self.tell() == cache_pos:
141             data = [cache_data]
142             self._filepos += len(cache_data)
143         else:
144             data = [b'']
145         data_size = len(data[-1])
146         while (data_size < size) and (b'\n' not in data[-1]):
147             next_read = self.read(2 ** 20, num_retries=num_retries)
148             if not next_read:
149                 break
150             data.append(next_read)
151             data_size += len(next_read)
152         data = b''.join(data)
153         try:
154             nextline_index = data.index(b'\n') + 1
155         except ValueError:
156             nextline_index = len(data)
157         nextline_index = min(nextline_index, size)
158         self._filepos -= len(data) - nextline_index
159         self._readline_cache = (self.tell(), data[nextline_index:])
160         return data[:nextline_index].decode()
161
162     @_FileLikeObjectBase._before_close
163     @retry_method
164     def decompress(self, decompress, size, num_retries=None):
165         for segment in self.readall(size, num_retries=num_retries):
166             data = decompress(segment)
167             if data:
168                 yield data
169
170     @_FileLikeObjectBase._before_close
171     @retry_method
172     def readall_decompressed(self, size=2**20, num_retries=None):
173         self.seek(0)
174         if self.name.endswith('.bz2'):
175             dc = bz2.BZ2Decompressor()
176             return self.decompress(dc.decompress, size,
177                                    num_retries=num_retries)
178         elif self.name.endswith('.gz'):
179             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
180             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
181                                    size, num_retries=num_retries)
182         else:
183             return self.readall(size, num_retries=num_retries)
184
185     @_FileLikeObjectBase._before_close
186     @retry_method
187     def readlines(self, sizehint=float('inf'), num_retries=None):
188         data = []
189         data_size = 0
190         for s in self.readall(num_retries=num_retries):
191             data.append(s)
192             data_size += len(s)
193             if data_size >= sizehint:
194                 break
195         return b''.join(data).decode().splitlines(True)
196
197     def size(self):
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200     def read(self, size, num_retries=None):
201         raise IOError(errno.ENOSYS, "Not implemented")
202
203     def readfrom(self, start, size, num_retries=None):
204         raise IOError(errno.ENOSYS, "Not implemented")
205
206
207 class StreamFileReader(ArvadosFileReaderBase):
208     class _NameAttribute(str):
209         # The Python file API provides a plain .name attribute.
210         # Older SDK provided a name() method.
211         # This class provides both, for maximum compatibility.
212         def __call__(self):
213             return self
214
215     def __init__(self, stream, segments, name):
216         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
217         self._stream = stream
218         self.segments = segments
219
220     def stream_name(self):
221         return self._stream.name()
222
223     def size(self):
224         n = self.segments[-1]
225         return n.range_start + n.range_size
226
227     @_FileLikeObjectBase._before_close
228     @retry_method
229     def read(self, size, num_retries=None):
230         """Read up to 'size' bytes from the stream, starting at the current file position"""
231         if size == 0:
232             return b''
233
234         data = b''
235         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
236         if available_chunks:
237             lr = available_chunks[0]
238             data = self._stream.readfrom(lr.locator+lr.segment_offset,
239                                          lr.segment_size,
240                                          num_retries=num_retries)
241
242         self._filepos += len(data)
243         return data
244
245     @_FileLikeObjectBase._before_close
246     @retry_method
247     def readfrom(self, start, size, num_retries=None):
248         """Read up to 'size' bytes from the stream, starting at 'start'"""
249         if size == 0:
250             return b''
251
252         data = []
253         for lr in locators_and_ranges(self.segments, start, size):
254             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
255                                               num_retries=num_retries))
256         return b''.join(data)
257
258     def as_manifest(self):
259         segs = []
260         for r in self.segments:
261             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
262         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
263
264
265 def synchronized(orig_func):
266     @functools.wraps(orig_func)
267     def synchronized_wrapper(self, *args, **kwargs):
268         with self.lock:
269             return orig_func(self, *args, **kwargs)
270     return synchronized_wrapper
271
272
273 class StateChangeError(Exception):
274     def __init__(self, message, state, nextstate):
275         super(StateChangeError, self).__init__(message)
276         self.state = state
277         self.nextstate = nextstate
278
279 class _BufferBlock(object):
280     """A stand-in for a Keep block that is in the process of being written.
281
282     Writers can append to it, get the size, and compute the Keep locator.
283     There are three valid states:
284
285     WRITABLE
286       Can append to block.
287
288     PENDING
289       Block is in the process of being uploaded to Keep, append is an error.
290
291     COMMITTED
292       The block has been written to Keep, its internal buffer has been
293       released, fetching the block will fetch it via keep client (since we
294       discarded the internal copy), and identifiers referring to the BufferBlock
295       can be replaced with the block locator.
296
297     """
298
299     WRITABLE = 0
300     PENDING = 1
301     COMMITTED = 2
302     ERROR = 3
303     DELETED = 4
304
305     def __init__(self, blockid, starting_capacity, owner):
306         """
307         :blockid:
308           the identifier for this block
309
310         :starting_capacity:
311           the initial buffer capacity
312
313         :owner:
314           ArvadosFile that owns this block
315
316         """
317         self.blockid = blockid
318         self.buffer_block = bytearray(starting_capacity)
319         self.buffer_view = memoryview(self.buffer_block)
320         self.write_pointer = 0
321         self._state = _BufferBlock.WRITABLE
322         self._locator = None
323         self.owner = owner
324         self.lock = threading.Lock()
325         self.wait_for_commit = threading.Event()
326         self.error = None
327
328     @synchronized
329     def append(self, data):
330         """Append some data to the buffer.
331
332         Only valid if the block is in WRITABLE state.  Implements an expanding
333         buffer, doubling capacity as needed to accomdate all the data.
334
335         """
336         if self._state == _BufferBlock.WRITABLE:
337             if not isinstance(data, bytes) and not isinstance(data, memoryview):
338                 data = data.encode()
339             while (self.write_pointer+len(data)) > len(self.buffer_block):
340                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
341                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
342                 self.buffer_block = new_buffer_block
343                 self.buffer_view = memoryview(self.buffer_block)
344             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
345             self.write_pointer += len(data)
346             self._locator = None
347         else:
348             raise AssertionError("Buffer block is not writable")
349
350     STATE_TRANSITIONS = frozenset([
351             (WRITABLE, PENDING),
352             (PENDING, COMMITTED),
353             (PENDING, ERROR),
354             (ERROR, PENDING)])
355
356     @synchronized
357     def set_state(self, nextstate, val=None):
358         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
359             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
360         self._state = nextstate
361
362         if self._state == _BufferBlock.PENDING:
363             self.wait_for_commit.clear()
364
365         if self._state == _BufferBlock.COMMITTED:
366             self._locator = val
367             self.buffer_view = None
368             self.buffer_block = None
369             self.wait_for_commit.set()
370
371         if self._state == _BufferBlock.ERROR:
372             self.error = val
373             self.wait_for_commit.set()
374
375     @synchronized
376     def state(self):
377         return self._state
378
379     def size(self):
380         """The amount of data written to the buffer."""
381         return self.write_pointer
382
383     @synchronized
384     def locator(self):
385         """The Keep locator for this buffer's contents."""
386         if self._locator is None:
387             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
388         return self._locator
389
390     @synchronized
391     def clone(self, new_blockid, owner):
392         if self._state == _BufferBlock.COMMITTED:
393             raise AssertionError("Cannot duplicate committed buffer block")
394         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
395         bufferblock.append(self.buffer_view[0:self.size()])
396         return bufferblock
397
398     @synchronized
399     def clear(self):
400         self._state = _BufferBlock.DELETED
401         self.owner = None
402         self.buffer_block = None
403         self.buffer_view = None
404
405     @synchronized
406     def repack_writes(self):
407         """Optimize buffer block by repacking segments in file sequence.
408
409         When the client makes random writes, they appear in the buffer block in
410         the sequence they were written rather than the sequence they appear in
411         the file.  This makes for inefficient, fragmented manifests.  Attempt
412         to optimize by repacking writes in file sequence.
413
414         """
415         if self._state != _BufferBlock.WRITABLE:
416             raise AssertionError("Cannot repack non-writable block")
417
418         segs = self.owner.segments()
419
420         # Collect the segments that reference the buffer block.
421         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
422
423         # Collect total data referenced by segments (could be smaller than
424         # bufferblock size if a portion of the file was written and
425         # then overwritten).
426         write_total = sum([s.range_size for s in bufferblock_segs])
427
428         if write_total < self.size() or len(bufferblock_segs) > 1:
429             # If there's more than one segment referencing this block, it is
430             # due to out-of-order writes and will produce a fragmented
431             # manifest, so try to optimize by re-packing into a new buffer.
432             contents = self.buffer_view[0:self.write_pointer].tobytes()
433             new_bb = _BufferBlock(None, write_total, None)
434             for t in bufferblock_segs:
435                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
436                 t.segment_offset = new_bb.size() - t.range_size
437
438             self.buffer_block = new_bb.buffer_block
439             self.buffer_view = new_bb.buffer_view
440             self.write_pointer = new_bb.write_pointer
441             self._locator = None
442             new_bb.clear()
443             self.owner.set_segments(segs)
444
445     def __repr__(self):
446         return "<BufferBlock %s>" % (self.blockid)
447
448
449 class NoopLock(object):
450     def __enter__(self):
451         return self
452
453     def __exit__(self, exc_type, exc_value, traceback):
454         pass
455
456     def acquire(self, blocking=False):
457         pass
458
459     def release(self):
460         pass
461
462
463 def must_be_writable(orig_func):
464     @functools.wraps(orig_func)
465     def must_be_writable_wrapper(self, *args, **kwargs):
466         if not self.writable():
467             raise IOError(errno.EROFS, "Collection is read-only.")
468         return orig_func(self, *args, **kwargs)
469     return must_be_writable_wrapper
470
471
472 class _BlockManager(object):
473     """BlockManager handles buffer blocks.
474
475     Also handles background block uploads, and background block prefetch for a
476     Collection of ArvadosFiles.
477
478     """
479
480     DEFAULT_PUT_THREADS = 2
481     DEFAULT_GET_THREADS = 2
482
483     def __init__(self, keep, copies=None, put_threads=None):
484         """keep: KeepClient object to use"""
485         self._keep = keep
486         self._bufferblocks = collections.OrderedDict()
487         self._put_queue = None
488         self._put_threads = None
489         self._prefetch_queue = None
490         self._prefetch_threads = None
491         self.lock = threading.Lock()
492         self.prefetch_enabled = True
493         if put_threads:
494             self.num_put_threads = put_threads
495         else:
496             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
497         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
498         self.copies = copies
499         self._pending_write_size = 0
500         self.threads_lock = threading.Lock()
501         self.padding_block = None
502         self._repacked_bb = {}
503         self._repacked_bb_lock = threading.Lock()
504
505     @synchronized
506     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
507         """Allocate a new, empty bufferblock in WRITABLE state and return it.
508
509         :blockid:
510           optional block identifier, otherwise one will be automatically assigned
511
512         :starting_capacity:
513           optional capacity, otherwise will use default capacity
514
515         :owner:
516           ArvadosFile that owns this block
517
518         """
519         return self._alloc_bufferblock(blockid, starting_capacity, owner)
520
521     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
522         if blockid is None:
523             blockid = str(uuid.uuid4())
524         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
525         self._bufferblocks[bufferblock.blockid] = bufferblock
526         return bufferblock
527
528     @synchronized
529     def dup_block(self, block, owner):
530         """Create a new bufferblock initialized with the content of an existing bufferblock.
531
532         :block:
533           the buffer block to copy.
534
535         :owner:
536           ArvadosFile that owns the new block
537
538         """
539         new_blockid = str(uuid.uuid4())
540         bufferblock = block.clone(new_blockid, owner)
541         self._bufferblocks[bufferblock.blockid] = bufferblock
542         return bufferblock
543
544     @synchronized
545     def is_bufferblock(self, locator):
546         return locator in self._bufferblocks
547
548     def _commit_bufferblock_worker(self):
549         """Background uploader thread."""
550
551         while True:
552             try:
553                 bufferblock = self._put_queue.get()
554                 if bufferblock is None:
555                     return
556
557                 if self.copies is None:
558                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
559                 else:
560                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
561                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
562
563                 with self._repacked_bb_lock:
564                     # Check if this block was created by repacking smaller blocks
565                     if bufferblock.blockid in self._repacked_bb:
566                         # Update segment locators (with its tokens) of files within
567                         # this block
568                         old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
569                         for f in self._repacked_bb[bufferblock.blockid]['files']:
570                             for s in [x for x in f._segments if x.locator == old_loc]:
571                                 s.locator = loc
572                         del(self._repacked_bb[bufferblock.blockid])
573             except Exception as e:
574                 bufferblock.set_state(_BufferBlock.ERROR, e)
575             finally:
576                 if self._put_queue is not None:
577                     self._put_queue.task_done()
578
579     def start_put_threads(self):
580         with self.threads_lock:
581             if self._put_threads is None:
582                 # Start uploader threads.
583
584                 # If we don't limit the Queue size, the upload queue can quickly
585                 # grow to take up gigabytes of RAM if the writing process is
586                 # generating data more quickly than it can be send to the Keep
587                 # servers.
588                 #
589                 # With two upload threads and a queue size of 2, this means up to 4
590                 # blocks pending.  If they are full 64 MiB blocks, that means up to
591                 # 256 MiB of internal buffering, which is the same size as the
592                 # default download block cache in KeepClient.
593                 self._put_queue = queue.Queue(maxsize=2)
594
595                 self._put_threads = []
596                 for i in range(0, self.num_put_threads):
597                     thread = threading.Thread(target=self._commit_bufferblock_worker)
598                     self._put_threads.append(thread)
599                     thread.daemon = True
600                     thread.start()
601
602     def _block_prefetch_worker(self):
603         """The background downloader thread."""
604         while True:
605             try:
606                 b = self._prefetch_queue.get()
607                 if b is None:
608                     return
609                 self._keep.get(b)
610             except Exception:
611                 _logger.exception("Exception doing block prefetch")
612
613     @synchronized
614     def start_get_threads(self):
615         if self._prefetch_threads is None:
616             self._prefetch_queue = queue.Queue()
617             self._prefetch_threads = []
618             for i in range(0, self.num_get_threads):
619                 thread = threading.Thread(target=self._block_prefetch_worker)
620                 self._prefetch_threads.append(thread)
621                 thread.daemon = True
622                 thread.start()
623
624
625     @synchronized
626     def stop_threads(self):
627         """Shut down and wait for background upload and download threads to finish."""
628
629         if self._put_threads is not None:
630             for t in self._put_threads:
631                 self._put_queue.put(None)
632             for t in self._put_threads:
633                 t.join()
634         self._put_threads = None
635         self._put_queue = None
636
637         if self._prefetch_threads is not None:
638             for t in self._prefetch_threads:
639                 self._prefetch_queue.put(None)
640             for t in self._prefetch_threads:
641                 t.join()
642         self._prefetch_threads = None
643         self._prefetch_queue = None
644
645     def __enter__(self):
646         return self
647
648     def __exit__(self, exc_type, exc_value, traceback):
649         self.stop_threads()
650
651     @synchronized
652     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
653         """Packs small blocks together before uploading"""
654
655         self._pending_write_size += closed_file_size
656
657         # Check if there are enough small blocks for filling up one in full
658         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
659             return
660
661         # Search blocks ready for getting packed together before being
662         # committed to Keep.
663         # A WRITABLE block always has an owner.
664         # A WRITABLE block with its owner.closed() implies that its
665         # size is <= KEEP_BLOCK_SIZE/2.
666         try:
667             small_blocks = [b for b in listvalues(self._bufferblocks)
668                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
669         except AttributeError:
670             # Writable blocks without owner shouldn't exist.
671             raise UnownedBlockError()
672
673         if len(small_blocks) <= 1:
674             # Not enough small blocks for repacking
675             return
676
677         for bb in small_blocks:
678             bb.repack_writes()
679
680         # Update the pending write size count with its true value, just in case
681         # some small file was opened, written and closed several times.
682         self._pending_write_size = sum([b.size() for b in small_blocks])
683
684         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
685             return
686
687         new_bb = self._alloc_bufferblock()
688         files = []
689         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
690             bb = small_blocks.pop(0)
691             self._pending_write_size -= bb.size()
692             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
693             files.append((bb, new_bb.write_pointer - bb.size()))
694
695         # If this repacked block will be committed asynchronously, take note
696         # of its files so their segments' locators will be updated with
697         # the correct permission token returned by the API server.
698         if not sync:
699             with self._repacked_bb_lock:
700                 self._repacked_bb[new_bb.blockid] = {
701                     'unsigned_loc': new_bb.locator(),
702                     'files': [bb.owner for bb, _ in files],
703                 }
704
705         self.commit_bufferblock(new_bb, sync=sync)
706
707         with self._repacked_bb_lock:
708             for bb, new_bb_segment_offset in files:
709                 newsegs = bb.owner.segments()
710                 for s in newsegs:
711                     if s.locator == bb.blockid:
712                         s.locator = new_bb.locator()
713                         s.segment_offset = new_bb_segment_offset+s.segment_offset
714                 bb.owner.set_segments(newsegs)
715                 self._delete_bufferblock(bb.blockid)
716
717     def commit_bufferblock(self, block, sync):
718         """Initiate a background upload of a bufferblock.
719
720         :block:
721           The block object to upload
722
723         :sync:
724           If `sync` is True, upload the block synchronously.
725           If `sync` is False, upload the block asynchronously.  This will
726           return immediately unless the upload queue is at capacity, in
727           which case it will wait on an upload queue slot.
728
729         """
730         try:
731             # Mark the block as PENDING so to disallow any more appends.
732             block.set_state(_BufferBlock.PENDING)
733         except StateChangeError as e:
734             if e.state == _BufferBlock.PENDING:
735                 if sync:
736                     block.wait_for_commit.wait()
737                 else:
738                     return
739             if block.state() == _BufferBlock.COMMITTED:
740                 return
741             elif block.state() == _BufferBlock.ERROR:
742                 raise block.error
743             else:
744                 raise
745
746         if sync:
747             try:
748                 if self.copies is None:
749                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
750                 else:
751                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
752                 block.set_state(_BufferBlock.COMMITTED, loc)
753             except Exception as e:
754                 block.set_state(_BufferBlock.ERROR, e)
755                 raise
756         else:
757             self.start_put_threads()
758             self._put_queue.put(block)
759
760     @synchronized
761     def get_bufferblock(self, locator):
762         return self._bufferblocks.get(locator)
763
764     @synchronized
765     def get_padding_block(self):
766         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
767         when using truncate() to extend the size of a file.
768
769         For reference (and possible future optimization), the md5sum of the
770         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
771
772         """
773
774         if self.padding_block is None:
775             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
776             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
777             self.commit_bufferblock(self.padding_block, False)
778         return self.padding_block
779
780     @synchronized
781     def delete_bufferblock(self, locator):
782         self._delete_bufferblock(locator)
783
784     def _delete_bufferblock(self, locator):
785         bb = self._bufferblocks[locator]
786         bb.clear()
787         del self._bufferblocks[locator]
788
789     def get_block_contents(self, locator, num_retries, cache_only=False):
790         """Fetch a block.
791
792         First checks to see if the locator is a BufferBlock and return that, if
793         not, passes the request through to KeepClient.get().
794
795         """
796         with self.lock:
797             if locator in self._bufferblocks:
798                 bufferblock = self._bufferblocks[locator]
799                 if bufferblock.state() != _BufferBlock.COMMITTED:
800                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
801                 else:
802                     locator = bufferblock._locator
803         if cache_only:
804             return self._keep.get_from_cache(locator)
805         else:
806             return self._keep.get(locator, num_retries=num_retries)
807
808     def commit_all(self):
809         """Commit all outstanding buffer blocks.
810
811         This is a synchronous call, and will not return until all buffer blocks
812         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
813
814         """
815         self.repack_small_blocks(force=True, sync=True)
816
817         with self.lock:
818             items = listitems(self._bufferblocks)
819
820         for k,v in items:
821             if v.state() != _BufferBlock.COMMITTED and v.owner:
822                 v.owner.flush(sync=False)
823
824         with self.lock:
825             if self._put_queue is not None:
826                 self._put_queue.join()
827
828                 err = []
829                 for k,v in items:
830                     if v.state() == _BufferBlock.ERROR:
831                         err.append((v.locator(), v.error))
832                 if err:
833                     raise KeepWriteError("Error writing some blocks", err, label="block")
834
835         for k,v in items:
836             # flush again with sync=True to remove committed bufferblocks from
837             # the segments.
838             if v.owner:
839                 v.owner.flush(sync=True)
840
841     def block_prefetch(self, locator):
842         """Initiate a background download of a block.
843
844         This assumes that the underlying KeepClient implements a block cache,
845         so repeated requests for the same block will not result in repeated
846         downloads (unless the block is evicted from the cache.)  This method
847         does not block.
848
849         """
850
851         if not self.prefetch_enabled:
852             return
853
854         if self._keep.get_from_cache(locator) is not None:
855             return
856
857         with self.lock:
858             if locator in self._bufferblocks:
859                 return
860
861         self.start_get_threads()
862         self._prefetch_queue.put(locator)
863
864
865 class ArvadosFile(object):
866     """Represent a file in a Collection.
867
868     ArvadosFile manages the underlying representation of a file in Keep as a
869     sequence of segments spanning a set of blocks, and implements random
870     read/write access.
871
872     This object may be accessed from multiple threads.
873
874     """
875
876     def __init__(self, parent, name, stream=[], segments=[]):
877         """
878         ArvadosFile constructor.
879
880         :stream:
881           a list of Range objects representing a block stream
882
883         :segments:
884           a list of Range objects representing segments
885         """
886         self.parent = parent
887         self.name = name
888         self._writers = set()
889         self._committed = False
890         self._segments = []
891         self.lock = parent.root_collection().lock
892         for s in segments:
893             self._add_segment(stream, s.locator, s.range_size)
894         self._current_bblock = None
895
896     def writable(self):
897         return self.parent.writable()
898
899     @synchronized
900     def permission_expired(self, as_of_dt=None):
901         """Returns True if any of the segment's locators is expired"""
902         for r in self._segments:
903             if KeepLocator(r.locator).permission_expired(as_of_dt):
904                 return True
905         return False
906
907     @synchronized
908     def segments(self):
909         return copy.copy(self._segments)
910
911     @synchronized
912     def clone(self, new_parent, new_name):
913         """Make a copy of this file."""
914         cp = ArvadosFile(new_parent, new_name)
915         cp.replace_contents(self)
916         return cp
917
918     @must_be_writable
919     @synchronized
920     def replace_contents(self, other):
921         """Replace segments of this file with segments from another `ArvadosFile` object."""
922
923         map_loc = {}
924         self._segments = []
925         for other_segment in other.segments():
926             new_loc = other_segment.locator
927             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
928                 if other_segment.locator not in map_loc:
929                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
930                     if bufferblock.state() != _BufferBlock.WRITABLE:
931                         map_loc[other_segment.locator] = bufferblock.locator()
932                     else:
933                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
934                 new_loc = map_loc[other_segment.locator]
935
936             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
937
938         self.set_committed(False)
939
940     def __eq__(self, other):
941         if other is self:
942             return True
943         if not isinstance(other, ArvadosFile):
944             return False
945
946         othersegs = other.segments()
947         with self.lock:
948             if len(self._segments) != len(othersegs):
949                 return False
950             for i in range(0, len(othersegs)):
951                 seg1 = self._segments[i]
952                 seg2 = othersegs[i]
953                 loc1 = seg1.locator
954                 loc2 = seg2.locator
955
956                 if self.parent._my_block_manager().is_bufferblock(loc1):
957                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
958
959                 if other.parent._my_block_manager().is_bufferblock(loc2):
960                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
961
962                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
963                     seg1.range_start != seg2.range_start or
964                     seg1.range_size != seg2.range_size or
965                     seg1.segment_offset != seg2.segment_offset):
966                     return False
967
968         return True
969
970     def __ne__(self, other):
971         return not self.__eq__(other)
972
973     @synchronized
974     def set_segments(self, segs):
975         self._segments = segs
976
977     @synchronized
978     def set_committed(self, value=True):
979         """Set committed flag.
980
981         If value is True, set committed to be True.
982
983         If value is False, set committed to be False for this and all parents.
984         """
985         if value == self._committed:
986             return
987         self._committed = value
988         if self._committed is False and self.parent is not None:
989             self.parent.set_committed(False)
990
991     @synchronized
992     def committed(self):
993         """Get whether this is committed or not."""
994         return self._committed
995
996     @synchronized
997     def add_writer(self, writer):
998         """Add an ArvadosFileWriter reference to the list of writers"""
999         if isinstance(writer, ArvadosFileWriter):
1000             self._writers.add(writer)
1001
1002     @synchronized
1003     def remove_writer(self, writer, flush):
1004         """
1005         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
1006         and do some block maintenance tasks.
1007         """
1008         self._writers.remove(writer)
1009
1010         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
1011             # File writer closed, not small enough for repacking
1012             self.flush()
1013         elif self.closed():
1014             # All writers closed and size is adequate for repacking
1015             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1016
1017     def closed(self):
1018         """
1019         Get whether this is closed or not. When the writers list is empty, the file
1020         is supposed to be closed.
1021         """
1022         return len(self._writers) == 0
1023
1024     @must_be_writable
1025     @synchronized
1026     def truncate(self, size):
1027         """Shrink or expand the size of the file.
1028
1029         If `size` is less than the size of the file, the file contents after
1030         `size` will be discarded.  If `size` is greater than the current size
1031         of the file, it will be filled with zero bytes.
1032
1033         """
1034         if size < self.size():
1035             new_segs = []
1036             for r in self._segments:
1037                 range_end = r.range_start+r.range_size
1038                 if r.range_start >= size:
1039                     # segment is past the trucate size, all done
1040                     break
1041                 elif size < range_end:
1042                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1043                     nr.segment_offset = r.segment_offset
1044                     new_segs.append(nr)
1045                     break
1046                 else:
1047                     new_segs.append(r)
1048
1049             self._segments = new_segs
1050             self.set_committed(False)
1051         elif size > self.size():
1052             padding = self.parent._my_block_manager().get_padding_block()
1053             diff = size - self.size()
1054             while diff > config.KEEP_BLOCK_SIZE:
1055                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1056                 diff -= config.KEEP_BLOCK_SIZE
1057             if diff > 0:
1058                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1059             self.set_committed(False)
1060         else:
1061             # size == self.size()
1062             pass
1063
1064     def readfrom(self, offset, size, num_retries, exact=False):
1065         """Read up to `size` bytes from the file starting at `offset`.
1066
1067         :exact:
1068          If False (default), return less data than requested if the read
1069          crosses a block boundary and the next block isn't cached.  If True,
1070          only return less data than requested when hitting EOF.
1071         """
1072
1073         with self.lock:
1074             if size == 0 or offset >= self.size():
1075                 return b''
1076             readsegs = locators_and_ranges(self._segments, offset, size)
1077             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1078
1079         locs = set()
1080         data = []
1081         for lr in readsegs:
1082             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1083             if block:
1084                 blockview = memoryview(block)
1085                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1086                 locs.add(lr.locator)
1087             else:
1088                 break
1089
1090         for lr in prefetch:
1091             if lr.locator not in locs:
1092                 self.parent._my_block_manager().block_prefetch(lr.locator)
1093                 locs.add(lr.locator)
1094
1095         return b''.join(data)
1096
1097     @must_be_writable
1098     @synchronized
1099     def writeto(self, offset, data, num_retries):
1100         """Write `data` to the file starting at `offset`.
1101
1102         This will update existing bytes and/or extend the size of the file as
1103         necessary.
1104
1105         """
1106         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1107             data = data.encode()
1108         if len(data) == 0:
1109             return
1110
1111         if offset > self.size():
1112             self.truncate(offset)
1113
1114         if len(data) > config.KEEP_BLOCK_SIZE:
1115             # Chunk it up into smaller writes
1116             n = 0
1117             dataview = memoryview(data)
1118             while n < len(data):
1119                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1120                 n += config.KEEP_BLOCK_SIZE
1121             return
1122
1123         self.set_committed(False)
1124
1125         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1126             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1127
1128         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1129             self._current_bblock.repack_writes()
1130             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1131                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1132                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1133
1134         self._current_bblock.append(data)
1135
1136         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1137
1138         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1139
1140         return len(data)
1141
1142     @synchronized
1143     def flush(self, sync=True, num_retries=0):
1144         """Flush the current bufferblock to Keep.
1145
1146         :sync:
1147           If True, commit block synchronously, wait until buffer block has been written.
1148           If False, commit block asynchronously, return immediately after putting block into
1149           the keep put queue.
1150         """
1151         if self.committed():
1152             return
1153
1154         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1155             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1156                 self._current_bblock.repack_writes()
1157             if self._current_bblock.state() != _BufferBlock.DELETED:
1158                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1159
1160         if sync:
1161             to_delete = set()
1162             for s in self._segments:
1163                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1164                 if bb:
1165                     if bb.state() != _BufferBlock.COMMITTED:
1166                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1167                     to_delete.add(s.locator)
1168                     s.locator = bb.locator()
1169             for s in to_delete:
1170                self.parent._my_block_manager().delete_bufferblock(s)
1171
1172         self.parent.notify(MOD, self.parent, self.name, (self, self))
1173
1174     @must_be_writable
1175     @synchronized
1176     def add_segment(self, blocks, pos, size):
1177         """Add a segment to the end of the file.
1178
1179         `pos` and `offset` reference a section of the stream described by
1180         `blocks` (a list of Range objects)
1181
1182         """
1183         self._add_segment(blocks, pos, size)
1184
1185     def _add_segment(self, blocks, pos, size):
1186         """Internal implementation of add_segment."""
1187         self.set_committed(False)
1188         for lr in locators_and_ranges(blocks, pos, size):
1189             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1190             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1191             self._segments.append(r)
1192
1193     @synchronized
1194     def size(self):
1195         """Get the file size."""
1196         if self._segments:
1197             n = self._segments[-1]
1198             return n.range_start + n.range_size
1199         else:
1200             return 0
1201
1202     @synchronized
1203     def manifest_text(self, stream_name=".", portable_locators=False,
1204                       normalize=False, only_committed=False):
1205         buf = ""
1206         filestream = []
1207         for segment in self._segments:
1208             loc = segment.locator
1209             if self.parent._my_block_manager().is_bufferblock(loc):
1210                 if only_committed:
1211                     continue
1212                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1213             if portable_locators:
1214                 loc = KeepLocator(loc).stripped()
1215             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1216                                  segment.segment_offset, segment.range_size))
1217         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1218         buf += "\n"
1219         return buf
1220
1221     @must_be_writable
1222     @synchronized
1223     def _reparent(self, newparent, newname):
1224         self.set_committed(False)
1225         self.flush(sync=True)
1226         self.parent.remove(self.name)
1227         self.parent = newparent
1228         self.name = newname
1229         self.lock = self.parent.root_collection().lock
1230
1231
1232 class ArvadosFileReader(ArvadosFileReaderBase):
1233     """Wraps ArvadosFile in a file-like object supporting reading only.
1234
1235     Be aware that this class is NOT thread safe as there is no locking around
1236     updating file pointer.
1237
1238     """
1239
1240     def __init__(self, arvadosfile, mode="r", num_retries=None):
1241         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1242         self.arvadosfile = arvadosfile
1243
1244     def size(self):
1245         return self.arvadosfile.size()
1246
1247     def stream_name(self):
1248         return self.arvadosfile.parent.stream_name()
1249
1250     @_FileLikeObjectBase._before_close
1251     @retry_method
1252     def read(self, size=None, num_retries=None):
1253         """Read up to `size` bytes from the file and return the result.
1254
1255         Starts at the current file position.  If `size` is None, read the
1256         entire remainder of the file.
1257         """
1258         if size is None:
1259             data = []
1260             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1261             while rd:
1262                 data.append(rd)
1263                 self._filepos += len(rd)
1264                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1265             return b''.join(data)
1266         else:
1267             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1268             self._filepos += len(data)
1269             return data
1270
1271     @_FileLikeObjectBase._before_close
1272     @retry_method
1273     def readfrom(self, offset, size, num_retries=None):
1274         """Read up to `size` bytes from the stream, starting at the specified file offset.
1275
1276         This method does not change the file position.
1277         """
1278         return self.arvadosfile.readfrom(offset, size, num_retries)
1279
1280     def flush(self):
1281         pass
1282
1283
1284 class ArvadosFileWriter(ArvadosFileReader):
1285     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1286
1287     Be aware that this class is NOT thread safe as there is no locking around
1288     updating file pointer.
1289
1290     """
1291
1292     def __init__(self, arvadosfile, mode, num_retries=None):
1293         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1294         self.arvadosfile.add_writer(self)
1295
1296     def writable(self):
1297         return True
1298
1299     @_FileLikeObjectBase._before_close
1300     @retry_method
1301     def write(self, data, num_retries=None):
1302         if self.mode[0] == "a":
1303             self._filepos = self.size()
1304         self.arvadosfile.writeto(self._filepos, data, num_retries)
1305         self._filepos += len(data)
1306         return len(data)
1307
1308     @_FileLikeObjectBase._before_close
1309     @retry_method
1310     def writelines(self, seq, num_retries=None):
1311         for s in seq:
1312             self.write(s, num_retries=num_retries)
1313
1314     @_FileLikeObjectBase._before_close
1315     def truncate(self, size=None):
1316         if size is None:
1317             size = self._filepos
1318         self.arvadosfile.truncate(size)
1319
1320     @_FileLikeObjectBase._before_close
1321     def flush(self):
1322         self.arvadosfile.flush()
1323
1324     def close(self, flush=True):
1325         if not self.closed:
1326             self.arvadosfile.remove_writer(self, flush)
1327             super(ArvadosFileWriter, self).close()