1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
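# Illustrative note (not part of the original file): split() only cuts at the
# last '/', so nested stream paths keep their full stream name:
#
#   >>> split('output/logs/run.log')
#   ('output/logs', 'run.log')
#   >>> split('run.log')
#   ('.', 'run.log')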
41
42 class UnownedBlockError(Exception):
43     """Raised when there's a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
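# Usage sketch (hypothetical subclass, for illustration only): subclasses get
# with-statement support from __enter__/__exit__, and any method wrapped with
# _before_close raises once the object is closed.
#
#   class _ExampleFile(_FileLikeObjectBase):
#       @_FileLikeObjectBase._before_close
#       def read(self):
#           return ''
#
#   with _ExampleFile('example.txt', 'rb') as f:
#       f.read()      # fine while open
#   f.read()          # ValueError: I/O operation on closed stream file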
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         if pos < 0L:
100             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
101         self._filepos = pos
102         return self._filepos
103
104     def tell(self):
105         return self._filepos
106
107     def readable(self):
108         return True
109
110     def writable(self):
111         return False
112
113     def seekable(self):
114         return True
115
116     @_FileLikeObjectBase._before_close
117     @retry_method
118     def readall(self, size=2**20, num_retries=None):
119         while True:
120             data = self.read(size, num_retries=num_retries)
121             if data == '':
122                 break
123             yield data
124
125     @_FileLikeObjectBase._before_close
126     @retry_method
127     def readline(self, size=float('inf'), num_retries=None):
128         cache_pos, cache_data = self._readline_cache
129         if self.tell() == cache_pos:
130             data = [cache_data]
131             self._filepos += len(cache_data)
132         else:
133             data = ['']
134         data_size = len(data[-1])
135         while (data_size < size) and ('\n' not in data[-1]):
136             next_read = self.read(2 ** 20, num_retries=num_retries)
137             if not next_read:
138                 break
139             data.append(next_read)
140             data_size += len(next_read)
141         data = ''.join(data)
142         try:
143             nextline_index = data.index('\n') + 1
144         except ValueError:
145             nextline_index = len(data)
146         nextline_index = min(nextline_index, size)
147         self._filepos -= len(data) - nextline_index
148         self._readline_cache = (self.tell(), data[nextline_index:])
149         return data[:nextline_index]
150
151     @_FileLikeObjectBase._before_close
152     @retry_method
153     def decompress(self, decompress, size, num_retries=None):
154         for segment in self.readall(size, num_retries=num_retries):
155             data = decompress(segment)
156             if data:
157                 yield data
158
159     @_FileLikeObjectBase._before_close
160     @retry_method
161     def readall_decompressed(self, size=2**20, num_retries=None):
162         self.seek(0)
163         if self.name.endswith('.bz2'):
164             dc = bz2.BZ2Decompressor()
165             return self.decompress(dc.decompress, size,
166                                    num_retries=num_retries)
167         elif self.name.endswith('.gz'):
168             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
169             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
170                                    size, num_retries=num_retries)
171         else:
172             return self.readall(size, num_retries=num_retries)
173
174     @_FileLikeObjectBase._before_close
175     @retry_method
176     def readlines(self, sizehint=float('inf'), num_retries=None):
177         data = []
178         data_size = 0
179         for s in self.readall(num_retries=num_retries):
180             data.append(s)
181             data_size += len(s)
182             if data_size >= sizehint:
183                 break
184         return ''.join(data).splitlines(True)
185
186     def size(self):
187         raise IOError(errno.ENOSYS, "Not implemented")
188
189     def read(self, size, num_retries=None):
190         raise IOError(errno.ENOSYS, "Not implemented")
191
192     def readfrom(self, start, size, num_retries=None):
193         raise IOError(errno.ENOSYS, "Not implemented")
194
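# Reading sketch (applies to the concrete reader classes defined below; `reader`
# stands for any ArvadosFileReaderBase instance and `handle_line` is a
# hypothetical callback):
#
#   reader.seek(0)
#   for line in reader:          # __iter__ yields readline() results
#       handle_line(line)
#
#   reader.seek(0)
#   for chunk in reader.readall():
#       pass                     # stream the file in ~1 MiB chunks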
195
196 class StreamFileReader(ArvadosFileReaderBase):
197     class _NameAttribute(str):
198         # The Python file API provides a plain .name attribute.
199         # Older SDK provided a name() method.
200         # This class provides both, for maximum compatibility.
201         def __call__(self):
202             return self
203
204     def __init__(self, stream, segments, name):
205         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
206         self._stream = stream
207         self.segments = segments
208
209     def stream_name(self):
210         return self._stream.name()
211
212     def size(self):
213         n = self.segments[-1]
214         return n.range_start + n.range_size
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def read(self, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at the current file position"""
220         if size == 0:
221             return ''
222
223         data = ''
224         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
225         if available_chunks:
226             lr = available_chunks[0]
227             data = self._stream.readfrom(lr.locator+lr.segment_offset,
228                                           lr.segment_size,
229                                           num_retries=num_retries)
230
231         self._filepos += len(data)
232         return data
233
234     @_FileLikeObjectBase._before_close
235     @retry_method
236     def readfrom(self, start, size, num_retries=None):
237         """Read up to 'size' bytes from the stream, starting at 'start'"""
238         if size == 0:
239             return ''
240
241         data = []
242         for lr in locators_and_ranges(self.segments, start, size):
243             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
244                                               num_retries=num_retries))
245         return ''.join(data)
246
247     def as_manifest(self):
248         segs = []
249         for r in self.segments:
250             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
251         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
252
253
254 def synchronized(orig_func):
255     @functools.wraps(orig_func)
256     def synchronized_wrapper(self, *args, **kwargs):
257         with self.lock:
258             return orig_func(self, *args, **kwargs)
259     return synchronized_wrapper
260
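# Minimal sketch of the convention @synchronized relies on (the example class is
# hypothetical): the owning object just needs a `lock` attribute, and the
# decorated method body runs while holding it.
#
#   class _Counter(object):
#       def __init__(self):
#           self.lock = threading.Lock()
#           self.value = 0
#
#       @synchronized
#       def increment(self):
#           self.value += 1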
261
262 class StateChangeError(Exception):
263     def __init__(self, message, state, nextstate):
264         super(StateChangeError, self).__init__(message)
265         self.state = state
266         self.nextstate = nextstate
267
268 class _BufferBlock(object):
269     """A stand-in for a Keep block that is in the process of being written.
270
271     Writers can append to it, get the size, and compute the Keep locator.
272     There are three valid states:
273
274     WRITABLE
275       Can append to block.
276
277     PENDING
278       Block is in the process of being uploaded to Keep, append is an error.
279
280     COMMITTED
281       The block has been written to Keep, its internal buffer has been
282       released, fetching the block will fetch it via keep client (since we
283       discarded the internal copy), and identifiers referring to the BufferBlock
284       can be replaced with the block locator.
285
286     """
287
288     WRITABLE = 0
289     PENDING = 1
290     COMMITTED = 2
291     ERROR = 3
292     DELETED = 4
293
294     def __init__(self, blockid, starting_capacity, owner):
295         """
296         :blockid:
297           the identifier for this block
298
299         :starting_capacity:
300           the initial buffer capacity
301
302         :owner:
303           ArvadosFile that owns this block
304
305         """
306         self.blockid = blockid
307         self.buffer_block = bytearray(starting_capacity)
308         self.buffer_view = memoryview(self.buffer_block)
309         self.write_pointer = 0
310         self._state = _BufferBlock.WRITABLE
311         self._locator = None
312         self.owner = owner
313         self.lock = threading.Lock()
314         self.wait_for_commit = threading.Event()
315         self.error = None
316
317     @synchronized
318     def append(self, data):
319         """Append some data to the buffer.
320
321         Only valid if the block is in WRITABLE state.  Implements an expanding
322         buffer, doubling capacity as needed to accommodate all the data.
323
324         """
325         if self._state == _BufferBlock.WRITABLE:
326             while (self.write_pointer+len(data)) > len(self.buffer_block):
327                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
328                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
329                 self.buffer_block = new_buffer_block
330                 self.buffer_view = memoryview(self.buffer_block)
331             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
332             self.write_pointer += len(data)
333             self._locator = None
334         else:
335             raise AssertionError("Buffer block is not writable")
336
337     STATE_TRANSITIONS = frozenset([
338             (WRITABLE, PENDING),
339             (PENDING, COMMITTED),
340             (PENDING, ERROR),
341             (ERROR, PENDING)])
342
343     @synchronized
344     def set_state(self, nextstate, val=None):
345         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
346             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
347         self._state = nextstate
348
349         if self._state == _BufferBlock.PENDING:
350             self.wait_for_commit.clear()
351
352         if self._state == _BufferBlock.COMMITTED:
353             self._locator = val
354             self.buffer_view = None
355             self.buffer_block = None
356             self.wait_for_commit.set()
357
358         if self._state == _BufferBlock.ERROR:
359             self.error = val
360             self.wait_for_commit.set()
361
362     @synchronized
363     def state(self):
364         return self._state
365
366     def size(self):
367         """The amount of data written to the buffer."""
368         return self.write_pointer
369
370     @synchronized
371     def locator(self):
372         """The Keep locator for this buffer's contents."""
373         if self._locator is None:
374             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
375         return self._locator
376
377     @synchronized
378     def clone(self, new_blockid, owner):
379         if self._state == _BufferBlock.COMMITTED:
380             raise AssertionError("Cannot duplicate committed buffer block")
381         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
382         bufferblock.append(self.buffer_view[0:self.size()])
383         return bufferblock
384
385     @synchronized
386     def clear(self):
387         self._state = _BufferBlock.DELETED
388         self.owner = None
389         self.buffer_block = None
390         self.buffer_view = None
391
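# Lifecycle sketch (the identifiers below are made up for illustration): a buffer
# block normally moves WRITABLE -> PENDING -> COMMITTED, per STATE_TRANSITIONS.
#
#   bb = _BufferBlock('bufferblock-0', starting_capacity=2**14, owner=None)
#   bb.append('hello')                      # only allowed while WRITABLE
#   bb.set_state(_BufferBlock.PENDING)      # further append() raises AssertionError
#   bb.set_state(_BufferBlock.COMMITTED,
#                '5d41402abc4b2a76b9719d911017c592+5')
#   bb.locator()                            # -> the Keep locator passed above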
392
393 class NoopLock(object):
394     def __enter__(self):
395         return self
396
397     def __exit__(self, exc_type, exc_value, traceback):
398         pass
399
400     def acquire(self, blocking=False):
401         pass
402
403     def release(self):
404         pass
405
406
407 def must_be_writable(orig_func):
408     @functools.wraps(orig_func)
409     def must_be_writable_wrapper(self, *args, **kwargs):
410         if not self.writable():
411             raise IOError(errno.EROFS, "Collection is read-only.")
412         return orig_func(self, *args, **kwargs)
413     return must_be_writable_wrapper
414
415
416 class _BlockManager(object):
417     """BlockManager handles buffer blocks.
418
419     Also handles background block uploads, and background block prefetch for a
420     Collection of ArvadosFiles.
421
422     """
423
424     DEFAULT_PUT_THREADS = 2
425     DEFAULT_GET_THREADS = 2
426
427     def __init__(self, keep, copies=None, put_threads=None):
428         """keep: KeepClient object to use"""
429         self._keep = keep
430         self._bufferblocks = collections.OrderedDict()
431         self._put_queue = None
432         self._put_threads = None
433         self._prefetch_queue = None
434         self._prefetch_threads = None
435         self.lock = threading.Lock()
436         self.prefetch_enabled = True
437         if put_threads:
438             self.num_put_threads = put_threads
439         else:
440             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
441         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
442         self.copies = copies
443         self._pending_write_size = 0
444         self.threads_lock = threading.Lock()
445         self.padding_block = None
446
447     @synchronized
448     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
449         """Allocate a new, empty bufferblock in WRITABLE state and return it.
450
451         :blockid:
452           optional block identifier, otherwise one will be automatically assigned
453
454         :starting_capacity:
455           optional capacity, otherwise will use default capacity
456
457         :owner:
458           ArvadosFile that owns this block
459
460         """
461         return self._alloc_bufferblock(blockid, starting_capacity, owner)
462
463     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
464         if blockid is None:
465             blockid = str(uuid.uuid4())
466         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
467         self._bufferblocks[bufferblock.blockid] = bufferblock
468         return bufferblock
469
470     @synchronized
471     def dup_block(self, block, owner):
472         """Create a new bufferblock initialized with the content of an existing bufferblock.
473
474         :block:
475           the buffer block to copy.
476
477         :owner:
478           ArvadosFile that owns the new block
479
480         """
481         new_blockid = str(uuid.uuid4())
482         bufferblock = block.clone(new_blockid, owner)
483         self._bufferblocks[bufferblock.blockid] = bufferblock
484         return bufferblock
485
486     @synchronized
487     def is_bufferblock(self, locator):
488         return locator in self._bufferblocks
489
490     def _commit_bufferblock_worker(self):
491         """Background uploader thread."""
492
493         while True:
494             try:
495                 bufferblock = self._put_queue.get()
496                 if bufferblock is None:
497                     return
498
499                 if self.copies is None:
500                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
501                 else:
502                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
503                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
504
505             except Exception as e:
506                 bufferblock.set_state(_BufferBlock.ERROR, e)
507             finally:
508                 if self._put_queue is not None:
509                     self._put_queue.task_done()
510
511     def start_put_threads(self):
512         with self.threads_lock:
513             if self._put_threads is None:
514                 # Start uploader threads.
515
516                 # If we don't limit the Queue size, the upload queue can quickly
517                 # grow to take up gigabytes of RAM if the writing process is
518                 # generating data more quickly than it can be sent to the Keep
519                 # servers.
520                 #
521                 # With two upload threads and a queue size of 2, this means up to 4
522                 # blocks pending.  If they are full 64 MiB blocks, that means up to
523                 # 256 MiB of internal buffering, which is the same size as the
524                 # default download block cache in KeepClient.
525                 self._put_queue = Queue.Queue(maxsize=2)
526
527                 self._put_threads = []
528                 for i in xrange(0, self.num_put_threads):
529                     thread = threading.Thread(target=self._commit_bufferblock_worker)
530                     self._put_threads.append(thread)
531                     thread.daemon = True
532                     thread.start()
533
534     def _block_prefetch_worker(self):
535         """The background downloader thread."""
536         while True:
537             try:
538                 b = self._prefetch_queue.get()
539                 if b is None:
540                     return
541                 self._keep.get(b)
542             except Exception:
543                 _logger.exception("Exception doing block prefetch")
544
545     @synchronized
546     def start_get_threads(self):
547         if self._prefetch_threads is None:
548             self._prefetch_queue = Queue.Queue()
549             self._prefetch_threads = []
550             for i in xrange(0, self.num_get_threads):
551                 thread = threading.Thread(target=self._block_prefetch_worker)
552                 self._prefetch_threads.append(thread)
553                 thread.daemon = True
554                 thread.start()
555
556
557     @synchronized
558     def stop_threads(self):
559         """Shut down and wait for background upload and download threads to finish."""
560
561         if self._put_threads is not None:
562             for t in self._put_threads:
563                 self._put_queue.put(None)
564             for t in self._put_threads:
565                 t.join()
566         self._put_threads = None
567         self._put_queue = None
568
569         if self._prefetch_threads is not None:
570             for t in self._prefetch_threads:
571                 self._prefetch_queue.put(None)
572             for t in self._prefetch_threads:
573                 t.join()
574         self._prefetch_threads = None
575         self._prefetch_queue = None
576
577     def __enter__(self):
578         return self
579
580     def __exit__(self, exc_type, exc_value, traceback):
581         self.stop_threads()
582
583     @synchronized
584     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
585         """Packs small blocks together before uploading"""
586         self._pending_write_size += closed_file_size
587
588         # Check if there are enough small blocks to fill up one full block
589         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
590
591             # Search for blocks that are ready to be packed together before being committed to Keep.
592             # A WRITABLE block always has an owner.
593             # A WRITABLE block with its owner.closed() implies that its
594             # size is <= KEEP_BLOCK_SIZE/2.
595             try:
596                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
597             except AttributeError:
598                 # Writable blocks without owner shouldn't exist.
599                 raise UnownedBlockError()
600
601             if len(small_blocks) <= 1:
602                 # Not enough small blocks for repacking
603                 return
604
605             # Update the pending write size count with its true value, just in case
606             # some small file was opened, written and closed several times.
607             self._pending_write_size = sum([b.size() for b in small_blocks])
608             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
609                 return
610
611             new_bb = self._alloc_bufferblock()
612             files = []
613             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
614                 bb = small_blocks.pop(0)
615                 self._pending_write_size -= bb.size()
616                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
617                 files.append((bb, new_bb.write_pointer - bb.size()))
618
619             self.commit_bufferblock(new_bb, sync=sync)
620
621             for bb, segment_offset in files:
622                 bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
623                 self._delete_bufferblock(bb.blockid)
624
625     def commit_bufferblock(self, block, sync):
626         """Initiate a background upload of a bufferblock.
627
628         :block:
629           The block object to upload
630
631         :sync:
632           If `sync` is True, upload the block synchronously.
633           If `sync` is False, upload the block asynchronously.  This will
634           return immediately unless the upload queue is at capacity, in
635           which case it will wait on an upload queue slot.
636
637         """
638         try:
639             # Mark the block as PENDING so as to disallow any more appends.
640             block.set_state(_BufferBlock.PENDING)
641         except StateChangeError as e:
642             if e.state == _BufferBlock.PENDING:
643                 if sync:
644                     block.wait_for_commit.wait()
645                 else:
646                     return
647             if block.state() == _BufferBlock.COMMITTED:
648                 return
649             elif block.state() == _BufferBlock.ERROR:
650                 raise block.error
651             else:
652                 raise
653
654         if sync:
655             try:
656                 if self.copies is None:
657                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
658                 else:
659                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
660                 block.set_state(_BufferBlock.COMMITTED, loc)
661             except Exception as e:
662                 block.set_state(_BufferBlock.ERROR, e)
663                 raise
664         else:
665             self.start_put_threads()
666             self._put_queue.put(block)
667
668     @synchronized
669     def get_bufferblock(self, locator):
670         return self._bufferblocks.get(locator)
671
672     @synchronized
673     def get_padding_block(self):
674         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
675         when using truncate() to extend the size of a file.
676
677         For reference (and possible future optimization), the md5sum of the
678         padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
679
680         """
681
682         if self.padding_block is None:
683             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
684             self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
685             self.commit_bufferblock(self.padding_block, False)
686         return self.padding_block
687
688     @synchronized
689     def delete_bufferblock(self, locator):
690         self._delete_bufferblock(locator)
691
692     def _delete_bufferblock(self, locator):
693         bb = self._bufferblocks[locator]
694         bb.clear()
695         del self._bufferblocks[locator]
696
697     def get_block_contents(self, locator, num_retries, cache_only=False):
698         """Fetch a block.
699
700         First checks to see if the locator is a BufferBlock and returns it; if
701         not, passes the request through to KeepClient.get().
702
703         """
704         with self.lock:
705             if locator in self._bufferblocks:
706                 bufferblock = self._bufferblocks[locator]
707                 if bufferblock.state() != _BufferBlock.COMMITTED:
708                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
709                 else:
710                     locator = bufferblock._locator
711         if cache_only:
712             return self._keep.get_from_cache(locator)
713         else:
714             return self._keep.get(locator, num_retries=num_retries)
715
716     def commit_all(self):
717         """Commit all outstanding buffer blocks.
718
719         This is a synchronous call, and will not return until all buffer blocks
720         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
721
722         """
723         self.repack_small_blocks(force=True, sync=True)
724
725         with self.lock:
726             items = self._bufferblocks.items()
727
728         for k,v in items:
729             if v.state() != _BufferBlock.COMMITTED and v.owner:
730                 v.owner.flush(sync=False)
731
732         with self.lock:
733             if self._put_queue is not None:
734                 self._put_queue.join()
735
736                 err = []
737                 for k,v in items:
738                     if v.state() == _BufferBlock.ERROR:
739                         err.append((v.locator(), v.error))
740                 if err:
741                     raise KeepWriteError("Error writing some blocks", err, label="block")
742
743         for k,v in items:
744             # flush again with sync=True to remove committed bufferblocks from
745             # the segments.
746             if v.owner:
747                 v.owner.flush(sync=True)
748
749     def block_prefetch(self, locator):
750         """Initiate a background download of a block.
751
752         This assumes that the underlying KeepClient implements a block cache,
753         so repeated requests for the same block will not result in repeated
754         downloads (unless the block is evicted from the cache.)  This method
755         does not block.
756
757         """
758
759         if not self.prefetch_enabled:
760             return
761
762         if self._keep.get_from_cache(locator) is not None:
763             return
764
765         with self.lock:
766             if locator in self._bufferblocks:
767                 return
768
769         self.start_get_threads()
770         self._prefetch_queue.put(locator)
771
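# Rough usage sketch (assumes a KeepClient instance from arvados.keep; using the
# manager as a context manager calls stop_threads() on exit, per __exit__ above):
#
#   with _BlockManager(keep_client) as bm:
#       bb = bm.alloc_bufferblock()
#       bb.append('some data')
#       bm.commit_bufferblock(bb, sync=False)   # hand off to background put threads
#       bm.commit_all()                         # wait until every block is in Keep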
772
773 class ArvadosFile(object):
774     """Represent a file in a Collection.
775
776     ArvadosFile manages the underlying representation of a file in Keep as a
777     sequence of segments spanning a set of blocks, and implements random
778     read/write access.
779
780     This object may be accessed from multiple threads.
781
782     """
783
784     def __init__(self, parent, name, stream=[], segments=[]):
785         """
786         ArvadosFile constructor.
787
788         :stream:
789           a list of Range objects representing a block stream
790
791         :segments:
792           a list of Range objects representing segments
793         """
794         self.parent = parent
795         self.name = name
796         self._writers = set()
797         self._committed = False
798         self._segments = []
799         self.lock = parent.root_collection().lock
800         for s in segments:
801             self._add_segment(stream, s.locator, s.range_size)
802         self._current_bblock = None
803
804     def writable(self):
805         return self.parent.writable()
806
807     @synchronized
808     def permission_expired(self, as_of_dt=None):
809         """Returns True if any of the segments' locators is expired"""
810         for r in self._segments:
811             if KeepLocator(r.locator).permission_expired(as_of_dt):
812                 return True
813         return False
814
815     @synchronized
816     def segments(self):
817         return copy.copy(self._segments)
818
819     @synchronized
820     def clone(self, new_parent, new_name):
821         """Make a copy of this file."""
822         cp = ArvadosFile(new_parent, new_name)
823         cp.replace_contents(self)
824         return cp
825
826     @must_be_writable
827     @synchronized
828     def replace_contents(self, other):
829         """Replace segments of this file with segments from another `ArvadosFile` object."""
830
831         map_loc = {}
832         self._segments = []
833         for other_segment in other.segments():
834             new_loc = other_segment.locator
835             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
836                 if other_segment.locator not in map_loc:
837                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
838                     if bufferblock.state() != _BufferBlock.WRITABLE:
839                         map_loc[other_segment.locator] = bufferblock.locator()
840                     else:
841                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
842                 new_loc = map_loc[other_segment.locator]
843
844             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
845
846         self.set_committed(False)
847
848     def __eq__(self, other):
849         if other is self:
850             return True
851         if not isinstance(other, ArvadosFile):
852             return False
853
854         othersegs = other.segments()
855         with self.lock:
856             if len(self._segments) != len(othersegs):
857                 return False
858             for i in xrange(0, len(othersegs)):
859                 seg1 = self._segments[i]
860                 seg2 = othersegs[i]
861                 loc1 = seg1.locator
862                 loc2 = seg2.locator
863
864                 if self.parent._my_block_manager().is_bufferblock(loc1):
865                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
866
867                 if other.parent._my_block_manager().is_bufferblock(loc2):
868                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
869
870                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
871                     seg1.range_start != seg2.range_start or
872                     seg1.range_size != seg2.range_size or
873                     seg1.segment_offset != seg2.segment_offset):
874                     return False
875
876         return True
877
878     def __ne__(self, other):
879         return not self.__eq__(other)
880
881     @synchronized
882     def set_segments(self, segs):
883         self._segments = segs
884
885     @synchronized
886     def set_committed(self, value=True):
887         """Set committed flag.
888
889         If value is True, set committed to be True.
890
891         If value is False, set committed to be False for this and all parents.
892         """
893         if value == self._committed:
894             return
895         self._committed = value
896         if self._committed is False and self.parent is not None:
897             self.parent.set_committed(False)
898
899     @synchronized
900     def committed(self):
901         """Get whether this is committed or not."""
902         return self._committed
903
904     @synchronized
905     def add_writer(self, writer):
906         """Add an ArvadosFileWriter reference to the list of writers"""
907         if isinstance(writer, ArvadosFileWriter):
908             self._writers.add(writer)
909
910     @synchronized
911     def remove_writer(self, writer, flush):
912         """
913         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
914         and do some block maintenance tasks.
915         """
916         self._writers.remove(writer)
917
918         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
919             # File writer closed, not small enough for repacking
920             self.flush()
921         elif self.closed():
922             # All writers closed and size is adequate for repacking
923             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
924
925     def closed(self):
926         """
927         Get whether this is closed or not. When the writers list is empty, the file
928         is supposed to be closed.
929         """
930         return len(self._writers) == 0
931
932     @must_be_writable
933     @synchronized
934     def truncate(self, size):
935         """Shrink or expand the size of the file.
936
937         If `size` is less than the size of the file, the file contents after
938         `size` will be discarded.  If `size` is greater than the current size
939         of the file, it will be filled with zero bytes.
940
941         """
942         if size < self.size():
943             new_segs = []
944             for r in self._segments:
945                 range_end = r.range_start+r.range_size
946                 if r.range_start >= size:
947                     # segment is past the truncate size, all done
948                     break
949                 elif size < range_end:
950                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
951                     nr.segment_offset = r.segment_offset
952                     new_segs.append(nr)
953                     break
954                 else:
955                     new_segs.append(r)
956
957             self._segments = new_segs
958             self.set_committed(False)
959         elif size > self.size():
960             padding = self.parent._my_block_manager().get_padding_block()
961             diff = size - self.size()
962             while diff > config.KEEP_BLOCK_SIZE:
963                 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
964                 diff -= config.KEEP_BLOCK_SIZE
965             if diff > 0:
966                 self._segments.append(Range(padding.blockid, self.size(), diff, 0))
967             self.set_committed(False)
968         else:
969             # size == self.size()
970             pass
971
972     def readfrom(self, offset, size, num_retries, exact=False):
973         """Read up to `size` bytes from the file starting at `offset`.
974
975         :exact:
976          If False (default), return less data than requested if the read
977          crosses a block boundary and the next block isn't cached.  If True,
978          only return less data than requested when hitting EOF.
979         """
980
981         with self.lock:
982             if size == 0 or offset >= self.size():
983                 return ''
984             readsegs = locators_and_ranges(self._segments, offset, size)
985             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
986
987         locs = set()
988         data = []
989         for lr in readsegs:
990             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
991             if block:
992                 blockview = memoryview(block)
993                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
994                 locs.add(lr.locator)
995             else:
996                 break
997
998         for lr in prefetch:
999             if lr.locator not in locs:
1000                 self.parent._my_block_manager().block_prefetch(lr.locator)
1001                 locs.add(lr.locator)
1002
1003         return ''.join(data)
1004
1005     def _repack_writes(self, num_retries):
1006         """Optimize buffer block by repacking segments in file sequence.
1007
1008         When the client makes random writes, they appear in the buffer block in
1009         the sequence they were written rather than the sequence they appear in
1010         the file.  This makes for inefficient, fragmented manifests.  Attempt
1011         to optimize by repacking writes in file sequence.
1012
1013         """
1014         segs = self._segments
1015
1016         # Collect the segments that reference the buffer block.
1017         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
1018
1019         # Collect total data referenced by segments (could be smaller than
1020         # bufferblock size if a portion of the file was written and
1021         # then overwritten).
1022         write_total = sum([s.range_size for s in bufferblock_segs])
1023
1024         if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
1025             # If there's more than one segment referencing this block, it is
1026             # due to out-of-order writes and will produce a fragmented
1027             # manifest, so try to optimize by re-packing into a new buffer.
1028             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
1029             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
1030             for t in bufferblock_segs:
1031                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
1032                 t.segment_offset = new_bb.size() - t.range_size
1033
1034             self._current_bblock = new_bb
1035
1036     @must_be_writable
1037     @synchronized
1038     def writeto(self, offset, data, num_retries):
1039         """Write `data` to the file starting at `offset`.
1040
1041         This will update existing bytes and/or extend the size of the file as
1042         necessary.
1043
1044         """
1045         if len(data) == 0:
1046             return
1047
1048         if offset > self.size():
1049             self.truncate(offset)
1050
1051         if len(data) > config.KEEP_BLOCK_SIZE:
1052             # Chunk it up into smaller writes
1053             n = 0
1054             dataview = memoryview(data)
1055             while n < len(data):
1056                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1057                 n += config.KEEP_BLOCK_SIZE
1058             return
1059
1060         self.set_committed(False)
1061
1062         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1063             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1064
1065         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1066             self._repack_writes(num_retries)
1067             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1068                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1069                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1070
1071         self._current_bblock.append(data)
1072
1073         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1074
1075         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1076
1077         return len(data)
1078
1079     @synchronized
1080     def flush(self, sync=True, num_retries=0):
1081         """Flush the current bufferblock to Keep.
1082
1083         :sync:
1084           If True, commit block synchronously, wait until buffer block has been written.
1085           If False, commit block asynchronously, return immediately after putting block into
1086           the keep put queue.
1087         """
1088         if self.committed():
1089             return
1090
1091         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1092             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1093                 self._repack_writes(num_retries)
1094             if self._current_bblock.state() != _BufferBlock.DELETED:
1095                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1096
1097         if sync:
1098             to_delete = set()
1099             for s in self._segments:
1100                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1101                 if bb:
1102                     if bb.state() != _BufferBlock.COMMITTED:
1103                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1104                     to_delete.add(s.locator)
1105                     s.locator = bb.locator()
1106             for s in to_delete:
1107                 self.parent._my_block_manager().delete_bufferblock(s)
1108
1109         self.parent.notify(MOD, self.parent, self.name, (self, self))
1110
1111     @must_be_writable
1112     @synchronized
1113     def add_segment(self, blocks, pos, size):
1114         """Add a segment to the end of the file.
1115
1116         `pos` and `size` reference a section of the stream described by
1117         `blocks` (a list of Range objects)
1118
1119         """
1120         self._add_segment(blocks, pos, size)
1121
1122     def _add_segment(self, blocks, pos, size):
1123         """Internal implementation of add_segment."""
1124         self.set_committed(False)
1125         for lr in locators_and_ranges(blocks, pos, size):
1126             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1127             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1128             self._segments.append(r)
1129
1130     @synchronized
1131     def size(self):
1132         """Get the file size."""
1133         if self._segments:
1134             n = self._segments[-1]
1135             return n.range_start + n.range_size
1136         else:
1137             return 0
1138
1139     @synchronized
1140     def manifest_text(self, stream_name=".", portable_locators=False,
1141                       normalize=False, only_committed=False):
1142         buf = ""
1143         filestream = []
1144         for segment in self._segments:
1145             loc = segment.locator
1146             if self.parent._my_block_manager().is_bufferblock(loc):
1147                 if only_committed:
1148                     continue
1149                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1150             if portable_locators:
1151                 loc = KeepLocator(loc).stripped()
1152             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1153                                  segment.segment_offset, segment.range_size))
1154         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1155         buf += "\n"
1156         return buf
1157
1158     @must_be_writable
1159     @synchronized
1160     def _reparent(self, newparent, newname):
1161         self.set_committed(False)
1162         self.flush(sync=True)
1163         self.parent.remove(self.name)
1164         self.parent = newparent
1165         self.name = newname
1166         self.lock = self.parent.root_collection().lock
1167
1168
1169 class ArvadosFileReader(ArvadosFileReaderBase):
1170     """Wraps ArvadosFile in a file-like object supporting reading only.
1171
1172     Be aware that this class is NOT thread safe as there is no locking around
1173     updating the file pointer.
1174
1175     """
1176
1177     def __init__(self, arvadosfile, num_retries=None):
1178         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1179         self.arvadosfile = arvadosfile
1180
1181     def size(self):
1182         return self.arvadosfile.size()
1183
1184     def stream_name(self):
1185         return self.arvadosfile.parent.stream_name()
1186
1187     @_FileLikeObjectBase._before_close
1188     @retry_method
1189     def read(self, size=None, num_retries=None):
1190         """Read up to `size` bytes from the file and return the result.
1191
1192         Starts at the current file position.  If `size` is None, read the
1193         entire remainder of the file.
1194         """
1195         if size is None:
1196             data = []
1197             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1198             while rd:
1199                 data.append(rd)
1200                 self._filepos += len(rd)
1201                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1202             return ''.join(data)
1203         else:
1204             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1205             self._filepos += len(data)
1206             return data
1207
1208     @_FileLikeObjectBase._before_close
1209     @retry_method
1210     def readfrom(self, offset, size, num_retries=None):
1211         """Read up to `size` bytes from the stream, starting at the specified file offset.
1212
1213         This method does not change the file position.
1214         """
1215         return self.arvadosfile.readfrom(offset, size, num_retries)
1216
1217     def flush(self):
1218         pass
1219
1220
1221 class ArvadosFileWriter(ArvadosFileReader):
1222     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1223
1224     Be aware that this class is NOT thread safe as there is no locking around
1225     updating the file pointer.
1226
1227     """
1228
1229     def __init__(self, arvadosfile, mode, num_retries=None):
1230         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1231         self.mode = mode
1232         self.arvadosfile.add_writer(self)
1233
1234     def writable(self):
1235         return True
1236
1237     @_FileLikeObjectBase._before_close
1238     @retry_method
1239     def write(self, data, num_retries=None):
1240         if self.mode[0] == "a":
1241             self.arvadosfile.writeto(self.size(), data, num_retries)
1242         else:
1243             self.arvadosfile.writeto(self._filepos, data, num_retries)
1244             self._filepos += len(data)
1245         return len(data)
1246
1247     @_FileLikeObjectBase._before_close
1248     @retry_method
1249     def writelines(self, seq, num_retries=None):
1250         for s in seq:
1251             self.write(s, num_retries=num_retries)
1252
1253     @_FileLikeObjectBase._before_close
1254     def truncate(self, size=None):
1255         if size is None:
1256             size = self._filepos
1257         self.arvadosfile.truncate(size)
1258
1259     @_FileLikeObjectBase._before_close
1260     def flush(self):
1261         self.arvadosfile.flush()
1262
1263     def close(self, flush=True):
1264         if not self.closed:
1265             self.arvadosfile.remove_writer(self, flush)
1266             super(ArvadosFileWriter, self).close()
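

# End-to-end usage sketch (hedged: arvados.collection.Collection lives elsewhere
# in this SDK; its open() method hands back the ArvadosFileWriter/ArvadosFileReader
# wrappers defined above):
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open('hello.txt', 'w') as writer:    # ArvadosFileWriter
#       writer.write('hello world\n')
#   with coll.open('hello.txt', 'r') as reader:    # ArvadosFileReader
#       print reader.read(5)                       # -> 'hello'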