1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import listitems, listvalues
5 standard_library.install_aliases()
6 from builtins import range
7 from builtins import object
8 import bz2
9 import collections
10 import copy
11 import errno
12 import functools
13 import hashlib
14 import logging
15 import os
16 import queue
17 import re
18 import sys
19 import threading
20 import uuid
21 import zlib
22
23 from . import config
24 from .errors import KeepWriteError, AssertionError, ArgumentError
25 from .keep import KeepLocator
26 from ._normalize_stream import normalize_stream
27 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
28 from .retry import retry_method
29
30 MOD = "mod"
31 WRITE = "write"
32
33 _logger = logging.getLogger('arvados.arvfile')
34
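# Illustrative usage sketch (an assumption for documentation purposes: files
# are normally obtained through arvados.collection.Collection, which wraps the
# ArvadosFile objects defined here in the reader/writer classes below, and a
# configured Arvados client environment is required for actual Keep I/O):
#
#     import arvados.collection
#     coll = arvados.collection.Collection()
#     with coll.open('data.txt', 'wb') as f:      # ArvadosFileWriter
#         f.write(b'hello world\n')
#     with coll.open('data.txt', 'rb') as f:      # ArvadosFileReader
#         assert f.readline() == 'hello world\n'
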
35 def split(path):
36     """split(path) -> streamname, filename
37
38     Separate the stream name and file name in a /-separated stream path and
39     return a tuple (stream_name, file_name).  If no stream name is available,
40     assume '.'.
41
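    Examples (illustrative, doctest-style):

        >>> split('stream/dir/file.txt')
        ('stream/dir', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')
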
42     """
43     try:
44         stream_name, file_name = path.rsplit('/', 1)
45     except ValueError:  # No / in string
46         stream_name, file_name = '.', path
47     return stream_name, file_name
48
49
50 class UnownedBlockError(Exception):
51     """Raised when there's an writable block without an owner on the BlockManager."""
52     pass
53
54
55 class _FileLikeObjectBase(object):
56     def __init__(self, name, mode):
57         self.name = name
58         self.mode = mode
59         self.closed = False
60
61     @staticmethod
62     def _before_close(orig_func):
63         @functools.wraps(orig_func)
64         def before_close_wrapper(self, *args, **kwargs):
65             if self.closed:
66                 raise ValueError("I/O operation on closed stream file")
67             return orig_func(self, *args, **kwargs)
68         return before_close_wrapper
69
70     def __enter__(self):
71         return self
72
73     def __exit__(self, exc_type, exc_value, traceback):
74         try:
75             self.close()
76         except Exception:
77             if exc_type is None:
78                 raise
79
80     def close(self):
81         self.closed = True
82
83
84 class ArvadosFileReaderBase(_FileLikeObjectBase):
85     def __init__(self, name, mode, num_retries=None):
86         super(ArvadosFileReaderBase, self).__init__(name, mode)
87         self._binary = 'b' in mode
88         if sys.version_info >= (3, 0) and not self._binary:
89             raise NotImplementedError("text mode {!r} is not implemented".format(mode))
90         self._filepos = 0
91         self.num_retries = num_retries
92         self._readline_cache = (None, None)
93
94     def __iter__(self):
95         while True:
96             data = self.readline()
97             if not data:
98                 break
99             yield data
100
101     def decompressed_name(self):
102         return re.sub(r'\.(bz2|gz)$', '', self.name)
103
104     @_FileLikeObjectBase._before_close
105     def seek(self, pos, whence=os.SEEK_SET):
106         if whence == os.SEEK_CUR:
107             pos += self._filepos
108         elif whence == os.SEEK_END:
109             pos += self.size()
110         if pos < 0:
111             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
112         self._filepos = pos
113         return self._filepos
114
115     def tell(self):
116         return self._filepos
117
118     def readable(self):
119         return True
120
121     def writable(self):
122         return False
123
124     def seekable(self):
125         return True
126
127     @_FileLikeObjectBase._before_close
128     @retry_method
129     def readall(self, size=2**20, num_retries=None):
130         while True:
131             data = self.read(size, num_retries=num_retries)
132             if len(data) == 0:
133                 break
134             yield data
135
136     @_FileLikeObjectBase._before_close
137     @retry_method
138     def readline(self, size=float('inf'), num_retries=None):
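        # Reads ahead in 1 MiB chunks until a newline (or EOF / the size limit)
        # is found; bytes beyond the newline are kept in _readline_cache so the
        # next readline() call can resume without re-reading from the
        # underlying stream.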
139         cache_pos, cache_data = self._readline_cache
140         if self.tell() == cache_pos:
141             data = [cache_data]
142             self._filepos += len(cache_data)
143         else:
144             data = [b'']
145         data_size = len(data[-1])
146         while (data_size < size) and (b'\n' not in data[-1]):
147             next_read = self.read(2 ** 20, num_retries=num_retries)
148             if not next_read:
149                 break
150             data.append(next_read)
151             data_size += len(next_read)
152         data = b''.join(data)
153         try:
154             nextline_index = data.index(b'\n') + 1
155         except ValueError:
156             nextline_index = len(data)
157         nextline_index = min(nextline_index, size)
158         self._filepos -= len(data) - nextline_index
159         self._readline_cache = (self.tell(), data[nextline_index:])
160         return data[:nextline_index].decode()
161
162     @_FileLikeObjectBase._before_close
163     @retry_method
164     def decompress(self, decompress, size, num_retries=None):
165         for segment in self.readall(size, num_retries=num_retries):
166             data = decompress(segment)
167             if data:
168                 yield data
169
170     @_FileLikeObjectBase._before_close
171     @retry_method
172     def readall_decompressed(self, size=2**20, num_retries=None):
173         self.seek(0)
174         if self.name.endswith('.bz2'):
175             dc = bz2.BZ2Decompressor()
176             return self.decompress(dc.decompress, size,
177                                    num_retries=num_retries)
178         elif self.name.endswith('.gz'):
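            # wbits=16+MAX_WBITS tells zlib to expect a gzip header and trailer.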
179             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
180             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
181                                    size, num_retries=num_retries)
182         else:
183             return self.readall(size, num_retries=num_retries)
184
185     @_FileLikeObjectBase._before_close
186     @retry_method
187     def readlines(self, sizehint=float('inf'), num_retries=None):
188         data = []
189         data_size = 0
190         for s in self.readall(num_retries=num_retries):
191             data.append(s)
192             data_size += len(s)
193             if data_size >= sizehint:
194                 break
195         return b''.join(data).decode().splitlines(True)
196
197     def size(self):
198         raise IOError(errno.ENOSYS, "Not implemented")
199
200     def read(self, size, num_retries=None):
201         raise IOError(errno.ENOSYS, "Not implemented")
202
203     def readfrom(self, start, size, num_retries=None):
204         raise IOError(errno.ENOSYS, "Not implemented")
205
206
207 class StreamFileReader(ArvadosFileReaderBase):
208     class _NameAttribute(str):
209         # The Python file API provides a plain .name attribute.
210         # Older SDK provided a name() method.
211         # This class provides both, for maximum compatibility.
212         def __call__(self):
213             return self
214
215     def __init__(self, stream, segments, name):
216         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
217         self._stream = stream
218         self.segments = segments
219
220     def stream_name(self):
221         return self._stream.name()
222
223     def size(self):
224         n = self.segments[-1]
225         return n.range_start + n.range_size
226
227     @_FileLikeObjectBase._before_close
228     @retry_method
229     def read(self, size, num_retries=None):
230         """Read up to 'size' bytes from the stream, starting at the current file position"""
231         if size == 0:
232             return b''
233
234         data = b''
235         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
236         if available_chunks:
237             lr = available_chunks[0]
238             data = self._stream.readfrom(lr.locator+lr.segment_offset,
239                                          lr.segment_size,
240                                          num_retries=num_retries)
241
242         self._filepos += len(data)
243         return data
244
245     @_FileLikeObjectBase._before_close
246     @retry_method
247     def readfrom(self, start, size, num_retries=None):
248         """Read up to 'size' bytes from the stream, starting at 'start'"""
249         if size == 0:
250             return b''
251
252         data = []
253         for lr in locators_and_ranges(self.segments, start, size):
254             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
255                                               num_retries=num_retries))
256         return b''.join(data)
257
258     def as_manifest(self):
259         segs = []
260         for r in self.segments:
261             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
262         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
263
264
265 def synchronized(orig_func):
266     @functools.wraps(orig_func)
267     def synchronized_wrapper(self, *args, **kwargs):
268         with self.lock:
269             return orig_func(self, *args, **kwargs)
270     return synchronized_wrapper
271
272
273 class StateChangeError(Exception):
274     def __init__(self, message, state, nextstate):
275         super(StateChangeError, self).__init__(message)
276         self.state = state
277         self.nextstate = nextstate
278
279 class _BufferBlock(object):
280     """A stand-in for a Keep block that is in the process of being written.
281
282     Writers can append to it, get the size, and compute the Keep locator.
283     There are three valid states:
284
285     WRITABLE
286       Can append to block.
287
288     PENDING
289       Block is in the process of being uploaded to Keep, append is an error.
290
291     COMMITTED
292       The block has been written to Keep, its internal buffer has been
293       released, fetching the block will fetch it via keep client (since we
294       discarded the internal copy), and identifiers referring to the BufferBlock
295       can be replaced with the block locator.
296
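    ERROR
      The upload to Keep failed; the exception raised during the attempt is
      stored in the block's `error` attribute.

    DELETED
      The block's buffer has been released (after repacking or an explicit
      delete) and the block can no longer be used.
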
297     """
298
299     WRITABLE = 0
300     PENDING = 1
301     COMMITTED = 2
302     ERROR = 3
303     DELETED = 4
304
305     def __init__(self, blockid, starting_capacity, owner):
306         """
307         :blockid:
308           the identifier for this block
309
310         :starting_capacity:
311           the initial buffer capacity
312
313         :owner:
314           ArvadosFile that owns this block
315
316         """
317         self.blockid = blockid
318         self.buffer_block = bytearray(starting_capacity)
319         self.buffer_view = memoryview(self.buffer_block)
320         self.write_pointer = 0
321         self._state = _BufferBlock.WRITABLE
322         self._locator = None
323         self.owner = owner
324         self.lock = threading.Lock()
325         self.wait_for_commit = threading.Event()
326         self.error = None
327
328     @synchronized
329     def append(self, data):
330         """Append some data to the buffer.
331
332         Only valid if the block is in WRITABLE state.  Implements an expanding
333         buffer, doubling capacity as needed to accommodate all the data.
334
335         """
336         if self._state == _BufferBlock.WRITABLE:
337             if not isinstance(data, bytes) and not isinstance(data, memoryview):
338                 data = data.encode()
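            # Grow the backing bytearray by doubling until the new data fits;
            # geometric growth keeps the amortized cost of appends linear.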
339             while (self.write_pointer+len(data)) > len(self.buffer_block):
340                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
341                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
342                 self.buffer_block = new_buffer_block
343                 self.buffer_view = memoryview(self.buffer_block)
344             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
345             self.write_pointer += len(data)
346             self._locator = None
347         else:
348             raise AssertionError("Buffer block is not writable")
349
350     STATE_TRANSITIONS = frozenset([
351             (WRITABLE, PENDING),
352             (PENDING, COMMITTED),
353             (PENDING, ERROR),
354             (ERROR, PENDING)])
355
356     @synchronized
357     def set_state(self, nextstate, val=None):
358         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
359             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
360         self._state = nextstate
361
362         if self._state == _BufferBlock.PENDING:
363             self.wait_for_commit.clear()
364
365         if self._state == _BufferBlock.COMMITTED:
366             self._locator = val
367             self.buffer_view = None
368             self.buffer_block = None
369             self.wait_for_commit.set()
370
371         if self._state == _BufferBlock.ERROR:
372             self.error = val
373             self.wait_for_commit.set()
374
375     @synchronized
376     def state(self):
377         return self._state
378
379     def size(self):
380         """The amount of data written to the buffer."""
381         return self.write_pointer
382
383     @synchronized
384     def locator(self):
385         """The Keep locator for this buffer's contents."""
386         if self._locator is None:
387             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
388         return self._locator
389
390     @synchronized
391     def clone(self, new_blockid, owner):
392         if self._state == _BufferBlock.COMMITTED:
393             raise AssertionError("Cannot duplicate committed buffer block")
394         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
395         bufferblock.append(self.buffer_view[0:self.size()])
396         return bufferblock
397
398     @synchronized
399     def clear(self):
400         self._state = _BufferBlock.DELETED
401         self.owner = None
402         self.buffer_block = None
403         self.buffer_view = None
404
405     @synchronized
406     def repack_writes(self):
407         """Optimize buffer block by repacking segments in file sequence.
408
409         When the client makes random writes, they appear in the buffer block in
410         the sequence they were written rather than the sequence they appear in
411         the file.  This makes for inefficient, fragmented manifests.  Attempt
412         to optimize by repacking writes in file sequence.
413
414         """
415         if self._state != _BufferBlock.WRITABLE:
416             raise AssertionError("Cannot repack non-writable block")
417
418         segs = self.owner.segments()
419
420         # Collect the segments that reference the buffer block.
421         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
422
423         # Collect total data referenced by segments (could be smaller than
424         # bufferblock size if a portion of the file was written and
425         # then overwritten).
426         write_total = sum([s.range_size for s in bufferblock_segs])
427
428         if write_total < self.size() or len(bufferblock_segs) > 1:
429             # If there's more than one segment referencing this block, it is
430             # due to out-of-order writes and will produce a fragmented
431             # manifest, so try to optimize by re-packing into a new buffer.
432             contents = self.buffer_view[0:self.write_pointer].tobytes()
433             new_bb = _BufferBlock(None, write_total, None)
434             for t in bufferblock_segs:
435                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
436                 t.segment_offset = new_bb.size() - t.range_size
437
438             self.buffer_block = new_bb.buffer_block
439             self.buffer_view = new_bb.buffer_view
440             self.write_pointer = new_bb.write_pointer
441             self._locator = None
442             new_bb.clear()
443             self.owner.set_segments(segs)
444
445     def __repr__(self):
446         return "<BufferBlock %s>" % (self.blockid)
447
448
449 class NoopLock(object):
450     def __enter__(self):
451         return self
452
453     def __exit__(self, exc_type, exc_value, traceback):
454         pass
455
456     def acquire(self, blocking=False):
457         pass
458
459     def release(self):
460         pass
461
462
463 def must_be_writable(orig_func):
464     @functools.wraps(orig_func)
465     def must_be_writable_wrapper(self, *args, **kwargs):
466         if not self.writable():
467             raise IOError(errno.EROFS, "Collection is read-only.")
468         return orig_func(self, *args, **kwargs)
469     return must_be_writable_wrapper
470
471
472 class _BlockManager(object):
473     """BlockManager handles buffer blocks.
474
475     Also handles background block uploads, and background block prefetch for a
476     Collection of ArvadosFiles.
477
478     """
479
480     DEFAULT_PUT_THREADS = 2
481     DEFAULT_GET_THREADS = 2
482
483     def __init__(self, keep, copies=None, put_threads=None):
484         """keep: KeepClient object to use"""
485         self._keep = keep
486         self._bufferblocks = collections.OrderedDict()
487         self._put_queue = None
488         self._put_threads = None
489         self._prefetch_queue = None
490         self._prefetch_threads = None
491         self.lock = threading.Lock()
492         self.prefetch_enabled = True
493         if put_threads:
494             self.num_put_threads = put_threads
495         else:
496             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
497         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
498         self.copies = copies
499         self._pending_write_size = 0
500         self.threads_lock = threading.Lock()
501         self.padding_block = None
502
503     @synchronized
504     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
505         """Allocate a new, empty bufferblock in WRITABLE state and return it.
506
507         :blockid:
508           optional block identifier, otherwise one will be automatically assigned
509
510         :starting_capacity:
511           optional capacity, otherwise will use default capacity
512
513         :owner:
514           ArvadosFile that owns this block
515
516         """
517         return self._alloc_bufferblock(blockid, starting_capacity, owner)
518
519     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
520         if blockid is None:
521             blockid = str(uuid.uuid4())
522         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
523         self._bufferblocks[bufferblock.blockid] = bufferblock
524         return bufferblock
525
526     @synchronized
527     def dup_block(self, block, owner):
528         """Create a new bufferblock initialized with the content of an existing bufferblock.
529
530         :block:
531           the buffer block to copy.
532
533         :owner:
534           ArvadosFile that owns the new block
535
536         """
537         new_blockid = str(uuid.uuid4())
538         bufferblock = block.clone(new_blockid, owner)
539         self._bufferblocks[bufferblock.blockid] = bufferblock
540         return bufferblock
541
542     @synchronized
543     def is_bufferblock(self, locator):
544         return locator in self._bufferblocks
545
546     def _commit_bufferblock_worker(self):
547         """Background uploader thread."""
548
549         while True:
550             try:
551                 bufferblock = self._put_queue.get()
552                 if bufferblock is None:
553                     return
554
555                 if self.copies is None:
556                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
557                 else:
558                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
559                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
560
561             except Exception as e:
562                 bufferblock.set_state(_BufferBlock.ERROR, e)
563             finally:
564                 if self._put_queue is not None:
565                     self._put_queue.task_done()
566
567     def start_put_threads(self):
568         with self.threads_lock:
569             if self._put_threads is None:
570                 # Start uploader threads.
571
572                 # If we don't limit the Queue size, the upload queue can quickly
573                 # grow to take up gigabytes of RAM if the writing process is
574                 # generating data more quickly than it can be sent to the Keep
575                 # servers.
576                 #
577                 # With two upload threads and a queue size of 2, this means up to 4
578                 # blocks pending.  If they are full 64 MiB blocks, that means up to
579                 # 256 MiB of internal buffering, which is the same size as the
580                 # default download block cache in KeepClient.
581                 self._put_queue = queue.Queue(maxsize=2)
582
583                 self._put_threads = []
584                 for i in range(0, self.num_put_threads):
585                     thread = threading.Thread(target=self._commit_bufferblock_worker)
586                     self._put_threads.append(thread)
587                     thread.daemon = True
588                     thread.start()
589
590     def _block_prefetch_worker(self):
591         """The background downloader thread."""
592         while True:
593             try:
594                 b = self._prefetch_queue.get()
595                 if b is None:
596                     return
597                 self._keep.get(b)
598             except Exception:
599                 _logger.exception("Exception doing block prefetch")
600
601     @synchronized
602     def start_get_threads(self):
603         if self._prefetch_threads is None:
604             self._prefetch_queue = queue.Queue()
605             self._prefetch_threads = []
606             for i in range(0, self.num_get_threads):
607                 thread = threading.Thread(target=self._block_prefetch_worker)
608                 self._prefetch_threads.append(thread)
609                 thread.daemon = True
610                 thread.start()
611
612
613     @synchronized
614     def stop_threads(self):
615         """Shut down and wait for background upload and download threads to finish."""
616
617         if self._put_threads is not None:
618             for t in self._put_threads:
619                 self._put_queue.put(None)
620             for t in self._put_threads:
621                 t.join()
622         self._put_threads = None
623         self._put_queue = None
624
625         if self._prefetch_threads is not None:
626             for t in self._prefetch_threads:
627                 self._prefetch_queue.put(None)
628             for t in self._prefetch_threads:
629                 t.join()
630         self._prefetch_threads = None
631         self._prefetch_queue = None
632
633     def __enter__(self):
634         return self
635
636     def __exit__(self, exc_type, exc_value, traceback):
637         self.stop_threads()
638
639     @synchronized
640     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
641         """Packs small blocks together before uploading"""
642
643         self._pending_write_size += closed_file_size
644
645         # Check whether there are enough small blocks to fill up one full block
646         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
647             return
648
649         # Find blocks that are ready to be packed together before being
650         # committed to Keep.
651         # A WRITABLE block always has an owner.
652         # A WRITABLE block with its owner.closed() implies that its
653         # size is <= KEEP_BLOCK_SIZE/2.
654         try:
655             small_blocks = [b for b in listvalues(self._bufferblocks)
656                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
657         except AttributeError:
658             # Writable blocks without owner shouldn't exist.
659             raise UnownedBlockError()
660
661         if len(small_blocks) <= 1:
662             # Not enough small blocks for repacking
663             return
664
665         for bb in small_blocks:
666             bb.repack_writes()
667
668         # Update the pending write size count with its true value, just in case
669         # some small file was opened, written and closed several times.
670         self._pending_write_size = sum([b.size() for b in small_blocks])
671
672         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
673             return
674
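        # Pack as many of the small, closed-file blocks as will fit into one
        # new buffer block, remembering each file's offset within it.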
675         new_bb = self._alloc_bufferblock()
676         files = []
677         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
678             bb = small_blocks.pop(0)
679             self._pending_write_size -= bb.size()
680             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
681             files.append((bb, new_bb.write_pointer - bb.size()))
682
683         self.commit_bufferblock(new_bb, sync=sync)
684
685         for bb, new_bb_segment_offset in files:
686             newsegs = bb.owner.segments()
687             for s in newsegs:
688                 if s.locator == bb.blockid:
689                     s.locator = new_bb.locator()
690                     s.segment_offset = new_bb_segment_offset+s.segment_offset
691             bb.owner.set_segments(newsegs)
692             self._delete_bufferblock(bb.blockid)
693
694     def commit_bufferblock(self, block, sync):
695         """Initiate a background upload of a bufferblock.
696
697         :block:
698           The block object to upload
699
700         :sync:
701           If `sync` is True, upload the block synchronously.
702           If `sync` is False, upload the block asynchronously.  This will
703           return immediately unless the upload queue is at capacity, in
704           which case it will wait on an upload queue slot.
705
706         """
707         try:
708             # Mark the block as PENDING to disallow any more appends.
709             block.set_state(_BufferBlock.PENDING)
710         except StateChangeError as e:
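            # The block is already being committed, or has already reached a
            # terminal state.  For a synchronous commit, wait for the pending
            # upload and then report its outcome below.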
711             if e.state == _BufferBlock.PENDING:
712                 if sync:
713                     block.wait_for_commit.wait()
714                 else:
715                     return
716             if block.state() == _BufferBlock.COMMITTED:
717                 return
718             elif block.state() == _BufferBlock.ERROR:
719                 raise block.error
720             else:
721                 raise
722
723         if sync:
724             try:
725                 if self.copies is None:
726                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
727                 else:
728                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
729                 block.set_state(_BufferBlock.COMMITTED, loc)
730             except Exception as e:
731                 block.set_state(_BufferBlock.ERROR, e)
732                 raise
733         else:
734             self.start_put_threads()
735             self._put_queue.put(block)
736
737     @synchronized
738     def get_bufferblock(self, locator):
739         return self._bufferblocks.get(locator)
740
741     @synchronized
742     def get_padding_block(self):
743         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
744         when using truncate() to extend the size of a file.
745
746         For reference (and possible future optimization), the md5sum of the
747         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
748
749         """
750
751         if self.padding_block is None:
752             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
753             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
754             self.commit_bufferblock(self.padding_block, False)
755         return self.padding_block
756
757     @synchronized
758     def delete_bufferblock(self, locator):
759         self._delete_bufferblock(locator)
760
761     def _delete_bufferblock(self, locator):
762         bb = self._bufferblocks[locator]
763         bb.clear()
764         del self._bufferblocks[locator]
765
766     def get_block_contents(self, locator, num_retries, cache_only=False):
767         """Fetch a block.
768
769         First checks whether the locator refers to a BufferBlock and returns its
770         contents if so; otherwise, passes the request through to KeepClient.get().
771
772         """
773         with self.lock:
774             if locator in self._bufferblocks:
775                 bufferblock = self._bufferblocks[locator]
776                 if bufferblock.state() != _BufferBlock.COMMITTED:
777                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
778                 else:
779                     locator = bufferblock._locator
780         if cache_only:
781             return self._keep.get_from_cache(locator)
782         else:
783             return self._keep.get(locator, num_retries=num_retries)
784
785     def commit_all(self):
786         """Commit all outstanding buffer blocks.
787
788         This is a synchronous call, and will not return until all buffer blocks
789         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
790
791         """
792         self.repack_small_blocks(force=True, sync=True)
793
794         with self.lock:
795             items = listitems(self._bufferblocks)
796
797         for k,v in items:
798             if v.state() != _BufferBlock.COMMITTED and v.owner:
799                 v.owner.flush(sync=False)
800
801         with self.lock:
802             if self._put_queue is not None:
803                 self._put_queue.join()
804
805                 err = []
806                 for k,v in items:
807                     if v.state() == _BufferBlock.ERROR:
808                         err.append((v.locator(), v.error))
809                 if err:
810                     raise KeepWriteError("Error writing some blocks", err, label="block")
811
812         for k,v in items:
813             # flush again with sync=True to remove committed bufferblocks from
814             # the segments.
815             if v.owner:
816                 v.owner.flush(sync=True)
817
818     def block_prefetch(self, locator):
819         """Initiate a background download of a block.
820
821         This assumes that the underlying KeepClient implements a block cache,
822         so repeated requests for the same block will not result in repeated
823         downloads (unless the block is evicted from the cache).  This method
824         does not block.
825
826         """
827
828         if not self.prefetch_enabled:
829             return
830
831         if self._keep.get_from_cache(locator) is not None:
832             return
833
834         with self.lock:
835             if locator in self._bufferblocks:
836                 return
837
838         self.start_get_threads()
839         self._prefetch_queue.put(locator)
840
841
842 class ArvadosFile(object):
843     """Represent a file in a Collection.
844
845     ArvadosFile manages the underlying representation of a file in Keep as a
846     sequence of segments spanning a set of blocks, and implements random
847     read/write access.
848
849     This object may be accessed from multiple threads.
850
851     """
852
853     def __init__(self, parent, name, stream=[], segments=[]):
854         """
855         ArvadosFile constructor.
856
857         :stream:
858           a list of Range objects representing a block stream
859
860         :segments:
861           a list of Range objects representing segments
862         """
863         self.parent = parent
864         self.name = name
865         self._writers = set()
866         self._committed = False
867         self._segments = []
868         self.lock = parent.root_collection().lock
869         for s in segments:
870             self._add_segment(stream, s.locator, s.range_size)
871         self._current_bblock = None
872
873     def writable(self):
874         return self.parent.writable()
875
876     @synchronized
877     def permission_expired(self, as_of_dt=None):
878         """Returns True if any of the segment's locators is expired"""
879         for r in self._segments:
880             if KeepLocator(r.locator).permission_expired(as_of_dt):
881                 return True
882         return False
883
884     @synchronized
885     def segments(self):
886         return copy.copy(self._segments)
887
888     @synchronized
889     def clone(self, new_parent, new_name):
890         """Make a copy of this file."""
891         cp = ArvadosFile(new_parent, new_name)
892         cp.replace_contents(self)
893         return cp
894
895     @must_be_writable
896     @synchronized
897     def replace_contents(self, other):
898         """Replace segments of this file with segments from another `ArvadosFile` object."""
899
900         map_loc = {}
901         self._segments = []
902         for other_segment in other.segments():
903             new_loc = other_segment.locator
904             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
905                 if other_segment.locator not in map_loc:
906                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
907                     if bufferblock.state() != _BufferBlock.WRITABLE:
908                         map_loc[other_segment.locator] = bufferblock.locator()
909                     else:
910                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
911                 new_loc = map_loc[other_segment.locator]
912
913             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
914
915         self.set_committed(False)
916
917     def __eq__(self, other):
918         if other is self:
919             return True
920         if not isinstance(other, ArvadosFile):
921             return False
922
923         othersegs = other.segments()
924         with self.lock:
925             if len(self._segments) != len(othersegs):
926                 return False
927             for i in range(0, len(othersegs)):
928                 seg1 = self._segments[i]
929                 seg2 = othersegs[i]
930                 loc1 = seg1.locator
931                 loc2 = seg2.locator
932
933                 if self.parent._my_block_manager().is_bufferblock(loc1):
934                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
935
936                 if other.parent._my_block_manager().is_bufferblock(loc2):
937                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
938
939                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
940                     seg1.range_start != seg2.range_start or
941                     seg1.range_size != seg2.range_size or
942                     seg1.segment_offset != seg2.segment_offset):
943                     return False
944
945         return True
946
947     def __ne__(self, other):
948         return not self.__eq__(other)
949
950     @synchronized
951     def set_segments(self, segs):
952         self._segments = segs
953
954     @synchronized
955     def set_committed(self, value=True):
956         """Set committed flag.
957
958         If value is True, set committed to be True.
959
960         If value is False, set committed to be False for this and all parents.
961         """
962         if value == self._committed:
963             return
964         self._committed = value
965         if self._committed is False and self.parent is not None:
966             self.parent.set_committed(False)
967
968     @synchronized
969     def committed(self):
970         """Get whether this is committed or not."""
971         return self._committed
972
973     @synchronized
974     def add_writer(self, writer):
975         """Add an ArvadosFileWriter reference to the list of writers"""
976         if isinstance(writer, ArvadosFileWriter):
977             self._writers.add(writer)
978
979     @synchronized
980     def remove_writer(self, writer, flush):
981         """
982         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
983         and do some block maintenance tasks.
984         """
985         self._writers.remove(writer)
986
987         if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
988             # File writer closed, not small enough for repacking
989             self.flush()
990         elif self.closed():
991             # All writers closed and size is adequate for repacking
992             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
993
994     def closed(self):
995         """
996         Get whether this file is closed or not. The file is considered closed when
997         there are no remaining writers.
998         """
999         return len(self._writers) == 0
1000
1001     @must_be_writable
1002     @synchronized
1003     def truncate(self, size):
1004         """Shrink or expand the size of the file.
1005
1006         If `size` is less than the size of the file, the file contents after
1007         `size` will be discarded.  If `size` is greater than the current size
1008         of the file, it will be filled with zero bytes.
1009
1010         """
1011         if size < self.size():
1012             new_segs = []
1013             for r in self._segments:
1014                 range_end = r.range_start+r.range_size
1015                 if r.range_start >= size:
1016                     # segment is past the truncate size, all done
1017                     break
1018                 elif size < range_end:
1019                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1020                     nr.segment_offset = r.segment_offset
1021                     new_segs.append(nr)
1022                     break
1023                 else:
1024                     new_segs.append(r)
1025
1026             self._segments = new_segs
1027             self.set_committed(False)
1028         elif size > self.size():
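            # Extend the file by appending ranges that reference the shared
            # all-zeros padding block: whole KEEP_BLOCK_SIZE ranges first, then
            # a final partial range for the remainder.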
1029             padding = self.parent._my_block_manager().get_padding_block()
1030             diff = size - self.size()
1031             while diff > config.KEEP_BLOCK_SIZE:
1032                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1033                 diff -= config.KEEP_BLOCK_SIZE
1034             if diff > 0:
1035                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1036             self.set_committed(False)
1037         else:
1038             # size == self.size()
1039             pass
1040
1041     def readfrom(self, offset, size, num_retries, exact=False):
1042         """Read up to `size` bytes from the file starting at `offset`.
1043
1044         :exact:
1045          If False (default), return less data than requested if the read
1046          crosses a block boundary and the next block isn't cached.  If True,
1047          only return less data than requested when hitting EOF.
1048         """
1049
1050         with self.lock:
1051             if size == 0 or offset >= self.size():
1052                 return b''
1053             readsegs = locators_and_ranges(self._segments, offset, size)
1054             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1055
1056         locs = set()
1057         data = []
1058         for lr in readsegs:
1059             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1060             if block:
1061                 blockview = memoryview(block)
1062                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1063                 locs.add(lr.locator)
1064             else:
1065                 break
1066
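        # Queue background downloads for the blocks just past the requested
        # range so that sequential readers find them already in the cache.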
1067         for lr in prefetch:
1068             if lr.locator not in locs:
1069                 self.parent._my_block_manager().block_prefetch(lr.locator)
1070                 locs.add(lr.locator)
1071
1072         return b''.join(data)
1073
1074     @must_be_writable
1075     @synchronized
1076     def writeto(self, offset, data, num_retries):
1077         """Write `data` to the file starting at `offset`.
1078
1079         This will update existing bytes and/or extend the size of the file as
1080         necessary.
1081
1082         """
1083         if not isinstance(data, bytes) and not isinstance(data, memoryview):
1084             data = data.encode()
1085         if len(data) == 0:
1086             return
1087
1088         if offset > self.size():
1089             self.truncate(offset)
1090
1091         if len(data) > config.KEEP_BLOCK_SIZE:
1092             # Chunk it up into smaller writes
1093             n = 0
1094             dataview = memoryview(data)
1095             while n < len(data):
1096                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1097                 n += config.KEEP_BLOCK_SIZE
1098             return
1099
1100         self.set_committed(False)
1101
1102         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1103             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1104
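        # If appending would overflow the current buffer block, first try to
        # reclaim space by repacking overwritten segments; if the data still
        # does not fit, commit the block asynchronously and start a fresh one.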
1105         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1106             self._current_bblock.repack_writes()
1107             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1108                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1109                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1110
1111         self._current_bblock.append(data)
1112
1113         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1114
1115         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1116
1117         return len(data)
1118
1119     @synchronized
1120     def flush(self, sync=True, num_retries=0):
1121         """Flush the current bufferblock to Keep.
1122
1123         :sync:
1124           If True, commit block synchronously, wait until buffer block has been written.
1125           If False, commit block asynchronously, return immediately after putting block into
1126           the keep put queue.
1127         """
1128         if self.committed():
1129             return
1130
1131         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1132             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1133                 self._current_bblock.repack_writes()
1134             if self._current_bblock.state() != _BufferBlock.DELETED:
1135                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1136
1137         if sync:
1138             to_delete = set()
1139             for s in self._segments:
1140                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1141                 if bb:
1142                     if bb.state() != _BufferBlock.COMMITTED:
1143                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1144                     to_delete.add(s.locator)
1145                     s.locator = bb.locator()
1146             for s in to_delete:
1147                self.parent._my_block_manager().delete_bufferblock(s)
1148
1149         self.parent.notify(MOD, self.parent, self.name, (self, self))
1150
1151     @must_be_writable
1152     @synchronized
1153     def add_segment(self, blocks, pos, size):
1154         """Add a segment to the end of the file.
1155
1156         `pos` and `size` reference a section of the stream described by
1157         `blocks` (a list of Range objects)
1158
1159         """
1160         self._add_segment(blocks, pos, size)
1161
1162     def _add_segment(self, blocks, pos, size):
1163         """Internal implementation of add_segment."""
1164         self.set_committed(False)
1165         for lr in locators_and_ranges(blocks, pos, size):
1166             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1167             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1168             self._segments.append(r)
1169
1170     @synchronized
1171     def size(self):
1172         """Get the file size."""
1173         if self._segments:
1174             n = self._segments[-1]
1175             return n.range_start + n.range_size
1176         else:
1177             return 0
1178
1179     @synchronized
1180     def manifest_text(self, stream_name=".", portable_locators=False,
1181                       normalize=False, only_committed=False):
1182         buf = ""
1183         filestream = []
1184         for segment in self._segments:
1185             loc = segment.locator
1186             if self.parent._my_block_manager().is_bufferblock(loc):
1187                 if only_committed:
1188                     continue
1189                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1190             if portable_locators:
1191                 loc = KeepLocator(loc).stripped()
1192             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1193                                  segment.segment_offset, segment.range_size))
1194         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1195         buf += "\n"
1196         return buf
1197
1198     @must_be_writable
1199     @synchronized
1200     def _reparent(self, newparent, newname):
1201         self.set_committed(False)
1202         self.flush(sync=True)
1203         self.parent.remove(self.name)
1204         self.parent = newparent
1205         self.name = newname
1206         self.lock = self.parent.root_collection().lock
1207
1208
1209 class ArvadosFileReader(ArvadosFileReaderBase):
1210     """Wraps ArvadosFile in a file-like object supporting reading only.
1211
1212     Be aware that this class is NOT thread safe as there is no locking around
1213     updating file pointer.
1214
1215     """
1216
1217     def __init__(self, arvadosfile, mode="r", num_retries=None):
1218         super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1219         self.arvadosfile = arvadosfile
1220
1221     def size(self):
1222         return self.arvadosfile.size()
1223
1224     def stream_name(self):
1225         return self.arvadosfile.parent.stream_name()
1226
1227     @_FileLikeObjectBase._before_close
1228     @retry_method
1229     def read(self, size=None, num_retries=None):
1230         """Read up to `size` bytes from the file and return the result.
1231
1232         Starts at the current file position.  If `size` is None, read the
1233         entire remainder of the file.
1234         """
1235         if size is None:
1236             data = []
1237             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1238             while rd:
1239                 data.append(rd)
1240                 self._filepos += len(rd)
1241                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1242             return b''.join(data)
1243         else:
1244             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1245             self._filepos += len(data)
1246             return data
1247
1248     @_FileLikeObjectBase._before_close
1249     @retry_method
1250     def readfrom(self, offset, size, num_retries=None):
1251         """Read up to `size` bytes from the stream, starting at the specified file offset.
1252
1253         This method does not change the file position.
1254         """
1255         return self.arvadosfile.readfrom(offset, size, num_retries)
1256
1257     def flush(self):
1258         pass
1259
1260
1261 class ArvadosFileWriter(ArvadosFileReader):
1262     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1263
1264     Be aware that this class is NOT thread safe as there is no locking around
1265     updating file pointer.
1266
1267     """
1268
1269     def __init__(self, arvadosfile, mode, num_retries=None):
1270         super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1271         self.arvadosfile.add_writer(self)
1272
1273     def writable(self):
1274         return True
1275
1276     @_FileLikeObjectBase._before_close
1277     @retry_method
1278     def write(self, data, num_retries=None):
1279         if self.mode[0] == "a":
1280             self._filepos = self.size()
1281         self.arvadosfile.writeto(self._filepos, data, num_retries)
1282         self._filepos += len(data)
1283         return len(data)
1284
1285     @_FileLikeObjectBase._before_close
1286     @retry_method
1287     def writelines(self, seq, num_retries=None):
1288         for s in seq:
1289             self.write(s, num_retries=num_retries)
1290
1291     @_FileLikeObjectBase._before_close
1292     def truncate(self, size=None):
1293         if size is None:
1294             size = self._filepos
1295         self.arvadosfile.truncate(size)
1296
1297     @_FileLikeObjectBase._before_close
1298     def flush(self):
1299         self.arvadosfile.flush()
1300
1301     def close(self, flush=True):
1302         if not self.closed:
1303             self.arvadosfile.remove_writer(self, flush)
1304             super(ArvadosFileWriter, self).close()