[arvados.git] sdk/python/arvados/arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
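# Illustrative examples (not executed) of split(), based on the code above:
#   split("foo/bar/baz.txt")  returns ("foo/bar", "baz.txt")
#   split("baz.txt")          returns (".", "baz.txt")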
41
42 class UnownedBlockError(Exception):
43     """Raised when there's a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102         return self._filepos
103
104     def tell(self):
105         return self._filepos
106
107     def readable(self):
108         return True
109
110     def writable(self):
111         return False
112
113     def seekable(self):
114         return True
115
116     @_FileLikeObjectBase._before_close
117     @retry_method
118     def readall(self, size=2**20, num_retries=None):
119         while True:
120             data = self.read(size, num_retries=num_retries)
121             if data == '':
122                 break
123             yield data
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def readline(self, size=float('inf'), num_retries=None):
128         cache_pos, cache_data = self._readline_cache
129         if self.tell() == cache_pos:
130             data = [cache_data]
131             self._filepos += len(cache_data)
132         else:
133             data = ['']
134         data_size = len(data[-1])
135         while (data_size < size) and ('\n' not in data[-1]):
136             next_read = self.read(2 ** 20, num_retries=num_retries)
137             if not next_read:
138                 break
139             data.append(next_read)
140             data_size += len(next_read)
141         data = ''.join(data)
142         try:
143             nextline_index = data.index('\n') + 1
144         except ValueError:
145             nextline_index = len(data)
146         nextline_index = min(nextline_index, size)
147         self._filepos -= len(data) - nextline_index
148         self._readline_cache = (self.tell(), data[nextline_index:])
149         return data[:nextline_index]
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def decompress(self, decompress, size, num_retries=None):
154         for segment in self.readall(size, num_retries=num_retries):
155             data = decompress(segment)
156             if data:
157                 yield data
158
159     @_FileLikeObjectBase._before_close
160     @retry_method
161     def readall_decompressed(self, size=2**20, num_retries=None):
162         self.seek(0)
163         if self.name.endswith('.bz2'):
164             dc = bz2.BZ2Decompressor()
165             return self.decompress(dc.decompress, size,
166                                    num_retries=num_retries)
167         elif self.name.endswith('.gz'):
168             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170                                    size, num_retries=num_retries)
171         else:
172             return self.readall(size, num_retries=num_retries)
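    # Illustrative sketch (not executed): readall_decompressed() picks a
    # decompressor from the file name extension, so callers can stream plain or
    # compressed files the same way.  "handle_chunk" is a placeholder:
    #   for chunk in reader.readall_decompressed():
    #       handle_chunk(chunk)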
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readlines(self, sizehint=float('inf'), num_retries=None):
177         data = []
178         data_size = 0
179         for s in self.readall(num_retries=num_retries):
180             data.append(s)
181             data_size += len(s)
182             if data_size >= sizehint:
183                 break
184         return ''.join(data).splitlines(True)
185
186     def size(self):
187         raise IOError(errno.ENOSYS, "Not implemented")
188
189     def read(self, size, num_retries=None):
190         raise IOError(errno.ENOSYS, "Not implemented")
191
192     def readfrom(self, start, size, num_retries=None):
193         raise IOError(errno.ENOSYS, "Not implemented")
194
195
196 class StreamFileReader(ArvadosFileReaderBase):
197     class _NameAttribute(str):
198         # The Python file API provides a plain .name attribute.
199         # Older SDK provided a name() method.
200         # This class provides both, for maximum compatibility.
201         def __call__(self):
202             return self
203
204     def __init__(self, stream, segments, name):
205         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206         self._stream = stream
207         self.segments = segments
208
209     def stream_name(self):
210         return self._stream.name()
211
212     def size(self):
213         n = self.segments[-1]
214         return n.range_start + n.range_size
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def read(self, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at the current file position"""
220         if size == 0:
221             return ''
222
223         data = ''
224         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
225         if available_chunks:
226             lr = available_chunks[0]
227             data = self._stream.readfrom(lr.locator+lr.segment_offset,
228                                           lr.segment_size,
229                                           num_retries=num_retries)
230
231         self._filepos += len(data)
232         return data
233
234     @_FileLikeObjectBase._before_close
235     @retry_method
236     def readfrom(self, start, size, num_retries=None):
237         """Read up to 'size' bytes from the stream, starting at 'start'"""
238         if size == 0:
239             return ''
240
241         data = []
242         for lr in locators_and_ranges(self.segments, start, size):
243             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244                                               num_retries=num_retries))
245         return ''.join(data)
246
247     def as_manifest(self):
248         segs = []
249         for r in self.segments:
250             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
252
253
254 def synchronized(orig_func):
255     @functools.wraps(orig_func)
256     def synchronized_wrapper(self, *args, **kwargs):
257         with self.lock:
258             return orig_func(self, *args, **kwargs)
259     return synchronized_wrapper
260
261
262 class StateChangeError(Exception):
263     def __init__(self, message, state, nextstate):
264         super(StateChangeError, self).__init__(message)
265         self.state = state
266         self.nextstate = nextstate
267
268 class _BufferBlock(object):
269     """A stand-in for a Keep block that is in the process of being written.
270
271     Writers can append to it, get the size, and compute the Keep locator.
272     There are five valid states; the three main lifecycle states are:
273
274     WRITABLE
275       Can append to block.
276
277     PENDING
278       Block is in the process of being uploaded to Keep, append is an error.
279
280     COMMITTED
281       The block has been written to Keep, its internal buffer has been
282       released, fetching the block will fetch it via keep client (since we
283       discarded the internal copy), and identifiers referring to the BufferBlock
284       can be replaced with the block locator.  The remaining states are
285       ERROR (the upload to Keep failed; see `error`) and DELETED (set by clear()).
286     """
287
288     WRITABLE = 0
289     PENDING = 1
290     COMMITTED = 2
291     ERROR = 3
292     DELETED = 4
293
294     def __init__(self, blockid, starting_capacity, owner):
295         """
296         :blockid:
297           the identifier for this block
298
299         :starting_capacity:
300           the initial buffer capacity
301
302         :owner:
303           ArvadosFile that owns this block
304
305         """
306         self.blockid = blockid
307         self.buffer_block = bytearray(starting_capacity)
308         self.buffer_view = memoryview(self.buffer_block)
309         self.write_pointer = 0
310         self._state = _BufferBlock.WRITABLE
311         self._locator = None
312         self.owner = owner
313         self.lock = threading.Lock()
314         self.wait_for_commit = threading.Event()
315         self.error = None
316
317     @synchronized
318     def append(self, data):
319         """Append some data to the buffer.
320
321         Only valid if the block is in WRITABLE state.  Implements an expanding
322         buffer, doubling capacity as needed to accommodate all the data.
323
324         """
325         if self._state == _BufferBlock.WRITABLE:
326             while (self.write_pointer+len(data)) > len(self.buffer_block):
327                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
328                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
329                 self.buffer_block = new_buffer_block
330                 self.buffer_view = memoryview(self.buffer_block)
331             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
332             self.write_pointer += len(data)
333             self._locator = None
334         else:
335             raise AssertionError("Buffer block is not writable")
336
337     STATE_TRANSITIONS = frozenset([
338             (WRITABLE, PENDING),
339             (PENDING, COMMITTED),
340             (PENDING, ERROR),
341             (ERROR, PENDING)])
342
343     @synchronized
344     def set_state(self, nextstate, val=None):
345         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
346             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
347         self._state = nextstate
348
349         if self._state == _BufferBlock.PENDING:
350             self.wait_for_commit.clear()
351
352         if self._state == _BufferBlock.COMMITTED:
353             self._locator = val
354             self.buffer_view = None
355             self.buffer_block = None
356             self.wait_for_commit.set()
357
358         if self._state == _BufferBlock.ERROR:
359             self.error = val
360             self.wait_for_commit.set()
361
362     @synchronized
363     def state(self):
364         return self._state
365
366     def size(self):
367         """The amount of data written to the buffer."""
368         return self.write_pointer
369
370     @synchronized
371     def locator(self):
372         """The Keep locator for this buffer's contents."""
373         if self._locator is None:
374             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
375         return self._locator
376
377     @synchronized
378     def clone(self, new_blockid, owner):
379         if self._state == _BufferBlock.COMMITTED:
380             raise AssertionError("Cannot duplicate committed buffer block")
381         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
382         bufferblock.append(self.buffer_view[0:self.size()])
383         return bufferblock
384
385     @synchronized
386     def clear(self):
387         self._state = _BufferBlock.DELETED
388         self.owner = None
389         self.buffer_block = None
390         self.buffer_view = None
391
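# Illustrative sketch of the _BufferBlock lifecycle (normally driven by
# _BlockManager rather than called directly; the locator shown is a
# placeholder):
#
#   bb = _BufferBlock("bufferblock0", starting_capacity=2**14, owner=None)
#   bb.append("hello")                                 # WRITABLE: buffer grows as needed
#   bb.set_state(_BufferBlock.PENDING)                 # upload in progress, appends now fail
#   bb.set_state(_BufferBlock.COMMITTED, "<md5>+5")    # buffer released, locator recorded
#   bb.locator()                                       # returns the committed locator
#   bb.clear()                                         # DELETED: block may no longer be used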
392
393 class NoopLock(object):
394     def __enter__(self):
395         return self
396
397     def __exit__(self, exc_type, exc_value, traceback):
398         pass
399
400     def acquire(self, blocking=False):
401         pass
402
403     def release(self):
404         pass
405
406
407 def must_be_writable(orig_func):
408     @functools.wraps(orig_func)
409     def must_be_writable_wrapper(self, *args, **kwargs):
410         if not self.writable():
411             raise IOError(errno.EROFS, "Collection is read-only.")
412         return orig_func(self, *args, **kwargs)
413     return must_be_writable_wrapper
414
415
416 class _BlockManager(object):
417     """BlockManager handles buffer blocks.
418
419     Also handles background block uploads, and background block prefetch for a
420     Collection of ArvadosFiles.
421
422     """
423
424     DEFAULT_PUT_THREADS = 2
425     DEFAULT_GET_THREADS = 2
426
427     def __init__(self, keep, copies=None, put_threads=None):
428         """keep: KeepClient object to use"""
429         self._keep = keep
430         self._bufferblocks = collections.OrderedDict()
431         self._put_queue = None
432         self._put_threads = None
433         self._prefetch_queue = None
434         self._prefetch_threads = None
435         self.lock = threading.Lock()
436         self.prefetch_enabled = True
437         if put_threads:
438             self.num_put_threads = put_threads
439         else:
440             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
441         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
442         self.copies = copies
443         self._pending_write_size = 0
444         self.threads_lock = threading.Lock()
445         self.padding_block = None
446
447     @synchronized
448     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
449         """Allocate a new, empty bufferblock in WRITABLE state and return it.
450
451         :blockid:
452           optional block identifier, otherwise one will be automatically assigned
453
454         :starting_capacity:
455           optional capacity, otherwise will use default capacity
456
457         :owner:
458           ArvadosFile that owns this block
459
460         """
461         return self._alloc_bufferblock(blockid, starting_capacity, owner)
462
463     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
464         if blockid is None:
465             blockid = str(uuid.uuid4())
466         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
467         self._bufferblocks[bufferblock.blockid] = bufferblock
468         return bufferblock
469
470     @synchronized
471     def dup_block(self, block, owner):
472         """Create a new bufferblock initialized with the content of an existing bufferblock.
473
474         :block:
475           the buffer block to copy.
476
477         :owner:
478           ArvadosFile that owns the new block
479
480         """
481         new_blockid = str(uuid.uuid4())
482         bufferblock = block.clone(new_blockid, owner)
483         self._bufferblocks[bufferblock.blockid] = bufferblock
484         return bufferblock
485
486     @synchronized
487     def is_bufferblock(self, locator):
488         return locator in self._bufferblocks
489
490     def _commit_bufferblock_worker(self):
491         """Background uploader thread."""
492
493         while True:
494             try:
495                 bufferblock = self._put_queue.get()
496                 if bufferblock is None:
497                     return
498
499                 if self.copies is None:
500                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
501                 else:
502                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
503                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
504
505             except Exception as e:
506                 bufferblock.set_state(_BufferBlock.ERROR, e)
507             finally:
508                 if self._put_queue is not None:
509                     self._put_queue.task_done()
510
511     def start_put_threads(self):
512         with self.threads_lock:
513             if self._put_threads is None:
514                 # Start uploader threads.
515
516                 # If we don't limit the Queue size, the upload queue can quickly
517                 # grow to take up gigabytes of RAM if the writing process is
518                 # generating data more quickly than it can be sent to the Keep
519                 # servers.
520                 #
521                 # With two upload threads and a queue size of 2, this means up to 4
522                 # blocks pending.  If they are full 64 MiB blocks, that means up to
523                 # 256 MiB of internal buffering, which is the same size as the
524                 # default download block cache in KeepClient.
525                 self._put_queue = Queue.Queue(maxsize=2)
526
527                 self._put_threads = []
528                 for i in xrange(0, self.num_put_threads):
529                     thread = threading.Thread(target=self._commit_bufferblock_worker)
530                     self._put_threads.append(thread)
531                     thread.daemon = True
532                     thread.start()
533
534     def _block_prefetch_worker(self):
535         """The background downloader thread."""
536         while True:
537             try:
538                 b = self._prefetch_queue.get()
539                 if b is None:
540                     return
541                 self._keep.get(b)
542             except Exception:
543                 _logger.exception("Exception doing block prefetch")
544
545     @synchronized
546     def start_get_threads(self):
547         if self._prefetch_threads is None:
548             self._prefetch_queue = Queue.Queue()
549             self._prefetch_threads = []
550             for i in xrange(0, self.num_get_threads):
551                 thread = threading.Thread(target=self._block_prefetch_worker)
552                 self._prefetch_threads.append(thread)
553                 thread.daemon = True
554                 thread.start()
555
556
557     @synchronized
558     def stop_threads(self):
559         """Shut down and wait for background upload and download threads to finish."""
560
561         if self._put_threads is not None:
562             for t in self._put_threads:
563                 self._put_queue.put(None)
564             for t in self._put_threads:
565                 t.join()
566         self._put_threads = None
567         self._put_queue = None
568
569         if self._prefetch_threads is not None:
570             for t in self._prefetch_threads:
571                 self._prefetch_queue.put(None)
572             for t in self._prefetch_threads:
573                 t.join()
574         self._prefetch_threads = None
575         self._prefetch_queue = None
576
577     def __enter__(self):
578         return self
579
580     def __exit__(self, exc_type, exc_value, traceback):
581         self.stop_threads()
582
583     @synchronized
584     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
585         """Packs small blocks together before uploading"""
586         self._pending_write_size += closed_file_size
587
588         # Check if there are enough small blocks to fill up one full block
589         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
590
591             # Search blocks ready for getting packed together before being committed to Keep.
592             # A WRITABLE block always has an owner.
593             # A WRITABLE block with its owner.closed() implies that its
594             # size is <= KEEP_BLOCK_SIZE/2.
595             try:
596                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
597             except AttributeError:
598                 # Writable blocks without owner shouldn't exist.
599                 raise UnownedBlockError()
600
601             if len(small_blocks) <= 1:
602                 # Not enough small blocks for repacking
603                 return
604
605             # Update the pending write size count with its true value, just in case
606             # some small file was opened, written and closed several times.
607             self._pending_write_size = sum([b.size() for b in small_blocks])
608             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
609                 return
610
611             new_bb = self._alloc_bufferblock()
612             files = []
613             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
614                 bb = small_blocks.pop(0)
615                 arvfile = bb.owner
616                 self._pending_write_size -= bb.size()
617                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
618                 files.append((bb, new_bb.write_pointer - bb.size()))
619
620             self.commit_bufferblock(new_bb, sync=sync)
621
622             for fn in files:
623                 bb = fn[0]
624                 bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), fn[1])])
625                 self._delete_bufferblock(bb.blockid)
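    # Note: repacking rewrites each closed small file's segments to point at an
    # offset inside the new combined block and then deletes the original
    # bufferblock, so many small files end up sharing a single Keep block.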
626
627     def commit_bufferblock(self, block, sync):
628         """Initiate a background upload of a bufferblock.
629
630         :block:
631           The block object to upload
632
633         :sync:
634           If `sync` is True, upload the block synchronously.
635           If `sync` is False, upload the block asynchronously.  This will
636           return immediately unless the upload queue is at capacity, in
637           which case it will wait on an upload queue slot.
638
639         """
640         try:
641             # Mark the block as PENDING to disallow any more appends.
642             block.set_state(_BufferBlock.PENDING)
643         except StateChangeError as e:
644             if e.state == _BufferBlock.PENDING:
645                 if sync:
646                     block.wait_for_commit.wait()
647                 else:
648                     return
649             if block.state() == _BufferBlock.COMMITTED:
650                 return
651             elif block.state() == _BufferBlock.ERROR:
652                 raise block.error
653             else:
654                 raise
655
656         if sync:
657             try:
658                 if self.copies is None:
659                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
660                 else:
661                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
662                 block.set_state(_BufferBlock.COMMITTED, loc)
663             except Exception as e:
664                 block.set_state(_BufferBlock.ERROR, e)
665                 raise
666         else:
667             self.start_put_threads()
668             self._put_queue.put(block)
669
670     @synchronized
671     def get_bufferblock(self, locator):
672         return self._bufferblocks.get(locator)
673
674     @synchronized
675     def get_padding_block(self):
676         """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
677         when using truncate() to extend the size of a file.
678
679         For reference (and possible future optimization), the md5sum of the
680         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
681
682         """
683
684         if self.padding_block is None:
685             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
686             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
687             self.commit_bufferblock(self.padding_block, False)
688         return self.padding_block
689
690     @synchronized
691     def delete_bufferblock(self, locator):
692         self._delete_bufferblock(locator)
693
694     def _delete_bufferblock(self, locator):
695         bb = self._bufferblocks[locator]
696         bb.clear()
697         del self._bufferblocks[locator]
698
699     def get_block_contents(self, locator, num_retries, cache_only=False):
700         """Fetch a block.
701
702         First checks to see if the locator is a BufferBlock and returns that; if
703         not, passes the request through to KeepClient.get().
704
705         """
706         with self.lock:
707             if locator in self._bufferblocks:
708                 bufferblock = self._bufferblocks[locator]
709                 if bufferblock.state() != _BufferBlock.COMMITTED:
710                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
711                 else:
712                     locator = bufferblock._locator
713         if cache_only:
714             return self._keep.get_from_cache(locator)
715         else:
716             return self._keep.get(locator, num_retries=num_retries)
717
718     def commit_all(self):
719         """Commit all outstanding buffer blocks.
720
721         This is a synchronous call, and will not return until all buffer blocks
722         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
723
724         """
725         self.repack_small_blocks(force=True, sync=True)
726
727         with self.lock:
728             items = self._bufferblocks.items()
729
730         for k,v in items:
731             if v.state() != _BufferBlock.COMMITTED and v.owner:
732                 v.owner.flush(sync=False)
733
734         with self.lock:
735             if self._put_queue is not None:
736                 self._put_queue.join()
737
738                 err = []
739                 for k,v in items:
740                     if v.state() == _BufferBlock.ERROR:
741                         err.append((v.locator(), v.error))
742                 if err:
743                     raise KeepWriteError("Error writing some blocks", err, label="block")
744
745         for k,v in items:
746             # flush again with sync=True to remove committed bufferblocks from
747             # the segments.
748             if v.owner:
749                 v.owner.flush(sync=True)
750
751     def block_prefetch(self, locator):
752         """Initiate a background download of a block.
753
754         This assumes that the underlying KeepClient implements a block cache,
755         so repeated requests for the same block will not result in repeated
756         downloads (unless the block is evicted from the cache).  This method
757         does not block.
758
759         """
760
761         if not self.prefetch_enabled:
762             return
763
764         if self._keep.get_from_cache(locator) is not None:
765             return
766
767         with self.lock:
768             if locator in self._bufferblocks:
769                 return
770
771         self.start_get_threads()
772         self._prefetch_queue.put(locator)
773
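# Illustrative sketch of how a _BlockManager is typically driven (in practice
# this is done by Collection and ArvadosFile; "keep_client", "arv_file" and
# "data" are placeholders):
#
#   with _BlockManager(keep_client) as bm:
#       bb = bm.alloc_bufferblock(owner=arv_file)
#       bb.append(data)
#       bm.commit_bufferblock(bb, sync=False)  # hand off to the uploader threads
#       bm.commit_all()                        # block until everything is in Keep
#   # leaving the "with" block calls stop_threads()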
774
775 class ArvadosFile(object):
776     """Represent a file in a Collection.
777
778     ArvadosFile manages the underlying representation of a file in Keep as a
779     sequence of segments spanning a set of blocks, and implements random
780     read/write access.
781
782     This object may be accessed from multiple threads.
783
784     """
785
786     def __init__(self, parent, name, stream=[], segments=[]):
787         """
788         ArvadosFile constructor.
789
790         :stream:
791           a list of Range objects representing a block stream
792
793         :segments:
794           a list of Range objects representing segments
795         """
796         self.parent = parent
797         self.name = name
798         self._writers = set()
799         self._committed = False
800         self._segments = []
801         self.lock = parent.root_collection().lock
802         for s in segments:
803             self._add_segment(stream, s.locator, s.range_size)
804         self._current_bblock = None
805
806     def writable(self):
807         return self.parent.writable()
808
809     @synchronized
810     def permission_expired(self, as_of_dt=None):
811         """Returns True if any of the segments' locators has expired."""
812         for r in self._segments:
813             if KeepLocator(r.locator).permission_expired(as_of_dt):
814                 return True
815         return False
816
817     @synchronized
818     def segments(self):
819         return copy.copy(self._segments)
820
821     @synchronized
822     def clone(self, new_parent, new_name):
823         """Make a copy of this file."""
824         cp = ArvadosFile(new_parent, new_name)
825         cp.replace_contents(self)
826         return cp
827
828     @must_be_writable
829     @synchronized
830     def replace_contents(self, other):
831         """Replace segments of this file with segments from another `ArvadosFile` object."""
832
833         map_loc = {}
834         self._segments = []
835         for other_segment in other.segments():
836             new_loc = other_segment.locator
837             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
838                 if other_segment.locator not in map_loc:
839                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
840                     if bufferblock.state() != _BufferBlock.WRITABLE:
841                         map_loc[other_segment.locator] = bufferblock.locator()
842                     else:
843                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
844                 new_loc = map_loc[other_segment.locator]
845
846             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
847
848         self.set_committed(False)
849
850     def __eq__(self, other):
851         if other is self:
852             return True
853         if not isinstance(other, ArvadosFile):
854             return False
855
856         othersegs = other.segments()
857         with self.lock:
858             if len(self._segments) != len(othersegs):
859                 return False
860             for i in xrange(0, len(othersegs)):
861                 seg1 = self._segments[i]
862                 seg2 = othersegs[i]
863                 loc1 = seg1.locator
864                 loc2 = seg2.locator
865
866                 if self.parent._my_block_manager().is_bufferblock(loc1):
867                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
868
869                 if other.parent._my_block_manager().is_bufferblock(loc2):
870                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
871
872                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
873                     seg1.range_start != seg2.range_start or
874                     seg1.range_size != seg2.range_size or
875                     seg1.segment_offset != seg2.segment_offset):
876                     return False
877
878         return True
879
880     def __ne__(self, other):
881         return not self.__eq__(other)
882
883     @synchronized
884     def set_segments(self, segs):
885         self._segments = segs
886
887     @synchronized
888     def set_committed(self, value=True):
889         """Set committed flag.
890
891         If value is True, set committed to be True.
892
893         If value is False, set committed to be False for this and all parents.
894         """
895         if value == self._committed:
896             return
897         self._committed = value
898         if self._committed is False and self.parent is not None:
899             self.parent.set_committed(False)
900
901     @synchronized
902     def committed(self):
903         """Get whether this is committed or not."""
904         return self._committed
905
906     @synchronized
907     def add_writer(self, writer):
908         """Add an ArvadosFileWriter reference to the list of writers"""
909         if isinstance(writer, ArvadosFileWriter):
910             self._writers.add(writer)
911
912     @synchronized
913     def remove_writer(self, writer, flush):
914         """
915         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
916         and do some block maintenance tasks.
917         """
918         self._writers.remove(writer)
919
920         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
921             # File writer closed, not small enough for repacking
922             self.flush()
923         elif self.closed():
924             # All writers closed and size is adequate for repacking
925             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
926
927     def closed(self):
928         """
929         Get whether this is closed or not. When the writers list is empty, the file
930         is considered closed.
931         """
932         return len(self._writers) == 0
933
934     @must_be_writable
935     @synchronized
936     def truncate(self, size):
937         """Shrink or expand the size of the file.
938
939         If `size` is less than the size of the file, the file contents after
940         `size` will be discarded.  If `size` is greater than the current size
941         of the file, the added region will be filled with zero bytes.
942
943         """
944         if size < self.size():
945             new_segs = []
946             for r in self._segments:
947                 range_end = r.range_start+r.range_size
948                 if r.range_start >= size:
949                     # segment is past the truncate size, all done
950                     break
951                 elif size < range_end:
952                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
953                     nr.segment_offset = r.segment_offset
954                     new_segs.append(nr)
955                     break
956                 else:
957                     new_segs.append(r)
958
959             self._segments = new_segs
960             self.set_committed(False)
961         elif size > self.size():
962             padding = self.parent._my_block_manager().get_padding_block()
963             diff = size - self.size()
964             while diff > config.KEEP_BLOCK_SIZE:
965                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
966                 diff -= config.KEEP_BLOCK_SIZE
967             if diff > 0:
968                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
969             self.set_committed(False)
970         else:
971             # size == self.size()
972             pass
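    # Illustrative example (not executed), assuming f is a writable ArvadosFile
    # holding 10 bytes:
    #   f.truncate(4)      # keep only the first 4 bytes
    #   f.truncate(2**20)  # grow to 1 MiB; the tail references the shared
    #                      # zero-filled padding block from the block manager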
973
974     def readfrom(self, offset, size, num_retries, exact=False):
975         """Read up to `size` bytes from the file starting at `offset`.
976
977         :exact:
978          If False (default), return less data than requested if the read
979          crosses a block boundary and the next block isn't cached.  If True,
980          only return less data than requested when hitting EOF.
981         """
982
983         with self.lock:
984             if size == 0 or offset >= self.size():
985                 return ''
986             readsegs = locators_and_ranges(self._segments, offset, size)
987             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
988
989         locs = set()
990         data = []
991         for lr in readsegs:
992             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
993             if block:
994                 blockview = memoryview(block)
995                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
996                 locs.add(lr.locator)
997             else:
998                 break
999
1000         for lr in prefetch:
1001             if lr.locator not in locs:
1002                 self.parent._my_block_manager().block_prefetch(lr.locator)
1003                 locs.add(lr.locator)
1004
1005         return ''.join(data)
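    # Note: with exact=False (the default) a read that crosses into a block not
    # yet in the local cache may return fewer bytes than requested; callers
    # that need exactly `size` bytes (short only at end of file) pass
    # exact=True, as ArvadosFileReader.read() does.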
1006
1007     def _repack_writes(self, num_retries):
1008         """Optimize buffer block by repacking segments in file sequence.
1009
1010         When the client makes random writes, they appear in the buffer block in
1011         the sequence they were written rather than the sequence they appear in
1012         the file.  This makes for inefficient, fragmented manifests.  Attempt
1013         to optimize by repacking writes in file sequence.
1014
1015         """
1016         segs = self._segments
1017
1018         # Collect the segments that reference the buffer block.
1019         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1020
1021         # Collect total data referenced by segments (could be smaller than
1022         # bufferblock size if a portion of the file was written and
1023         # then overwritten).
1024         write_total = sum([s.range_size for s in bufferblock_segs])
1025
1026         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1027             # If there's more than one segment referencing this block, it is
1028             # due to out-of-order writes and will produce a fragmented
1029             # manifest, so try to optimize by re-packing into a new buffer.
1030             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1031             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1032             for t in bufferblock_segs:
1033                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1034                 t.segment_offset = new_bb.size() - t.range_size
1035
1036             self._current_bblock = new_bb
1037
1038     @must_be_writable
1039     @synchronized
1040     def writeto(self, offset, data, num_retries):
1041         """Write `data` to the file starting at `offset`.
1042
1043         This will update existing bytes and/or extend the size of the file as
1044         necessary.
1045
1046         """
1047         if len(data) == 0:
1048             return
1049
1050         if offset > self.size():
1051             self.truncate(offset)
1052
1053         if len(data) > config.KEEP_BLOCK_SIZE:
1054             # Chunk it up into smaller writes
1055             n = 0
1056             dataview = memoryview(data)
1057             while n < len(data):
1058                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1059                 n += config.KEEP_BLOCK_SIZE
1060             return
1061
1062         self.set_committed(False)
1063
1064         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1065             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1066
1067         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1068             self._repack_writes(num_retries)
1069             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1070                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1071                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1072
1073         self._current_bblock.append(data)
1074
1075         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1076
1077         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1078
1079         return len(data)
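    # Illustrative example (not executed): a single large write is split into
    # KEEP_BLOCK_SIZE (64 MiB) chunks, each appended to the current buffer
    # block, which is committed asynchronously once it fills up:
    #   f.writeto(0, "x" * (130 * 2**20), num_retries=0)   # handled as three chunks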
1080
1081     @synchronized
1082     def flush(self, sync=True, num_retries=0):
1083         """Flush the current bufferblock to Keep.
1084
1085         :sync:
1086           If True, commit block synchronously, wait until buffer block has been written.
1087           If False, commit block asynchronously, return immediately after putting block into
1088           the keep put queue.
1089         """
1090         if self.committed():
1091             return
1092
1093         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1094             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1095                 self._repack_writes(num_retries)
1096             if self._current_bblock.state() != _BufferBlock.DELETED:
1097                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1098
1099         if sync:
1100             to_delete = set()
1101             for s in self._segments:
1102                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1103                 if bb:
1104                     if bb.state() != _BufferBlock.COMMITTED:
1105                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1106                     to_delete.add(s.locator)
1107                     s.locator = bb.locator()
1108             for s in to_delete:
1109                 self.parent._my_block_manager().delete_bufferblock(s)
1110
1111         self.parent.notify(MOD, self.parent, self.name, (self, self))
1112
1113     @must_be_writable
1114     @synchronized
1115     def add_segment(self, blocks, pos, size):
1116         """Add a segment to the end of the file.
1117
1118         `pos` and `size` reference a section of the stream described by
1119         `blocks` (a list of Range objects)
1120
1121         """
1122         self._add_segment(blocks, pos, size)
1123
1124     def _add_segment(self, blocks, pos, size):
1125         """Internal implementation of add_segment."""
1126         self.set_committed(False)
1127         for lr in locators_and_ranges(blocks, pos, size):
1128             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1129             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1130             self._segments.append(r)
1131
1132     @synchronized
1133     def size(self):
1134         """Get the file size."""
1135         if self._segments:
1136             n = self._segments[-1]
1137             return n.range_start + n.range_size
1138         else:
1139             return 0
1140
1141     @synchronized
1142     def manifest_text(self, stream_name=".", portable_locators=False,
1143                       normalize=False, only_committed=False):
1144         buf = ""
1145         filestream = []
1146         for segment in self._segments:
1147             loc = segment.locator
1148             if self.parent._my_block_manager().is_bufferblock(loc):
1149                 if only_committed:
1150                     continue
1151                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1152             if portable_locators:
1153                 loc = KeepLocator(loc).stripped()
1154             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1155                                  segment.segment_offset, segment.range_size))
1156         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1157         buf += "\n"
1158         return buf
1159
1160     @must_be_writable
1161     @synchronized
1162     def _reparent(self, newparent, newname):
1163         self.set_committed(False)
1164         self.flush(sync=True)
1165         self.parent.remove(self.name)
1166         self.parent = newparent
1167         self.name = newname
1168         self.lock = self.parent.root_collection().lock
1169
1170
1171 class ArvadosFileReader(ArvadosFileReaderBase):
1172     """Wraps ArvadosFile in a file-like object supporting reading only.
1173
1174     Be aware that this class is NOT thread safe as there is no locking around
1175     updating the file pointer.
1176
1177     """
1178
1179     def __init__(self, arvadosfile, num_retries=None):
1180         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1181         self.arvadosfile = arvadosfile
1182
1183     def size(self):
1184         return self.arvadosfile.size()
1185
1186     def stream_name(self):
1187         return self.arvadosfile.parent.stream_name()
1188
1189     @_FileLikeObjectBase._before_close
1190     @retry_method
1191     def read(self, size=None, num_retries=None):
1192         """Read up to `size` bytes from the file and return the result.
1193
1194         Starts at the current file position.  If `size` is None, read the
1195         entire remainder of the file.
1196         """
1197         if size is None:
1198             data = []
1199             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1200             while rd:
1201                 data.append(rd)
1202                 self._filepos += len(rd)
1203                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1204             return ''.join(data)
1205         else:
1206             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1207             self._filepos += len(data)
1208             return data
1209
1210     @_FileLikeObjectBase._before_close
1211     @retry_method
1212     def readfrom(self, offset, size, num_retries=None):
1213         """Read up to `size` bytes from the stream, starting at the specified file offset.
1214
1215         This method does not change the file position.
1216         """
1217         return self.arvadosfile.readfrom(offset, size, num_retries)
1218
1219     def flush(self):
1220         pass
1221
1222
1223 class ArvadosFileWriter(ArvadosFileReader):
1224     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1225
1226     Be aware that this class is NOT thread safe as there is no locking around
1227     updating the file pointer.
1228
1229     """
1230
1231     def __init__(self, arvadosfile, mode, num_retries=None):
1232         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1233         self.mode = mode
1234         self.arvadosfile.add_writer(self)
1235
1236     def writable(self):
1237         return True
1238
1239     @_FileLikeObjectBase._before_close
1240     @retry_method
1241     def write(self, data, num_retries=None):
1242         if self.mode[0] == "a":
1243             self.arvadosfile.writeto(self.size(), data, num_retries)
1244         else:
1245             self.arvadosfile.writeto(self._filepos, data, num_retries)
1246             self._filepos += len(data)
1247         return len(data)
1248
1249     @_FileLikeObjectBase._before_close
1250     @retry_method
1251     def writelines(self, seq, num_retries=None):
1252         for s in seq:
1253             self.write(s, num_retries=num_retries)
1254
1255     @_FileLikeObjectBase._before_close
1256     def truncate(self, size=None):
1257         if size is None:
1258             size = self._filepos
1259         self.arvadosfile.truncate(size)
1260
1261     @_FileLikeObjectBase._before_close
1262     def flush(self):
1263         self.arvadosfile.flush()
1264
1265     def close(self, flush=True):
1266         if not self.closed:
1267             self.arvadosfile.remove_writer(self, flush)
1268             super(ArvadosFileWriter, self).close()
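# Illustrative usage sketch (not executed; assumes a configured Arvados API
# client and the arvados.collection module):
#
#   import arvados.collection
#   c = arvados.collection.Collection()
#   with c.open("subdir/hello.txt", "w") as f:   # an ArvadosFileWriter
#       f.write("hello\n")
#   with c.open("subdir/hello.txt", "r") as f:   # an ArvadosFileReader
#       print f.read()
#   print c.manifest_text()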