1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
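# Illustration (added; not part of the original source):
#
#   split("tmp/foo/bar.txt")  # -> ("tmp/foo", "bar.txt")
#   split("bar.txt")          # -> (".", "bar.txt")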
41
42 class UnownedBlockError(Exception):
43     """Raised when there's a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
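    # Illustrative examples (added): decompressed_name() strips a single
    # trailing compression suffix and leaves other names untouched, e.g.
    #   "sample.fastq.gz"  -> "sample.fastq"
    #   "archive.tar.bz2"  -> "archive.tar"
    #   "notes.txt"        -> "notes.txt"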
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102         return self._filepos
103
104     def tell(self):
105         return self._filepos
106
107     def readable(self):
108         return True
109
110     def writable(self):
111         return False
112
113     def seekable(self):
114         return True
115
116     @_FileLikeObjectBase._before_close
117     @retry_method
118     def readall(self, size=2**20, num_retries=None):
119         while True:
120             data = self.read(size, num_retries=num_retries)
121             if data == '':
122                 break
123             yield data
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def readline(self, size=float('inf'), num_retries=None):
128         cache_pos, cache_data = self._readline_cache
129         if self.tell() == cache_pos:
130             data = [cache_data]
131             self._filepos += len(cache_data)
132         else:
133             data = ['']
134         data_size = len(data[-1])
135         while (data_size < size) and ('\n' not in data[-1]):
136             next_read = self.read(2 ** 20, num_retries=num_retries)
137             if not next_read:
138                 break
139             data.append(next_read)
140             data_size += len(next_read)
141         data = ''.join(data)
142         try:
143             nextline_index = data.index('\n') + 1
144         except ValueError:
145             nextline_index = len(data)
146         nextline_index = min(nextline_index, size)
147         self._filepos -= len(data) - nextline_index
148         self._readline_cache = (self.tell(), data[nextline_index:])
149         return data[:nextline_index]
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def decompress(self, decompress, size, num_retries=None):
154         for segment in self.readall(size, num_retries=num_retries):
155             data = decompress(segment)
156             if data:
157                 yield data
158
159     @_FileLikeObjectBase._before_close
160     @retry_method
161     def readall_decompressed(self, size=2**20, num_retries=None):
162         self.seek(0)
163         if self.name.endswith('.bz2'):
164             dc = bz2.BZ2Decompressor()
165             return self.decompress(dc.decompress, size,
166                                    num_retries=num_retries)
167         elif self.name.endswith('.gz'):
168             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170                                    size, num_retries=num_retries)
171         else:
172             return self.readall(size, num_retries=num_retries)
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readlines(self, sizehint=float('inf'), num_retries=None):
177         data = []
178         data_size = 0
179         for s in self.readall(num_retries=num_retries):
180             data.append(s)
181             data_size += len(s)
182             if data_size >= sizehint:
183                 break
184         return ''.join(data).splitlines(True)
185
186     def size(self):
187         raise IOError(errno.ENOSYS, "Not implemented")
188
189     def read(self, size, num_retries=None):
190         raise IOError(errno.ENOSYS, "Not implemented")
191
192     def readfrom(self, start, size, num_retries=None):
193         raise IOError(errno.ENOSYS, "Not implemented")
194
195
196 class StreamFileReader(ArvadosFileReaderBase):
197     class _NameAttribute(str):
198         # The Python file API provides a plain .name attribute.
199         # Older SDK provided a name() method.
200         # This class provides both, for maximum compatibility.
201         def __call__(self):
202             return self
203
204     def __init__(self, stream, segments, name):
205         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206         self._stream = stream
207         self.segments = segments
208
209     def stream_name(self):
210         return self._stream.name()
211
212     def size(self):
213         n = self.segments[-1]
214         return n.range_start + n.range_size
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def read(self, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at the current file position"""
220         if size == 0:
221             return ''
222
223         data = ''
224         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
225         if available_chunks:
226             lr = available_chunks[0]
227             data = self._stream.readfrom(lr.locator+lr.segment_offset,
228                                           lr.segment_size,
229                                           num_retries=num_retries)
230
231         self._filepos += len(data)
232         return data
233
234     @_FileLikeObjectBase._before_close
235     @retry_method
236     def readfrom(self, start, size, num_retries=None):
237         """Read up to 'size' bytes from the stream, starting at 'start'"""
238         if size == 0:
239             return ''
240
241         data = []
242         for lr in locators_and_ranges(self.segments, start, size):
243             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244                                               num_retries=num_retries))
245         return ''.join(data)
246
247     def as_manifest(self):
248         segs = []
249         for r in self.segments:
250             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
252
253
254 def synchronized(orig_func):
255     @functools.wraps(orig_func)
256     def synchronized_wrapper(self, *args, **kwargs):
257         with self.lock:
258             return orig_func(self, *args, **kwargs)
259     return synchronized_wrapper
260
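# Usage sketch (added for illustration; `Counter` is a hypothetical class):
# the synchronized decorator assumes the instance has a `self.lock` attribute
# that supports the context-manager protocol, e.g.
#
#   class Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.n = 0
#
#       @synchronized
#       def incr(self):
#           self.n += 1    # body runs with self.lock held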
261
262 class StateChangeError(Exception):
263     def __init__(self, message, state, nextstate):
264         super(StateChangeError, self).__init__(message)
265         self.state = state
266         self.nextstate = nextstate
267
268 class _BufferBlock(object):
269     """A stand-in for a Keep block that is in the process of being written.
270
271     Writers can append to it, get the size, and compute the Keep locator.
272     There are three valid states:
273
274     WRITABLE
275       Can append to block.
276
277     PENDING
278       Block is in the process of being uploaded to Keep, append is an error.
279
280     COMMITTED
281       The block has been written to Keep, its internal buffer has been
282       released, fetching the block will fetch it via keep client (since we
283       discarded the internal copy), and identifiers referring to the BufferBlock
284       can be replaced with the block locator.
285
286     """
287
288     WRITABLE = 0
289     PENDING = 1
290     COMMITTED = 2
291     ERROR = 3
292     DELETED = 4
293
294     def __init__(self, blockid, starting_capacity, owner):
295         """
296         :blockid:
297           the identifier for this block
298
299         :starting_capacity:
300           the initial buffer capacity
301
302         :owner:
303           ArvadosFile that owns this block
304
305         """
306         self.blockid = blockid
307         self.buffer_block = bytearray(starting_capacity)
308         self.buffer_view = memoryview(self.buffer_block)
309         self.write_pointer = 0
310         self._state = _BufferBlock.WRITABLE
311         self._locator = None
312         self.owner = owner
313         self.lock = threading.Lock()
314         self.wait_for_commit = threading.Event()
315         self.error = None
316
317     @synchronized
318     def append(self, data):
319         """Append some data to the buffer.
320
321         Only valid if the block is in WRITABLE state.  Implements an expanding
322         buffer, doubling capacity as needed to accommodate all the data.
323
324         """
325         if self._state == _BufferBlock.WRITABLE:
326             while (self.write_pointer+len(data)) > len(self.buffer_block):
327                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
328                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
329                 self.buffer_block = new_buffer_block
330                 self.buffer_view = memoryview(self.buffer_block)
331             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
332             self.write_pointer += len(data)
333             self._locator = None
334         else:
335             raise AssertionError("Buffer block is not writable")
336
337     STATE_TRANSITIONS = frozenset([
338             (WRITABLE, PENDING),
339             (PENDING, COMMITTED),
340             (PENDING, ERROR),
341             (ERROR, PENDING)])
342
343     @synchronized
344     def set_state(self, nextstate, val=None):
345         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
346             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
347         self._state = nextstate
348
349         if self._state == _BufferBlock.PENDING:
350             self.wait_for_commit.clear()
351
352         if self._state == _BufferBlock.COMMITTED:
353             self._locator = val
354             self.buffer_view = None
355             self.buffer_block = None
356             self.wait_for_commit.set()
357
358         if self._state == _BufferBlock.ERROR:
359             self.error = val
360             self.wait_for_commit.set()
361
362     @synchronized
363     def state(self):
364         return self._state
365
366     def size(self):
367         """The amount of data written to the buffer."""
368         return self.write_pointer
369
370     @synchronized
371     def locator(self):
372         """The Keep locator for this buffer's contents."""
373         if self._locator is None:
374             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
375         return self._locator
376
377     @synchronized
378     def clone(self, new_blockid, owner):
379         if self._state == _BufferBlock.COMMITTED:
380             raise AssertionError("Cannot duplicate committed buffer block")
381         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
382         bufferblock.append(self.buffer_view[0:self.size()])
383         return bufferblock
384
385     @synchronized
386     def clear(self):
387         self._state = _BufferBlock.DELETED
388         self.owner = None
389         self.buffer_block = None
390         self.buffer_view = None
391
392     def __repr__(self):
393         return "<BufferBlock %s>" % (self.blockid)
394
395
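# Lifecycle sketch for _BufferBlock (added for illustration; values are
# hypothetical and `loc` stands for the locator returned by the Keep client):
#
#   bb = _BufferBlock("bufferblock-1", starting_capacity=2**14, owner=None)
#   bb.append("hello")                          # WRITABLE: buffer grows as needed
#   bb.size()                                   # -> 5
#   bb.locator()                                # -> "<md5 of contents>+5"
#   bb.set_state(_BufferBlock.PENDING)          # upload started, appends now fail
#   bb.set_state(_BufferBlock.COMMITTED, loc)   # buffer released, loc replaces it
#
# Any transition not listed in STATE_TRANSITIONS (for example COMMITTED back
# to WRITABLE) raises StateChangeError.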
396 class NoopLock(object):
397     def __enter__(self):
398         return self
399
400     def __exit__(self, exc_type, exc_value, traceback):
401         pass
402
403     def acquire(self, blocking=False):
404         pass
405
406     def release(self):
407         pass
408
409
410 def must_be_writable(orig_func):
411     @functools.wraps(orig_func)
412     def must_be_writable_wrapper(self, *args, **kwargs):
413         if not self.writable():
414             raise IOError(errno.EROFS, "Collection is read-only.")
415         return orig_func(self, *args, **kwargs)
416     return must_be_writable_wrapper
417
418
419 class _BlockManager(object):
420     """BlockManager handles buffer blocks.
421
422     Also handles background block uploads, and background block prefetch for a
423     Collection of ArvadosFiles.
424
425     """
426
427     DEFAULT_PUT_THREADS = 2
428     DEFAULT_GET_THREADS = 2
429
430     def __init__(self, keep, copies=None, put_threads=None):
431         """keep: KeepClient object to use"""
432         self._keep = keep
433         self._bufferblocks = collections.OrderedDict()
434         self._put_queue = None
435         self._put_threads = None
436         self._prefetch_queue = None
437         self._prefetch_threads = None
438         self.lock = threading.Lock()
439         self.prefetch_enabled = True
440         if put_threads:
441             self.num_put_threads = put_threads
442         else:
443             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
444         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
445         self.copies = copies
446         self._pending_write_size = 0
447         self.threads_lock = threading.Lock()
448         self.padding_block = None
449
450     @synchronized
451     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
452         """Allocate a new, empty bufferblock in WRITABLE state and return it.
453
454         :blockid:
455           optional block identifier, otherwise one will be automatically assigned
456
457         :starting_capacity:
458           optional capacity, otherwise will use default capacity
459
460         :owner:
461           ArvadosFile that owns this block
462
463         """
464         return self._alloc_bufferblock(blockid, starting_capacity, owner)
465
466     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
467         if blockid is None:
468             blockid = str(uuid.uuid4())
469         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
470         self._bufferblocks[bufferblock.blockid] = bufferblock
471         return bufferblock
472
473     @synchronized
474     def dup_block(self, block, owner):
475         """Create a new bufferblock initialized with the content of an existing bufferblock.
476
477         :block:
478           the buffer block to copy.
479
480         :owner:
481           ArvadosFile that owns the new block
482
483         """
484         new_blockid = str(uuid.uuid4())
485         bufferblock = block.clone(new_blockid, owner)
486         self._bufferblocks[bufferblock.blockid] = bufferblock
487         return bufferblock
488
489     @synchronized
490     def is_bufferblock(self, locator):
491         return locator in self._bufferblocks
492
493     def _commit_bufferblock_worker(self):
494         """Background uploader thread."""
495
496         while True:
497             try:
498                 bufferblock = self._put_queue.get()
499                 if bufferblock is None:
500                     return
501
502                 if self.copies is None:
503                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
504                 else:
505                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
506                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
507
508             except Exception as e:
509                 bufferblock.set_state(_BufferBlock.ERROR, e)
510             finally:
511                 if self._put_queue is not None:
512                     self._put_queue.task_done()
513
514     def start_put_threads(self):
515         with self.threads_lock:
516             if self._put_threads is None:
517                 # Start uploader threads.
518
519                 # If we don't limit the Queue size, the upload queue can quickly
520                 # grow to take up gigabytes of RAM if the writing process is
521                 # generating data more quickly than it can be sent to the Keep
522                 # servers.
523                 #
524                 # With two upload threads and a queue size of 2, this means up to 4
525                 # blocks pending.  If they are full 64 MiB blocks, that means up to
526                 # 256 MiB of internal buffering, which is the same size as the
527                 # default download block cache in KeepClient.
528                 self._put_queue = Queue.Queue(maxsize=2)
529
530                 self._put_threads = []
531                 for i in xrange(0, self.num_put_threads):
532                     thread = threading.Thread(target=self._commit_bufferblock_worker)
533                     self._put_threads.append(thread)
534                     thread.daemon = True
535                     thread.start()
536
537     def _block_prefetch_worker(self):
538         """The background downloader thread."""
539         while True:
540             try:
541                 b = self._prefetch_queue.get()
542                 if b is None:
543                     return
544                 self._keep.get(b)
545             except Exception:
546                 _logger.exception("Exception doing block prefetch")
547
548     @synchronized
549     def start_get_threads(self):
550         if self._prefetch_threads is None:
551             self._prefetch_queue = Queue.Queue()
552             self._prefetch_threads = []
553             for i in xrange(0, self.num_get_threads):
554                 thread = threading.Thread(target=self._block_prefetch_worker)
555                 self._prefetch_threads.append(thread)
556                 thread.daemon = True
557                 thread.start()
558
559
560     @synchronized
561     def stop_threads(self):
562         """Shut down and wait for background upload and download threads to finish."""
563
564         if self._put_threads is not None:
565             for t in self._put_threads:
566                 self._put_queue.put(None)
567             for t in self._put_threads:
568                 t.join()
569         self._put_threads = None
570         self._put_queue = None
571
572         if self._prefetch_threads is not None:
573             for t in self._prefetch_threads:
574                 self._prefetch_queue.put(None)
575             for t in self._prefetch_threads:
576                 t.join()
577         self._prefetch_threads = None
578         self._prefetch_queue = None
579
580     def __enter__(self):
581         return self
582
583     def __exit__(self, exc_type, exc_value, traceback):
584         self.stop_threads()
585
586     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
587         """Packs small blocks together before uploading"""
588
589         with self.lock:
590             self._pending_write_size += closed_file_size
591
592             # Check whether there are enough small blocks to fill up one full block
593             if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
594                 return
595
596             # Search for blocks that are ready to be packed together before being committed to Keep.
597             # A WRITABLE block always has an owner.
598             # A WRITABLE block whose owner is closed() implies that its
599             # size is <= KEEP_BLOCK_SIZE/2.
600             bufferblocks = self._bufferblocks.values()
601
602         try:
603             for b in bufferblocks:
604                 if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
605                     b.owner._repack_writes(0)
606         except AttributeError:
607             # Writable blocks without an owner shouldn't exist.
608             raise UnownedBlockError()
609
610         with self.lock:
611             small_blocks = [b for b in self._bufferblocks.values()
612                             if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
613
614             if len(small_blocks) <= 1:
615                 # Not enough small blocks for repacking
616                 return
617
618             # Update the pending write size count with its true value, just in case
619             # some small file was opened, written and closed several times.
620             self._pending_write_size = sum([b.size() for b in small_blocks])
621             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
622                 return
623
624             new_bb = self._alloc_bufferblock()
625             files = []
626             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
627                 bb = small_blocks.pop(0)
628                 self._pending_write_size -= bb.size()
629                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
630                 files.append((bb, new_bb.write_pointer - bb.size()))
631
632             self.commit_bufferblock(new_bb, sync=sync)
633
634             for bb, new_bb_segment_offset in files:
635                 newsegs = []
636                 for s in bb.owner.segments():
637                     if s.locator == bb.blockid:
638                         newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
639                     else:
640                         newsegs.append(s)
641                 bb.owner.set_segments(newsegs)
642                 self._delete_bufferblock(bb.blockid)
643
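    # Rough illustration of repack_small_blocks() (added; sizes hypothetical):
    # if file_a and file_b were closed with 10 MiB and 20 MiB buffer blocks,
    # neither worth committing on its own, both buffers are copied into one new
    # bufferblock, that single block is committed, and each file's segments are
    # rewritten to point at it:
    #
    #   before: file_a -> [Range("bufferblock-A", 0, 10 MiB, 0)]
    #           file_b -> [Range("bufferblock-B", 0, 20 MiB, 0)]
    #   after:  file_a -> [Range(new_bb.locator(), 0, 10 MiB, 0)]
    #           file_b -> [Range(new_bb.locator(), 0, 20 MiB, 10 MiB)]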
644     def commit_bufferblock(self, block, sync):
645         """Initiate a background upload of a bufferblock.
646
647         :block:
648           The block object to upload
649
650         :sync:
651           If `sync` is True, upload the block synchronously.
652           If `sync` is False, upload the block asynchronously.  This will
653           return immediately unless the upload queue is at capacity, in
654           which case it will wait on an upload queue slot.
655
656         """
657         try:
658             # Mark the block as PENDING to disallow any more appends.
659             block.set_state(_BufferBlock.PENDING)
660         except StateChangeError as e:
661             if e.state == _BufferBlock.PENDING:
662                 if sync:
663                     block.wait_for_commit.wait()
664                 else:
665                     return
666             if block.state() == _BufferBlock.COMMITTED:
667                 return
668             elif block.state() == _BufferBlock.ERROR:
669                 raise block.error
670             else:
671                 raise
672
673         if sync:
674             try:
675                 if self.copies is None:
676                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
677                 else:
678                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
679                 block.set_state(_BufferBlock.COMMITTED, loc)
680             except Exception as e:
681                 block.set_state(_BufferBlock.ERROR, e)
682                 raise
683         else:
684             self.start_put_threads()
685             self._put_queue.put(block)
686
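    # Illustration (added; `bb` is a hypothetical bufferblock): the two commit
    # modes differ mainly in when the caller learns about failures.
    #
    #   manager.commit_bufferblock(bb, sync=True)    # returns once bb is COMMITTED;
    #                                                # raises on upload error
    #   manager.commit_bufferblock(bb, sync=False)   # hands bb to the uploader
    #                                                # threads; errors surface later,
    #                                                # e.g. via commit_all()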
687     @synchronized
688     def get_bufferblock(self, locator):
689         return self._bufferblocks.get(locator)
690
691     @synchronized
692     def get_padding_block(self):
693         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
694         when using truncate() to extend the size of a file.
695
696         For reference (and possible future optimization), the md5sum of the
697         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
698
699         """
700
701         if self.padding_block is None:
702             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
703             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
704             self.commit_bufferblock(self.padding_block, False)
705         return self.padding_block
706
707     @synchronized
708     def delete_bufferblock(self, locator):
709         self._delete_bufferblock(locator)
710
711     def _delete_bufferblock(self, locator):
712         bb = self._bufferblocks[locator]
713         bb.clear()
714         del self._bufferblocks[locator]
715
716     def get_block_contents(self, locator, num_retries, cache_only=False):
717         """Fetch a block.
718
719         First checks to see if the locator is a BufferBlock and return that, if
720         not, passes the request through to KeepClient.get().
721
722         """
723         with self.lock:
724             if locator in self._bufferblocks:
725                 bufferblock = self._bufferblocks[locator]
726                 if bufferblock.state() != _BufferBlock.COMMITTED:
727                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
728                 else:
729                     locator = bufferblock._locator
730         if cache_only:
731             return self._keep.get_from_cache(locator)
732         else:
733             return self._keep.get(locator, num_retries=num_retries)
734
735     def commit_all(self):
736         """Commit all outstanding buffer blocks.
737
738         This is a synchronous call, and will not return until all buffer blocks
739         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
740
741         """
742         self.repack_small_blocks(force=True, sync=True)
743
744         with self.lock:
745             items = self._bufferblocks.items()
746
747         for k,v in items:
748             if v.state() != _BufferBlock.COMMITTED and v.owner:
749                 v.owner.flush(sync=False)
750
751         with self.lock:
752             if self._put_queue is not None:
753                 self._put_queue.join()
754
755                 err = []
756                 for k,v in items:
757                     if v.state() == _BufferBlock.ERROR:
758                         err.append((v.locator(), v.error))
759                 if err:
760                     raise KeepWriteError("Error writing some blocks", err, label="block")
761
762         for k,v in items:
763             # flush again with sync=True to remove committed bufferblocks from
764             # the segments.
765             if v.owner:
766                 v.owner.flush(sync=True)
767
768     def block_prefetch(self, locator):
769         """Initiate a background download of a block.
770
771         This assumes that the underlying KeepClient implements a block cache,
772         so repeated requests for the same block will not result in repeated
773         downloads (unless the block is evicted from the cache.)  This method
774         does not block.
775
776         """
777
778         if not self.prefetch_enabled:
779             return
780
781         if self._keep.get_from_cache(locator) is not None:
782             return
783
784         with self.lock:
785             if locator in self._bufferblocks:
786                 return
787
788         self.start_get_threads()
789         self._prefetch_queue.put(locator)
790
791
792 class ArvadosFile(object):
793     """Represent a file in a Collection.
794
795     ArvadosFile manages the underlying representation of a file in Keep as a
796     sequence of segments spanning a set of blocks, and implements random
797     read/write access.
798
799     This object may be accessed from multiple threads.
800
801     """
802
803     def __init__(self, parent, name, stream=[], segments=[]):
804         """
805         ArvadosFile constructor.
806
807         :stream:
808           a list of Range objects representing a block stream
809
810         :segments:
811           a list of Range objects representing segments
812         """
813         self.parent = parent
814         self.name = name
815         self._writers = set()
816         self._committed = False
817         self._segments = []
818         self.lock = parent.root_collection().lock
819         for s in segments:
820             self._add_segment(stream, s.locator, s.range_size)
821         self._current_bblock = None
822
823     def writable(self):
824         return self.parent.writable()
825
826     @synchronized
827     def permission_expired(self, as_of_dt=None):
828         """Returns True if any of the segment's locators is expired"""
829         for r in self._segments:
830             if KeepLocator(r.locator).permission_expired(as_of_dt):
831                 return True
832         return False
833
834     @synchronized
835     def segments(self):
836         return copy.copy(self._segments)
837
838     @synchronized
839     def clone(self, new_parent, new_name):
840         """Make a copy of this file."""
841         cp = ArvadosFile(new_parent, new_name)
842         cp.replace_contents(self)
843         return cp
844
845     @must_be_writable
846     @synchronized
847     def replace_contents(self, other):
848         """Replace segments of this file with segments from another `ArvadosFile` object."""
849
850         map_loc = {}
851         self._segments = []
852         for other_segment in other.segments():
853             new_loc = other_segment.locator
854             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
855                 if other_segment.locator not in map_loc:
856                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
857                     if bufferblock.state() != _BufferBlock.WRITABLE:
858                         map_loc[other_segment.locator] = bufferblock.locator()
859                     else:
860                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
861                 new_loc = map_loc[other_segment.locator]
862
863             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
864
865         self.set_committed(False)
866
867     def __eq__(self, other):
868         if other is self:
869             return True
870         if not isinstance(other, ArvadosFile):
871             return False
872
873         othersegs = other.segments()
874         with self.lock:
875             if len(self._segments) != len(othersegs):
876                 return False
877             for i in xrange(0, len(othersegs)):
878                 seg1 = self._segments[i]
879                 seg2 = othersegs[i]
880                 loc1 = seg1.locator
881                 loc2 = seg2.locator
882
883                 if self.parent._my_block_manager().is_bufferblock(loc1):
884                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
885
886                 if other.parent._my_block_manager().is_bufferblock(loc2):
887                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
888
889                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
890                     seg1.range_start != seg2.range_start or
891                     seg1.range_size != seg2.range_size or
892                     seg1.segment_offset != seg2.segment_offset):
893                     return False
894
895         return True
896
897     def __ne__(self, other):
898         return not self.__eq__(other)
899
900     @synchronized
901     def set_segments(self, segs):
902         self._segments = segs
903
904     @synchronized
905     def set_committed(self, value=True):
906         """Set committed flag.
907
908         If value is True, set committed to be True.
909
910         If value is False, set committed to be False for this and all parents.
911         """
912         if value == self._committed:
913             return
914         self._committed = value
915         if self._committed is False and self.parent is not None:
916             self.parent.set_committed(False)
917
918     @synchronized
919     def committed(self):
920         """Get whether this is committed or not."""
921         return self._committed
922
923     @synchronized
924     def add_writer(self, writer):
925         """Add an ArvadosFileWriter reference to the list of writers"""
926         if isinstance(writer, ArvadosFileWriter):
927             self._writers.add(writer)
928
929     @synchronized
930     def remove_writer(self, writer, flush):
931         """
932         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
933         and do some block maintenance tasks.
934         """
935         self._writers.remove(writer)
936
937         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
938             # File writer closed, not small enough for repacking
939             self.flush()
940         elif self.closed():
941             # All writers closed and size is adequate for repacking
942             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
943
944     def closed(self):
945         """
946         Get whether this is closed or not. When the writers list is empty, the file
947         is considered closed.
948         """
949         return len(self._writers) == 0
950
951     @must_be_writable
952     @synchronized
953     def truncate(self, size):
954         """Shrink or expand the size of the file.
955
956         If `size` is less than the size of the file, the file contents after
957         `size` will be discarded.  If `size` is greater than the current size
958         of the file, it will be filled with zero bytes.
959
960         """
961         if size < self.size():
962             new_segs = []
963             for r in self._segments:
964                 range_end = r.range_start+r.range_size
965                 if r.range_start >= size:
966                     # segment is past the truncate size, all done
967                     break
968                 elif size < range_end:
969                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
970                     nr.segment_offset = r.segment_offset
971                     new_segs.append(nr)
972                     break
973                 else:
974                     new_segs.append(r)
975
976             self._segments = new_segs
977             self.set_committed(False)
978         elif size > self.size():
979             padding = self.parent._my_block_manager().get_padding_block()
980             diff = size - self.size()
981             while diff > config.KEEP_BLOCK_SIZE:
982                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
983                 diff -= config.KEEP_BLOCK_SIZE
984             if diff > 0:
985                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
986             self.set_committed(False)
987         else:
988             # size == self.size()
989             pass
990
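    # Example (added; sizes are illustrative): growing a 10-byte file to 13
    # bytes past one full block appends references to the shared zero-filled
    # padding block instead of buffering zeros in memory:
    #
    #   f.truncate(config.KEEP_BLOCK_SIZE + 13)
    #   # appended segments: Range(padding.blockid, 10, KEEP_BLOCK_SIZE, 0)
    #   #                    Range(padding.blockid, 10 + KEEP_BLOCK_SIZE, 3, 0)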
991     def readfrom(self, offset, size, num_retries, exact=False):
992         """Read up to `size` bytes from the file starting at `offset`.
993
994         :exact:
995          If False (default), return less data than requested if the read
996          crosses a block boundary and the next block isn't cached.  If True,
997          only return less data than requested when hitting EOF.
998         """
999
1000         with self.lock:
1001             if size == 0 or offset >= self.size():
1002                 return ''
1003             readsegs = locators_and_ranges(self._segments, offset, size)
1004             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
1005
1006         locs = set()
1007         data = []
1008         for lr in readsegs:
1009             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1010             if block:
1011                 blockview = memoryview(block)
1012                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
1013                 locs.add(lr.locator)
1014             else:
1015                 break
1016
1017         for lr in prefetch:
1018             if lr.locator not in locs:
1019                 self.parent._my_block_manager().block_prefetch(lr.locator)
1020                 locs.add(lr.locator)
1021
1022         return ''.join(data)
1023
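    # Illustration of the `exact` flag (added; hypothetical calls): a read that
    # crosses a block boundary may return only the locally cached prefix when
    # exact=False, while the next block is prefetched in the background;
    # exact=True returns the full `size` bytes unless EOF is reached first.
    #
    #   f.readfrom(0, 2**26, num_retries=2)              # may be a short read
    #   f.readfrom(0, 2**26, num_retries=2, exact=True)  # short only at EOF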
1024     def _repack_writes(self, num_retries):
1025         """Optimize buffer block by repacking segments in file sequence.
1026
1027         When the client makes random writes, they appear in the buffer block in
1028         the sequence they were written rather than the sequence they appear in
1029         the file.  This makes for inefficient, fragmented manifests.  Attempt
1030         to optimize by repacking writes in file sequence.
1031
1032         """
1033         segs = self._segments
1034
1035         # Collect the segments that reference the buffer block.
1036         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1037
1038         # Collect total data referenced by segments (could be smaller than
1039         # bufferblock size if a portion of the file was written and
1040         # then overwritten).
1041         write_total = sum([s.range_size for s in bufferblock_segs])
1042
1043         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1044             # If there's more than one segment referencing this block, it is
1045             # due to out-of-order writes and will produce a fragmented
1046             # manifest, so try to optimize by re-packing into a new buffer.
1047             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1048             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1049             for t in bufferblock_segs:
1050                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1051                 t.segment_offset = new_bb.size() - t.range_size
1052             self._current_bblock.clear()
1053             self._current_bblock = new_bb
1054
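    # Illustration (added; hypothetical writes): overwriting data leaves the
    # buffer block in write order, so the manifest would need segments pointing
    # into it out of file order.  _repack_writes() copies only the bytes still
    # referenced into a fresh bufferblock, in file order:
    #
    #   f.writeto(0, "HELLOWORLD", 0)  # buffer: "HELLOWORLD"
    #   f.writeto(0, "howdy", 0)       # buffer: "HELLOWORLDhowdy" (15 bytes,
    #                                  # only 10 still referenced)
    #   f._repack_writes(0)            # new buffer: "howdyWORLD"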
1055     @must_be_writable
1056     @synchronized
1057     def writeto(self, offset, data, num_retries):
1058         """Write `data` to the file starting at `offset`.
1059
1060         This will update existing bytes and/or extend the size of the file as
1061         necessary.
1062
1063         """
1064         if len(data) == 0:
1065             return
1066
1067         if offset > self.size():
1068             self.truncate(offset)
1069
1070         if len(data) > config.KEEP_BLOCK_SIZE:
1071             # Chunk it up into smaller writes
1072             n = 0
1073             dataview = memoryview(data)
1074             while n < len(data):
1075                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1076                 n += config.KEEP_BLOCK_SIZE
1077             return
1078
1079         self.set_committed(False)
1080
1081         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1082             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1083
1084         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1085             self._repack_writes(num_retries)
1086             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1087                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1088                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1089
1090         self._current_bblock.append(data)
1091
1092         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1093
1094         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1095
1096         return len(data)
1097
1098     @synchronized
1099     def flush(self, sync=True, num_retries=0):
1100         """Flush the current bufferblock to Keep.
1101
1102         :sync:
1103           If True, commit block synchronously, wait until buffer block has been written.
1104           If False, commit block asynchronously, return immediately after putting block into
1105           the keep put queue.
1106         """
1107         if self.committed():
1108             return
1109
1110         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1111             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1112                 self._repack_writes(num_retries)
1113             if self._current_bblock.state() != _BufferBlock.DELETED:
1114                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1115
1116         if sync:
1117             to_delete = set()
1118             for s in self._segments:
1119                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1120                 if bb:
1121                     if bb.state() != _BufferBlock.COMMITTED:
1122                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1123                     to_delete.add(s.locator)
1124                     s.locator = bb.locator()
1125             for s in to_delete:
1126                 self.parent._my_block_manager().delete_bufferblock(s)
1127
1128         self.parent.notify(MOD, self.parent, self.name, (self, self))
1129
1130     @must_be_writable
1131     @synchronized
1132     def add_segment(self, blocks, pos, size):
1133         """Add a segment to the end of the file.
1134
1135         `pos` and `size` reference a section of the stream described by
1136         `blocks` (a list of Range objects)
1137
1138         """
1139         self._add_segment(blocks, pos, size)
1140
1141     def _add_segment(self, blocks, pos, size):
1142         """Internal implementation of add_segment."""
1143         self.set_committed(False)
1144         for lr in locators_and_ranges(blocks, pos, size):
1145             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1146             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1147             self._segments.append(r)
1148
1149     @synchronized
1150     def size(self):
1151         """Get the file size."""
1152         if self._segments:
1153             n = self._segments[-1]
1154             return n.range_start + n.range_size
1155         else:
1156             return 0
1157
1158     @synchronized
1159     def manifest_text(self, stream_name=".", portable_locators=False,
1160                       normalize=False, only_committed=False):
1161         buf = ""
1162         filestream = []
1163         for segment in self.segments():
1164             loc = segment.locator
1165             if self.parent._my_block_manager().is_bufferblock(loc):
1166                 if only_committed:
1167                     continue
1168                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1169             if portable_locators:
1170                 loc = KeepLocator(loc).stripped()
1171             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1172                                  segment.segment_offset, segment.range_size))
1173         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1174         buf += "\n"
1175         return buf
1176
1177     @must_be_writable
1178     @synchronized
1179     def _reparent(self, newparent, newname):
1180         self.set_committed(False)
1181         self.flush(sync=True)
1182         self.parent.remove(self.name)
1183         self.parent = newparent
1184         self.name = newname
1185         self.lock = self.parent.root_collection().lock
1186
1187
1188 class ArvadosFileReader(ArvadosFileReaderBase):
1189     """Wraps ArvadosFile in a file-like object supporting reading only.
1190
1191     Be aware that this class is NOT thread safe as there is no locking around
1192     updating the file pointer.
1193
1194     """
1195
1196     def __init__(self, arvadosfile, num_retries=None):
1197         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1198         self.arvadosfile = arvadosfile
1199
1200     def size(self):
1201         return self.arvadosfile.size()
1202
1203     def stream_name(self):
1204         return self.arvadosfile.parent.stream_name()
1205
1206     @_FileLikeObjectBase._before_close
1207     @retry_method
1208     def read(self, size=None, num_retries=None):
1209         """Read up to `size` bytes from the file and return the result.
1210
1211         Starts at the current file position.  If `size` is None, read the
1212         entire remainder of the file.
1213         """
1214         if size is None:
1215             data = []
1216             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1217             while rd:
1218                 data.append(rd)
1219                 self._filepos += len(rd)
1220                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1221             return ''.join(data)
1222         else:
1223             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1224             self._filepos += len(data)
1225             return data
1226
1227     @_FileLikeObjectBase._before_close
1228     @retry_method
1229     def readfrom(self, offset, size, num_retries=None):
1230         """Read up to `size` bytes from the stream, starting at the specified file offset.
1231
1232         This method does not change the file position.
1233         """
1234         return self.arvadosfile.readfrom(offset, size, num_retries)
1235
1236     def flush(self):
1237         pass
1238
1239
1240 class ArvadosFileWriter(ArvadosFileReader):
1241     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1242
1243     Be aware that this class is NOT thread safe as there is no locking around
1244     updating the file pointer.
1245
1246     """
1247
1248     def __init__(self, arvadosfile, mode, num_retries=None):
1249         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1250         self.mode = mode
1251         self.arvadosfile.add_writer(self)
1252
1253     def writable(self):
1254         return True
1255
1256     @_FileLikeObjectBase._before_close
1257     @retry_method
1258     def write(self, data, num_retries=None):
1259         if self.mode[0] == "a":
1260             self.arvadosfile.writeto(self.size(), data, num_retries)
1261         else:
1262             self.arvadosfile.writeto(self._filepos, data, num_retries)
1263             self._filepos += len(data)
1264         return len(data)
1265
1266     @_FileLikeObjectBase._before_close
1267     @retry_method
1268     def writelines(self, seq, num_retries=None):
1269         for s in seq:
1270             self.write(s, num_retries=num_retries)
1271
1272     @_FileLikeObjectBase._before_close
1273     def truncate(self, size=None):
1274         if size is None:
1275             size = self._filepos
1276         self.arvadosfile.truncate(size)
1277
1278     @_FileLikeObjectBase._before_close
1279     def flush(self):
1280         self.arvadosfile.flush()
1281
1282     def close(self, flush=True):
1283         if not self.closed:
1284             self.arvadosfile.remove_writer(self, flush)
1285             super(ArvadosFileWriter, self).close()
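
# Typical usage sketch (added for illustration; assumes the Collection class
# from arvados.collection, whose open() method returns the reader/writer
# classes defined above):
#
#   import arvados.collection
#
#   c = arvados.collection.Collection()
#   with c.open("output/results.txt", "w") as writer:   # ArvadosFileWriter
#       writer.write("hello\n")
#   with c.open("output/results.txt", "r") as reader:   # ArvadosFileReader
#       print reader.read(1024)
#   print c.manifest_text()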