[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
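    For example (illustrative):

        split('output/logs/run.log')  # -> ('output/logs', 'run.log')
        split('run.log')              # -> ('.', 'run.log')
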
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41
42 class UnownedBlockError(Exception):
43     """Raised when there's a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102         return self._filepos
103
104     def tell(self):
105         return self._filepos
106
107     def readable(self):
108         return True
109
110     def writable(self):
111         return False
112
113     def seekable(self):
114         return True
115
116     @_FileLikeObjectBase._before_close
117     @retry_method
118     def readall(self, size=2**20, num_retries=None):
119         while True:
120             data = self.read(size, num_retries=num_retries)
121             if data == '':
122                 break
123             yield data
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def readline(self, size=float('inf'), num_retries=None):
128         cache_pos, cache_data = self._readline_cache
129         if self.tell() == cache_pos:
130             data = [cache_data]
131             self._filepos += len(cache_data)
132         else:
133             data = ['']
134         data_size = len(data[-1])
135         while (data_size < size) and ('\n' not in data[-1]):
136             next_read = self.read(2 ** 20, num_retries=num_retries)
137             if not next_read:
138                 break
139             data.append(next_read)
140             data_size += len(next_read)
141         data = ''.join(data)
142         try:
143             nextline_index = data.index('\n') + 1
144         except ValueError:
145             nextline_index = len(data)
146         nextline_index = min(nextline_index, size)
147         self._filepos -= len(data) - nextline_index
148         self._readline_cache = (self.tell(), data[nextline_index:])
149         return data[:nextline_index]
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def decompress(self, decompress, size, num_retries=None):
154         for segment in self.readall(size, num_retries=num_retries):
155             data = decompress(segment)
156             if data:
157                 yield data
158
159     @_FileLikeObjectBase._before_close
160     @retry_method
161     def readall_decompressed(self, size=2**20, num_retries=None):
162         self.seek(0)
163         if self.name.endswith('.bz2'):
164             dc = bz2.BZ2Decompressor()
165             return self.decompress(dc.decompress, size,
166                                    num_retries=num_retries)
167         elif self.name.endswith('.gz'):
168             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170                                    size, num_retries=num_retries)
171         else:
172             return self.readall(size, num_retries=num_retries)
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readlines(self, sizehint=float('inf'), num_retries=None):
177         data = []
178         data_size = 0
179         for s in self.readall(num_retries=num_retries):
180             data.append(s)
181             data_size += len(s)
182             if data_size >= sizehint:
183                 break
184         return ''.join(data).splitlines(True)
185
186     def size(self):
187         raise IOError(errno.ENOSYS, "Not implemented")
188
189     def read(self, size, num_retries=None):
190         raise IOError(errno.ENOSYS, "Not implemented")
191
192     def readfrom(self, start, size, num_retries=None):
193         raise IOError(errno.ENOSYS, "Not implemented")
194
195
196 class StreamFileReader(ArvadosFileReaderBase):
197     class _NameAttribute(str):
198         # The Python file API provides a plain .name attribute.
199         # Older SDK provided a name() method.
200         # This class provides both, for maximum compatibility.
201         def __call__(self):
202             return self
203
204     def __init__(self, stream, segments, name):
205         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206         self._stream = stream
207         self.segments = segments
208
209     def stream_name(self):
210         return self._stream.name()
211
212     def size(self):
213         n = self.segments[-1]
214         return n.range_start + n.range_size
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def read(self, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at the current file position"""
220         if size == 0:
221             return ''
222
223         data = ''
224         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
225         if available_chunks:
226             lr = available_chunks[0]
227             data = self._stream.readfrom(lr.locator+lr.segment_offset,
228                                           lr.segment_size,
229                                           num_retries=num_retries)
230
231         self._filepos += len(data)
232         return data
233
234     @_FileLikeObjectBase._before_close
235     @retry_method
236     def readfrom(self, start, size, num_retries=None):
237         """Read up to 'size' bytes from the stream, starting at 'start'"""
238         if size == 0:
239             return ''
240
241         data = []
242         for lr in locators_and_ranges(self.segments, start, size):
243             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244                                               num_retries=num_retries))
245         return ''.join(data)
246
247     def as_manifest(self):
248         segs = []
249         for r in self.segments:
250             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
252
253
254 def synchronized(orig_func):
255     @functools.wraps(orig_func)
256     def synchronized_wrapper(self, *args, **kwargs):
257         with self.lock:
258             return orig_func(self, *args, **kwargs)
259     return synchronized_wrapper
260
261
262 class StateChangeError(Exception):
263     def __init__(self, message, state, nextstate):
264         super(StateChangeError, self).__init__(message)
265         self.state = state
266         self.nextstate = nextstate
267
268 class _BufferBlock(object):
269     """A stand-in for a Keep block that is in the process of being written.
270
271     Writers can append to it, get the size, and compute the Keep locator.
272     There are three valid states:
273
274     WRITABLE
275       Can append to block.
276
277     PENDING
278       Block is in the process of being uploaded to Keep, append is an error.
279
280     COMMITTED
281       The block has been written to Keep, its internal buffer has been
282       released, fetching the block will fetch it via keep client (since we
283       discarded the internal copy), and identifiers referring to the BufferBlock
284       can be replaced with the block locator.
285
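    A typical lifecycle, sketched for illustration (assumes a _BlockManager
    instance `bm` and an owning ArvadosFile `af`):

        bb = bm.alloc_bufferblock(owner=af)      # starts out WRITABLE
        bb.append("some data")                   # grows the internal buffer
        bm.commit_bufferblock(bb, sync=True)     # PENDING, then COMMITTED
        loc = bb.locator()                       # locator of the stored block
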
286     """
287
288     WRITABLE = 0
289     PENDING = 1
290     COMMITTED = 2
291     ERROR = 3
292     DELETED = 4
293
294     def __init__(self, blockid, starting_capacity, owner):
295         """
296         :blockid:
297           the identifier for this block
298
299         :starting_capacity:
300           the initial buffer capacity
301
302         :owner:
303           ArvadosFile that owns this block
304
305         """
306         self.blockid = blockid
307         self.buffer_block = bytearray(starting_capacity)
308         self.buffer_view = memoryview(self.buffer_block)
309         self.write_pointer = 0
310         self._state = _BufferBlock.WRITABLE
311         self._locator = None
312         self.owner = owner
313         self.lock = threading.Lock()
314         self.wait_for_commit = threading.Event()
315         self.error = None
316
317     @synchronized
318     def append(self, data):
319         """Append some data to the buffer.
320
321         Only valid if the block is in WRITABLE state.  Implements an expanding
322         buffer, doubling capacity as needed to accommodate all the data.
323
324         """
325         if self._state == _BufferBlock.WRITABLE:
326             while (self.write_pointer+len(data)) > len(self.buffer_block):
327                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
328                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
329                 self.buffer_block = new_buffer_block
330                 self.buffer_view = memoryview(self.buffer_block)
331             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
332             self.write_pointer += len(data)
333             self._locator = None
334         else:
335             raise AssertionError("Buffer block is not writable")
336
337     STATE_TRANSITIONS = frozenset([
338             (WRITABLE, PENDING),
339             (PENDING, COMMITTED),
340             (PENDING, ERROR),
341             (ERROR, PENDING)])
342
343     @synchronized
344     def set_state(self, nextstate, val=None):
345         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
346             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
347         self._state = nextstate
348
349         if self._state == _BufferBlock.PENDING:
350             self.wait_for_commit.clear()
351
352         if self._state == _BufferBlock.COMMITTED:
353             self._locator = val
354             self.buffer_view = None
355             self.buffer_block = None
356             self.wait_for_commit.set()
357
358         if self._state == _BufferBlock.ERROR:
359             self.error = val
360             self.wait_for_commit.set()
361
362     @synchronized
363     def state(self):
364         return self._state
365
366     def size(self):
367         """The amount of data written to the buffer."""
368         return self.write_pointer
369
370     @synchronized
371     def locator(self):
372         """The Keep locator for this buffer's contents."""
373         if self._locator is None:
374             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
375         return self._locator
376
377     @synchronized
378     def clone(self, new_blockid, owner):
379         if self._state == _BufferBlock.COMMITTED:
380             raise AssertionError("Cannot duplicate committed buffer block")
381         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
382         bufferblock.append(self.buffer_view[0:self.size()])
383         return bufferblock
384
385     @synchronized
386     def clear(self):
387         self._state = _BufferBlock.DELETED
388         self.owner = None
389         self.buffer_block = None
390         self.buffer_view = None
391
392     @synchronized
393     def repack_writes(self):
394         """Optimize buffer block by repacking segments in file sequence.
395
396         When the client makes random writes, they appear in the buffer block in
397         the sequence they were written rather than the sequence they appear in
398         the file.  This makes for inefficient, fragmented manifests.  Attempt
399         to optimize by repacking writes in file sequence.
400
401         """
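        For example (illustrative): if a client writes bytes 1000-1999 of a file
        and then bytes 0-999, the buffer holds the later-offset data first.
        Repacking copies the referenced data into a fresh buffer in file order
        (0-999 followed by 1000-1999) and updates the owner's segments to match.
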
402         if self._state != _BufferBlock.WRITABLE:
403             raise AssertionError("Cannot repack non-writable block")
404
405         segs = self.owner.segments()
406
407         # Collect the segments that reference the buffer block.
408         bufferblock_segs = [s for s in segs if s.locator == self.blockid]
409
410         # Collect total data referenced by segments (could be smaller than
411         # bufferblock size if a portion of the file was written and
412         # then overwritten).
413         write_total = sum([s.range_size for s in bufferblock_segs])
414
415         if write_total < self.size() or len(bufferblock_segs) > 1:
416             # If there's more than one segment referencing this block, it is
417             # due to out-of-order writes and will produce a fragmented
418             # manifest, so try to optimize by re-packing into a new buffer.
419             contents = self.buffer_view[0:self.write_pointer].tobytes()
420             new_bb = _BufferBlock(None, write_total, None)
421             for t in bufferblock_segs:
422                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
423                 t.segment_offset = new_bb.size() - t.range_size
424
425             self.buffer_block = new_bb.buffer_block
426             self.buffer_view = new_bb.buffer_view
427             self.write_pointer = new_bb.write_pointer
428             self._locator = None
429             new_bb.clear()
430             self.owner.set_segments(segs)
431
432     def __repr__(self):
433         return "<BufferBlock %s>" % (self.blockid)
434
435
436 class NoopLock(object):
437     def __enter__(self):
438         return self
439
440     def __exit__(self, exc_type, exc_value, traceback):
441         pass
442
443     def acquire(self, blocking=False):
444         pass
445
446     def release(self):
447         pass
448
449
450 def must_be_writable(orig_func):
451     @functools.wraps(orig_func)
452     def must_be_writable_wrapper(self, *args, **kwargs):
453         if not self.writable():
454             raise IOError(errno.EROFS, "Collection is read-only.")
455         return orig_func(self, *args, **kwargs)
456     return must_be_writable_wrapper
457
458
459 class _BlockManager(object):
460     """BlockManager handles buffer blocks.
461
462     Also handles background block uploads, and background block prefetch for a
463     Collection of ArvadosFiles.
464
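    Rough usage sketch (illustrative only; `keep_client` is assumed to be a
    KeepClient and `af` an ArvadosFile that will own the block):

        with _BlockManager(keep_client) as bm:
            bb = bm.alloc_bufferblock(owner=af)
            bb.append("hello")
            bm.commit_bufferblock(bb, sync=True)
            print bb.locator()
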
465     """
466
467     DEFAULT_PUT_THREADS = 2
468     DEFAULT_GET_THREADS = 2
469
470     def __init__(self, keep, copies=None, put_threads=None):
471         """keep: KeepClient object to use"""
472         self._keep = keep
473         self._bufferblocks = collections.OrderedDict()
474         self._put_queue = None
475         self._put_threads = None
476         self._prefetch_queue = None
477         self._prefetch_threads = None
478         self.lock = threading.Lock()
479         self.prefetch_enabled = True
480         if put_threads:
481             self.num_put_threads = put_threads
482         else:
483             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
484         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
485         self.copies = copies
486         self._pending_write_size = 0
487         self.threads_lock = threading.Lock()
488         self.padding_block = None
489
490     @synchronized
491     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
492         """Allocate a new, empty bufferblock in WRITABLE state and return it.
493
494         :blockid:
495           optional block identifier, otherwise one will be automatically assigned
496
497         :starting_capacity:
498           optional capacity, otherwise will use default capacity
499
500         :owner:
501           ArvadosFile that owns this block
502
503         """
504         return self._alloc_bufferblock(blockid, starting_capacity, owner)
505
506     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
507         if blockid is None:
508             blockid = str(uuid.uuid4())
509         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
510         self._bufferblocks[bufferblock.blockid] = bufferblock
511         return bufferblock
512
513     @synchronized
514     def dup_block(self, block, owner):
515         """Create a new bufferblock initialized with the content of an existing bufferblock.
516
517         :block:
518           the buffer block to copy.
519
520         :owner:
521           ArvadosFile that owns the new block
522
523         """
524         new_blockid = str(uuid.uuid4())
525         bufferblock = block.clone(new_blockid, owner)
526         self._bufferblocks[bufferblock.blockid] = bufferblock
527         return bufferblock
528
529     @synchronized
530     def is_bufferblock(self, locator):
531         return locator in self._bufferblocks
532
533     def _commit_bufferblock_worker(self):
534         """Background uploader thread."""
535
536         while True:
537             try:
538                 bufferblock = self._put_queue.get()
539                 if bufferblock is None:
540                     return
541
542                 if self.copies is None:
543                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
544                 else:
545                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
546                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
547
548             except Exception as e:
549                 bufferblock.set_state(_BufferBlock.ERROR, e)
550             finally:
551                 if self._put_queue is not None:
552                     self._put_queue.task_done()
553
554     def start_put_threads(self):
555         with self.threads_lock:
556             if self._put_threads is None:
557                 # Start uploader threads.
558
559                 # If we don't limit the Queue size, the upload queue can quickly
560                 # grow to take up gigabytes of RAM if the writing process is
561                 # generating data more quickly than it can be sent to the Keep
562                 # servers.
563                 #
564                 # With two upload threads and a queue size of 2, this means up to 4
565                 # blocks pending.  If they are full 64 MiB blocks, that means up to
566                 # 256 MiB of internal buffering, which is the same size as the
567                 # default download block cache in KeepClient.
568                 self._put_queue = Queue.Queue(maxsize=2)
569
570                 self._put_threads = []
571                 for i in xrange(0, self.num_put_threads):
572                     thread = threading.Thread(target=self._commit_bufferblock_worker)
573                     self._put_threads.append(thread)
574                     thread.daemon = True
575                     thread.start()
576
577     def _block_prefetch_worker(self):
578         """The background downloader thread."""
579         while True:
580             try:
581                 b = self._prefetch_queue.get()
582                 if b is None:
583                     return
584                 self._keep.get(b)
585             except Exception:
586                 _logger.exception("Exception doing block prefetch")
587
588     @synchronized
589     def start_get_threads(self):
590         if self._prefetch_threads is None:
591             self._prefetch_queue = Queue.Queue()
592             self._prefetch_threads = []
593             for i in xrange(0, self.num_get_threads):
594                 thread = threading.Thread(target=self._block_prefetch_worker)
595                 self._prefetch_threads.append(thread)
596                 thread.daemon = True
597                 thread.start()
598
599
600     @synchronized
601     def stop_threads(self):
602         """Shut down and wait for background upload and download threads to finish."""
603
604         if self._put_threads is not None:
605             for t in self._put_threads:
606                 self._put_queue.put(None)
607             for t in self._put_threads:
608                 t.join()
609         self._put_threads = None
610         self._put_queue = None
611
612         if self._prefetch_threads is not None:
613             for t in self._prefetch_threads:
614                 self._prefetch_queue.put(None)
615             for t in self._prefetch_threads:
616                 t.join()
617         self._prefetch_threads = None
618         self._prefetch_queue = None
619
620     def __enter__(self):
621         return self
622
623     def __exit__(self, exc_type, exc_value, traceback):
624         self.stop_threads()
625
626     @synchronized
627     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
628         """Packs small blocks together before uploading"""
629
630         self._pending_write_size += closed_file_size
631
632         # Check if there are enough small blocks to fill up one block completely
633         if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
634             return
635
636         # Search for blocks that are ready to be packed together before being committed to Keep.
637         # A WRITABLE block always has an owner.
638         # A WRITABLE block with its owner.closed() implies that its
639         # size is <= KEEP_BLOCK_SIZE/2.
640         try:
641             small_blocks = [b for b in self._bufferblocks.values()
642                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
643         except AttributeError:
644             # Writable blocks without owner shouldn't exist.
645             raise UnownedBlockError()
646
647         if len(small_blocks) <= 1:
648             # Not enough small blocks for repacking
649             return
650
651         for bb in small_blocks:
652             bb.repack_writes()
653
654         # Update the pending write size count with its true value, just in case
655         # some small file was opened, written and closed several times.
656         self._pending_write_size = sum([b.size() for b in small_blocks])
657
658         if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
659             return
660
661         new_bb = self._alloc_bufferblock()
662         files = []
663         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
664             bb = small_blocks.pop(0)
665             self._pending_write_size -= bb.size()
666             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
667             files.append((bb, new_bb.write_pointer - bb.size()))
668
669         self.commit_bufferblock(new_bb, sync=sync)
670
671         for bb, new_bb_segment_offset in files:
672             newsegs = bb.owner.segments()
673             for s in newsegs:
674                 if s.locator == bb.blockid:
675                     s.locator = new_bb.locator()
676                     s.segment_offset = new_bb_segment_offset+s.segment_offset
677             bb.owner.set_segments(newsegs)
678             self._delete_bufferblock(bb.blockid)
679
680     def commit_bufferblock(self, block, sync):
681         """Upload a bufferblock, either synchronously or in the background.
682
683         :block:
684           The block object to upload
685
686         :sync:
687           If `sync` is True, upload the block synchronously.
688           If `sync` is False, upload the block asynchronously.  This will
689           return immediately unless the upload queue is at capacity, in
690           which case it will wait on an upload queue slot.
691
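        Example (illustrative; `block_manager` is an instance of this class and
        `bb` a WRITABLE bufferblock it allocated):

            block_manager.commit_bufferblock(bb, sync=True)
            assert bb.state() == _BufferBlock.COMMITTED
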
692         """
693         try:
694             # Mark the block as PENDING to disallow any further appends.
695             block.set_state(_BufferBlock.PENDING)
696         except StateChangeError as e:
697             if e.state == _BufferBlock.PENDING:
698                 if sync:
699                     block.wait_for_commit.wait()
700                 else:
701                     return
702             if block.state() == _BufferBlock.COMMITTED:
703                 return
704             elif block.state() == _BufferBlock.ERROR:
705                 raise block.error
706             else:
707                 raise
708
709         if sync:
710             try:
711                 if self.copies is None:
712                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
713                 else:
714                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
715                 block.set_state(_BufferBlock.COMMITTED, loc)
716             except Exception as e:
717                 block.set_state(_BufferBlock.ERROR, e)
718                 raise
719         else:
720             self.start_put_threads()
721             self._put_queue.put(block)
722
723     @synchronized
724     def get_bufferblock(self, locator):
725         return self._bufferblocks.get(locator)
726
727     @synchronized
728     def get_padding_block(self):
729         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
730         when using truncate() to extend the size of a file.
731
732         For reference (and possible future optimization), the md5sum of the
733         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
734
735         """
736
737         if self.padding_block is None:
738             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
739             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
740             self.commit_bufferblock(self.padding_block, False)
741         return self.padding_block
742
743     @synchronized
744     def delete_bufferblock(self, locator):
745         self._delete_bufferblock(locator)
746
747     def _delete_bufferblock(self, locator):
748         bb = self._bufferblocks[locator]
749         bb.clear()
750         del self._bufferblocks[locator]
751
752     def get_block_contents(self, locator, num_retries, cache_only=False):
753         """Fetch a block.
754
755         First checks to see if the locator refers to a BufferBlock and returns its
756         contents; if not, passes the request through to KeepClient.get().
757
758         """
759         with self.lock:
760             if locator in self._bufferblocks:
761                 bufferblock = self._bufferblocks[locator]
762                 if bufferblock.state() != _BufferBlock.COMMITTED:
763                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
764                 else:
765                     locator = bufferblock._locator
766         if cache_only:
767             return self._keep.get_from_cache(locator)
768         else:
769             return self._keep.get(locator, num_retries=num_retries)
770
771     def commit_all(self):
772         """Commit all outstanding buffer blocks.
773
774         This is a synchronous call, and will not return until all buffer blocks
775         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
776
777         """
778         self.repack_small_blocks(force=True, sync=True)
779
780         with self.lock:
781             items = self._bufferblocks.items()
782
783         for k,v in items:
784             if v.state() != _BufferBlock.COMMITTED and v.owner:
785                 v.owner.flush(sync=False)
786
787         with self.lock:
788             if self._put_queue is not None:
789                 self._put_queue.join()
790
791                 err = []
792                 for k,v in items:
793                     if v.state() == _BufferBlock.ERROR:
794                         err.append((v.locator(), v.error))
795                 if err:
796                     raise KeepWriteError("Error writing some blocks", err, label="block")
797
798         for k,v in items:
799             # flush again with sync=True to remove committed bufferblocks from
800             # the segments.
801             if v.owner:
802                 v.owner.flush(sync=True)
803
804     def block_prefetch(self, locator):
805         """Initiate a background download of a block.
806
807         This assumes that the underlying KeepClient implements a block cache,
808         so repeated requests for the same block will not result in repeated
809         downloads (unless the block is evicted from the cache).  This method
810         does not block.
811
812         """
813
814         if not self.prefetch_enabled:
815             return
816
817         if self._keep.get_from_cache(locator) is not None:
818             return
819
820         with self.lock:
821             if locator in self._bufferblocks:
822                 return
823
824         self.start_get_threads()
825         self._prefetch_queue.put(locator)
826
827
828 class ArvadosFile(object):
829     """Represent a file in a Collection.
830
831     ArvadosFile manages the underlying representation of a file in Keep as a
832     sequence of segments spanning a set of blocks, and implements random
833     read/write access.
834
835     This object may be accessed from multiple threads.
836
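    Rough sketch of direct use (illustrative; in practice instances are usually
    obtained through a Collection rather than constructed by hand, and `af` is
    assumed to be an existing writable ArvadosFile):

        af.writeto(0, "hello world", num_retries=0)
        data = af.readfrom(0, 5, num_retries=0)   # "hello"
        af.flush()
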
837     """
838
839     def __init__(self, parent, name, stream=[], segments=[]):
840         """
841         ArvadosFile constructor.
842
843         :stream:
844           a list of Range objects representing a block stream
845
846         :segments:
847           a list of Range objects representing segments
848         """
849         self.parent = parent
850         self.name = name
851         self._writers = set()
852         self._committed = False
853         self._segments = []
854         self.lock = parent.root_collection().lock
855         for s in segments:
856             self._add_segment(stream, s.locator, s.range_size)
857         self._current_bblock = None
858
859     def writable(self):
860         return self.parent.writable()
861
862     @synchronized
863     def permission_expired(self, as_of_dt=None):
864         """Returns True if any of this file's segment locators has expired"""
865         for r in self._segments:
866             if KeepLocator(r.locator).permission_expired(as_of_dt):
867                 return True
868         return False
869
870     @synchronized
871     def segments(self):
872         return copy.copy(self._segments)
873
874     @synchronized
875     def clone(self, new_parent, new_name):
876         """Make a copy of this file."""
877         cp = ArvadosFile(new_parent, new_name)
878         cp.replace_contents(self)
879         return cp
880
881     @must_be_writable
882     @synchronized
883     def replace_contents(self, other):
884         """Replace segments of this file with segments from another `ArvadosFile` object."""
885
886         map_loc = {}
887         self._segments = []
888         for other_segment in other.segments():
889             new_loc = other_segment.locator
890             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
891                 if other_segment.locator not in map_loc:
892                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
893                     if bufferblock.state() != _BufferBlock.WRITABLE:
894                         map_loc[other_segment.locator] = bufferblock.locator()
895                     else:
896                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
897                 new_loc = map_loc[other_segment.locator]
898
899             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
900
901         self.set_committed(False)
902
903     def __eq__(self, other):
904         if other is self:
905             return True
906         if not isinstance(other, ArvadosFile):
907             return False
908
909         othersegs = other.segments()
910         with self.lock:
911             if len(self._segments) != len(othersegs):
912                 return False
913             for i in xrange(0, len(othersegs)):
914                 seg1 = self._segments[i]
915                 seg2 = othersegs[i]
916                 loc1 = seg1.locator
917                 loc2 = seg2.locator
918
919                 if self.parent._my_block_manager().is_bufferblock(loc1):
920                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
921
922                 if other.parent._my_block_manager().is_bufferblock(loc2):
923                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
924
925                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
926                     seg1.range_start != seg2.range_start or
927                     seg1.range_size != seg2.range_size or
928                     seg1.segment_offset != seg2.segment_offset):
929                     return False
930
931         return True
932
933     def __ne__(self, other):
934         return not self.__eq__(other)
935
936     @synchronized
937     def set_segments(self, segs):
938         self._segments = segs
939
940     @synchronized
941     def set_committed(self, value=True):
942         """Set committed flag.
943
944         If value is True, set committed to be True.
945
946         If value is False, set committed to be False for this and all parents.
947         """
948         if value == self._committed:
949             return
950         self._committed = value
951         if self._committed is False and self.parent is not None:
952             self.parent.set_committed(False)
953
954     @synchronized
955     def committed(self):
956         """Get whether this is committed or not."""
957         return self._committed
958
959     @synchronized
960     def add_writer(self, writer):
961         """Add an ArvadosFileWriter reference to the list of writers"""
962         if isinstance(writer, ArvadosFileWriter):
963             self._writers.add(writer)
964
965     @synchronized
966     def remove_writer(self, writer, flush):
967         """
968         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
969         and do some block maintenance tasks.
970         """
971         self._writers.remove(writer)
972
973         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
974             # File writer closed; flush requested or file is not small enough for repacking
975             self.flush()
976         elif self.closed():
977             # All writers closed and size is adequate for repacking
978             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
979
980     def closed(self):
981         """
982         Get whether this is closed or not. The file is considered closed when the
983         writers list is empty.
984         """
985         return len(self._writers) == 0
986
987     @must_be_writable
988     @synchronized
989     def truncate(self, size):
990         """Shrink or expand the size of the file.
991
992         If `size` is less than the size of the file, the file contents after
993         `size` will be discarded.  If `size` is greater than the current size
994         of the file, it will be filled with zero bytes.
995
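        For example (illustrative): extending an empty file to 100 MiB appends a
        64 MiB segment and a 36 MiB segment, both referencing the block manager's
        shared zero-padding block rather than newly written file data.
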
996         """
997         if size < self.size():
998             new_segs = []
999             for r in self._segments:
1000                 range_end = r.range_start+r.range_size
1001                 if r.range_start >= size:
1002                     # segment is past the truncate size, all done
1003                     break
1004                 elif size < range_end:
1005                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1006                     nr.segment_offset = r.segment_offset
1007                     new_segs.append(nr)
1008                     break
1009                 else:
1010                     new_segs.append(r)
1011
1012             self._segments = new_segs
1013             self.set_committed(False)
1014         elif size > self.size():
1015             padding = self.parent._my_block_manager().get_padding_block()
1016             diff = size - self.size()
1017             while diff > config.KEEP_BLOCK_SIZE:
1018                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1019                 diff -= config.KEEP_BLOCK_SIZE
1020             if diff > 0:
1021                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1022             self.set_committed(False)
1023         else:
1024             # size == self.size()
1025             pass
1026
1027     def readfrom(self, offset, size, num_retries, exact=False):
1028         """Read up to `size` bytes from the file starting at `offset`.
1029
1030         :exact:
1031          If False (default), return less data than requested if the read
1032          crosses a block boundary and the next block isn't cached.  If True,
1033          only return less data than requested when hitting EOF.
1034         """
1035
1036         with self.lock:
1037             if size == 0 or offset >= self.size():
1038                 return ''
1039             readsegs = locators_and_ranges(self._segments, offset, size)
1040             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1041
1042         locs = set()
1043         data = []
1044         for lr in readsegs:
1045             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1046             if block:
1047                 blockview = memoryview(block)
1048                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1049                 locs.add(lr.locator)
1050             else:
1051                 break
1052
1053         for lr in prefetch:
1054             if lr.locator not in locs:
1055                 self.parent._my_block_manager().block_prefetch(lr.locator)
1056                 locs.add(lr.locator)
1057
1058         return ''.join(data)
1059
1060     @must_be_writable
1061     @synchronized
1062     def writeto(self, offset, data, num_retries):
1063         """Write `data` to the file starting at `offset`.
1064
1065         This will update existing bytes and/or extend the size of the file as
1066         necessary.
1067
1068         """
1069         if len(data) == 0:
1070             return
1071
1072         if offset > self.size():
1073             self.truncate(offset)
1074
1075         if len(data) > config.KEEP_BLOCK_SIZE:
1076             # Chunk it up into smaller writes
1077             n = 0
1078             dataview = memoryview(data)
1079             while n < len(data):
1080                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1081                 n += config.KEEP_BLOCK_SIZE
1082             return
1083
1084         self.set_committed(False)
1085
1086         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1087             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1088
1089         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1090             self._current_bblock.repack_writes()
1091             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1092                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1093                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1094
1095         self._current_bblock.append(data)
1096
1097         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1098
1099         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1100
1101         return len(data)
1102
1103     @synchronized
1104     def flush(self, sync=True, num_retries=0):
1105         """Flush the current bufferblock to Keep.
1106
1107         :sync:
1108           If True, commit block synchronously, wait until buffer block has been written.
1109           If False, commit block asynchronously, return immediately after putting block into
1110           the keep put queue.
1111         """
1112         if self.committed():
1113             return
1114
1115         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1116             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1117                 self._current_bblock.repack_writes()
1118             if self._current_bblock.state() != _BufferBlock.DELETED:
1119                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1120
1121         if sync:
1122             to_delete = set()
1123             for s in self._segments:
1124                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1125                 if bb:
1126                     if bb.state() != _BufferBlock.COMMITTED:
1127                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1128                     to_delete.add(s.locator)
1129                     s.locator = bb.locator()
1130             for s in to_delete:
1131                 self.parent._my_block_manager().delete_bufferblock(s)
1132
1133         self.parent.notify(MOD, self.parent, self.name, (self, self))
1134
1135     @must_be_writable
1136     @synchronized
1137     def add_segment(self, blocks, pos, size):
1138         """Add a segment to the end of the file.
1139
1140         `pos` and `size` reference a section of the stream described by
1141         `blocks` (a list of Range objects).
1142
1143         """
1144         self._add_segment(blocks, pos, size)
1145
1146     def _add_segment(self, blocks, pos, size):
1147         """Internal implementation of add_segment."""
1148         self.set_committed(False)
1149         for lr in locators_and_ranges(blocks, pos, size):
1150             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1151             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1152             self._segments.append(r)
1153
1154     @synchronized
1155     def size(self):
1156         """Get the file size."""
1157         if self._segments:
1158             n = self._segments[-1]
1159             return n.range_start + n.range_size
1160         else:
1161             return 0
1162
1163     @synchronized
1164     def manifest_text(self, stream_name=".", portable_locators=False,
1165                       normalize=False, only_committed=False):
1166         buf = ""
1167         filestream = []
1168         for segment in self._segments:
1169             loc = segment.locator
1170             if self.parent._my_block_manager().is_bufferblock(loc):
1171                 if only_committed:
1172                     continue
1173                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1174             if portable_locators:
1175                 loc = KeepLocator(loc).stripped()
1176             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1177                                  segment.segment_offset, segment.range_size))
1178         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1179         buf += "\n"
1180         return buf
1181
1182     @must_be_writable
1183     @synchronized
1184     def _reparent(self, newparent, newname):
1185         self.set_committed(False)
1186         self.flush(sync=True)
1187         self.parent.remove(self.name)
1188         self.parent = newparent
1189         self.name = newname
1190         self.lock = self.parent.root_collection().lock
1191
1192
1193 class ArvadosFileReader(ArvadosFileReaderBase):
1194     """Wraps ArvadosFile in a file-like object supporting reading only.
1195
1196     Be aware that this class is NOT thread safe as there is no locking around
1197     updating the file pointer.
1198
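    Rough usage sketch (illustrative; assumes `collection.open()` returns an
    ArvadosFileReader for the named file):

        with collection.open("data.txt") as f:
            first_line = f.readline()
            f.seek(0)
            everything = f.read()
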
1199     """
1200
1201     def __init__(self, arvadosfile, num_retries=None):
1202         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1203         self.arvadosfile = arvadosfile
1204
1205     def size(self):
1206         return self.arvadosfile.size()
1207
1208     def stream_name(self):
1209         return self.arvadosfile.parent.stream_name()
1210
1211     @_FileLikeObjectBase._before_close
1212     @retry_method
1213     def read(self, size=None, num_retries=None):
1214         """Read up to `size` bytes from the file and return the result.
1215
1216         Starts at the current file position.  If `size` is None, read the
1217         entire remainder of the file.
1218         """
1219         if size is None:
1220             data = []
1221             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1222             while rd:
1223                 data.append(rd)
1224                 self._filepos += len(rd)
1225                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1226             return ''.join(data)
1227         else:
1228             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1229             self._filepos += len(data)
1230             return data
1231
1232     @_FileLikeObjectBase._before_close
1233     @retry_method
1234     def readfrom(self, offset, size, num_retries=None):
1235         """Read up to `size` bytes from the stream, starting at the specified file offset.
1236
1237         This method does not change the file position.
1238         """
1239         return self.arvadosfile.readfrom(offset, size, num_retries)
1240
1241     def flush(self):
1242         pass
1243
1244
1245 class ArvadosFileWriter(ArvadosFileReader):
1246     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1247
1248     Be aware that this class is NOT thread safe as there is no locking around
1249     updating the file pointer.
1250
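    Rough usage sketch (illustrative; assumes `collection.open()` returns an
    ArvadosFileWriter when given a write mode):

        with collection.open("out.txt", "w") as f:
            f.write("first line\n")
            f.writelines(["second\n", "third\n"])
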
1251     """
1252
1253     def __init__(self, arvadosfile, mode, num_retries=None):
1254         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1255         self.mode = mode
1256         self.arvadosfile.add_writer(self)
1257
1258     def writable(self):
1259         return True
1260
1261     @_FileLikeObjectBase._before_close
1262     @retry_method
1263     def write(self, data, num_retries=None):
1264         if self.mode[0] == "a":
1265             self.arvadosfile.writeto(self.size(), data, num_retries)
1266         else:
1267             self.arvadosfile.writeto(self._filepos, data, num_retries)
1268             self._filepos += len(data)
1269         return len(data)
1270
1271     @_FileLikeObjectBase._before_close
1272     @retry_method
1273     def writelines(self, seq, num_retries=None):
1274         for s in seq:
1275             self.write(s, num_retries=num_retries)
1276
1277     @_FileLikeObjectBase._before_close
1278     def truncate(self, size=None):
1279         if size is None:
1280             size = self._filepos
1281         self.arvadosfile.truncate(size)
1282
1283     @_FileLikeObjectBase._before_close
1284     def flush(self):
1285         self.arvadosfile.flush()
1286
1287     def close(self, flush=True):
1288         if not self.closed:
1289             self.arvadosfile.remove_writer(self, flush)
1290             super(ArvadosFileWriter, self).close()