[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
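# Illustrative behaviour of split(), following the docstring above (the file
# names here are hypothetical):
#
#   >>> split('output/logs/stderr.txt')
#   ('output/logs', 'stderr.txt')
#   >>> split('stderr.txt')
#   ('.', 'stderr.txt')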
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # Older SDK versions provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep and its internal buffer has been
264       released; fetching the block will go through the Keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
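    # Sketch of the locator format computed by locator() above: a Keep locator is
    # "<md5 hex digest of the content>+<size in bytes>".  For an empty buffer
    # that works out to:
    #
    #   d41d8cd98f00b204e9800998ecf8427e+0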
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420
421     @synchronized
422     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
423         """Allocate a new, empty bufferblock in WRITABLE state and return it.
424
425         :blockid:
426           optional block identifier, otherwise one will be automatically assigned
427
428         :starting_capacity:
429           optional capacity, otherwise will use default capacity
430
431         :owner:
432           ArvadosFile that owns this block
433
434         """
435         return self._alloc_bufferblock(blockid, starting_capacity, owner)
436
437     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
438         if blockid is None:
439             blockid = "%s" % uuid.uuid4()
440         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
441         self._bufferblocks[bufferblock.blockid] = bufferblock
442         return bufferblock
443
444     @synchronized
445     def dup_block(self, block, owner):
446         """Create a new bufferblock initialized with the content of an existing bufferblock.
447
448         :block:
449           the buffer block to copy.
450
451         :owner:
452           ArvadosFile that owns the new block
453
454         """
455         new_blockid = "bufferblock%i" % len(self._bufferblocks)
456         bufferblock = block.clone(new_blockid, owner)
457         self._bufferblocks[bufferblock.blockid] = bufferblock
458         return bufferblock
459
460     @synchronized
461     def is_bufferblock(self, locator):
462         return locator in self._bufferblocks
463
464     def _commit_bufferblock_worker(self):
465         """Background uploader thread."""
466
467         while True:
468             try:
469                 bufferblock = self._put_queue.get()
470                 if bufferblock is None:
471                     return
472
473                 if self.copies is None:
474                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
475                 else:
476                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
477                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
478
479             except Exception as e:
480                 bufferblock.set_state(_BufferBlock.ERROR, e)
481             finally:
482                 if self._put_queue is not None:
483                     self._put_queue.task_done()
484
485     @synchronized
486     def start_put_threads(self):
487         if self._put_threads is None:
488             # Start uploader threads.
489
490             # If we don't limit the Queue size, the upload queue can quickly
491             # grow to take up gigabytes of RAM if the writing process is
492             # generating data more quickly than it can be send to the Keep
493             # generating data more quickly than it can be sent to the Keep
494             #
495             # With two upload threads and a queue size of 2, this means up to 4
496             # blocks pending.  If they are full 64 MiB blocks, that means up to
497             # 256 MiB of internal buffering, which is the same size as the
498             # default download block cache in KeepClient.
499             self._put_queue = Queue.Queue(maxsize=2)
500
501             self._put_threads = []
502             for i in xrange(0, self.num_put_threads):
503                 thread = threading.Thread(target=self._commit_bufferblock_worker)
504                 self._put_threads.append(thread)
505                 thread.daemon = True
506                 thread.start()
507
508     def _block_prefetch_worker(self):
509         """The background downloader thread."""
510         while True:
511             try:
512                 b = self._prefetch_queue.get()
513                 if b is None:
514                     return
515                 self._keep.get(b)
516             except Exception:
517                 pass
518
519     @synchronized
520     def start_get_threads(self):
521         if self._prefetch_threads is None:
522             self._prefetch_queue = Queue.Queue()
523             self._prefetch_threads = []
524             for i in xrange(0, self.num_get_threads):
525                 thread = threading.Thread(target=self._block_prefetch_worker)
526                 self._prefetch_threads.append(thread)
527                 thread.daemon = True
528                 thread.start()
529
530
531     @synchronized
532     def stop_threads(self):
533         """Shut down and wait for background upload and download threads to finish."""
534
535         if self._put_threads is not None:
536             for t in self._put_threads:
537                 self._put_queue.put(None)
538             for t in self._put_threads:
539                 t.join()
540         self._put_threads = None
541         self._put_queue = None
542
543         if self._prefetch_threads is not None:
544             for t in self._prefetch_threads:
545                 self._prefetch_queue.put(None)
546             for t in self._prefetch_threads:
547                 t.join()
548         self._prefetch_threads = None
549         self._prefetch_queue = None
550
551     def __enter__(self):
552         return self
553
554     def __exit__(self, exc_type, exc_value, traceback):
555         self.stop_threads()
556
557     @synchronized
558     def repack_small_blocks(self, force=False, sync=False):
559         """Packs small blocks together before uploading"""
560         # Search for blocks that are ready to be packed together before being committed to Keep.
561         # A WRITABLE block always has an owner.
562         # A WRITABLE block whose owner is closed implies that its
563         # size is <= KEEP_BLOCK_SIZE/2.
564         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
565         if len(small_blocks) <= 1:
566             # Not enough small blocks for repacking
567             return
568
569         # Check if there are enough small blocks to fill up one full block
570         pending_write_size = sum([b.size() for b in small_blocks])
571         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
572             new_bb = self._alloc_bufferblock()
573             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
574                 bb = small_blocks.pop(0)
575                 arvfile = bb.owner
576                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
577                 arvfile.set_segments([Range(new_bb.blockid,
578                                             0,
579                                             bb.size(),
580                                             new_bb.write_pointer - bb.size())])
581                 self._delete_bufferblock(bb.blockid)
582             self.commit_bufferblock(new_bb, sync=sync)
583
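    # Sketch of what repack_small_blocks() does: when several files opened for
    # writing are closed while still smaller than KEEP_BLOCK_SIZE/2, their
    # WRITABLE buffer blocks are concatenated into one new buffer block, each
    # file's segments are re-pointed at the new block, the old buffer blocks are
    # deleted, and the combined block is committed to Keep as a single block.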
584     def commit_bufferblock(self, block, sync):
585         """Initiate a background upload of a bufferblock.
586
587         :block:
588           The block object to upload
589
590         :sync:
591           If `sync` is True, upload the block synchronously.
592           If `sync` is False, upload the block asynchronously.  This will
593           return immediately unless the upload queue is at capacity, in
594           which case it will wait on an upload queue slot.
595
596         """
597         try:
598             # Mark the block as PENDING to disallow any more appends.
599             block.set_state(_BufferBlock.PENDING)
600         except StateChangeError as e:
601             if e.state == _BufferBlock.PENDING:
602                 if sync:
603                     block.wait_for_commit.wait()
604                 else:
605                     return
606             if block.state() == _BufferBlock.COMMITTED:
607                 return
608             elif block.state() == _BufferBlock.ERROR:
609                 raise block.error
610             else:
611                 raise
612
613         if sync:
614             try:
615                 if self.copies is None:
616                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
617                 else:
618                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
619                 block.set_state(_BufferBlock.COMMITTED, loc)
620             except Exception as e:
621                 block.set_state(_BufferBlock.ERROR, e)
622                 raise
623         else:
624             self.start_put_threads()
625             self._put_queue.put(block)
626
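    # Minimal usage sketch for commit_bufferblock()/commit_all() (hypothetical
    # caller code; `block_manager` and `bufferblock` stand in for real objects):
    #
    #   block_manager.commit_bufferblock(bufferblock, sync=False)  # queue the upload
    #   block_manager.commit_all()  # block until done; raises KeepWriteError on failure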
627     @synchronized
628     def get_bufferblock(self, locator):
629         return self._bufferblocks.get(locator)
630
631     @synchronized
632     def delete_bufferblock(self, locator):
633         self._delete_bufferblock(locator)
634
635     def _delete_bufferblock(self, locator):
636         bb = self._bufferblocks[locator]
637         bb.clear()
638         del self._bufferblocks[locator]
639
640     def get_block_contents(self, locator, num_retries, cache_only=False):
641         """Fetch a block.
642
643         First checks whether the locator refers to a BufferBlock and, if so,
644         returns its contents; otherwise the request is passed through to KeepClient.get().
645
646         """
647         with self.lock:
648             if locator in self._bufferblocks:
649                 bufferblock = self._bufferblocks[locator]
650                 if bufferblock.state() != _BufferBlock.COMMITTED:
651                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
652                 else:
653                     locator = bufferblock._locator
654         if cache_only:
655             return self._keep.get_from_cache(locator)
656         else:
657             return self._keep.get(locator, num_retries=num_retries)
658
659     def commit_all(self):
660         """Commit all outstanding buffer blocks.
661
662         This is a synchronous call, and will not return until all buffer blocks
663         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
664
665         """
666         self.repack_small_blocks(force=True, sync=True)
667
668         with self.lock:
669             items = self._bufferblocks.items()
670
671         for k,v in items:
672             if v.state() != _BufferBlock.COMMITTED and v.owner:
673                 v.owner.flush(sync=False)
674
675         with self.lock:
676             if self._put_queue is not None:
677                 self._put_queue.join()
678
679                 err = []
680                 for k,v in items:
681                     if v.state() == _BufferBlock.ERROR:
682                         err.append((v.locator(), v.error))
683                 if err:
684                     raise KeepWriteError("Error writing some blocks", err, label="block")
685
686         for k,v in items:
687             # flush again with sync=True to remove committed bufferblocks from
688             # the segments.
689             if v.owner:
690                 v.owner.flush(sync=True)
691
692     def block_prefetch(self, locator):
693         """Initiate a background download of a block.
694
695         This assumes that the underlying KeepClient implements a block cache,
696         so repeated requests for the same block will not result in repeated
697         downloads (unless the block is evicted from the cache).  This method
698         does not block.
699
700         """
701
702         if not self.prefetch_enabled:
703             return
704
705         if self._keep.get_from_cache(locator) is not None:
706             return
707
708         with self.lock:
709             if locator in self._bufferblocks:
710                 return
711
712         self.start_get_threads()
713         self._prefetch_queue.put(locator)
714
715
716 class ArvadosFile(object):
717     """Represent a file in a Collection.
718
719     ArvadosFile manages the underlying representation of a file in Keep as a
720     sequence of segments spanning a set of blocks, and implements random
721     read/write access.
722
723     This object may be accessed from multiple threads.
724
725     """
726
727     def __init__(self, parent, name, stream=[], segments=[]):
728         """
729         ArvadosFile constructor.
730
731         :stream:
732           a list of Range objects representing a block stream
733
734         :segments:
735           a list of Range objects representing segments
736         """
737         self.parent = parent
738         self.name = name
739         self._writers = set()
740         self._committed = False
741         self._segments = []
742         self.lock = parent.root_collection().lock
743         for s in segments:
744             self._add_segment(stream, s.locator, s.range_size)
745         self._current_bblock = None
746
747     def writable(self):
748         return self.parent.writable()
749
750     @synchronized
751     def segments(self):
752         return copy.copy(self._segments)
753
754     @synchronized
755     def clone(self, new_parent, new_name):
756         """Make a copy of this file."""
757         cp = ArvadosFile(new_parent, new_name)
758         cp.replace_contents(self)
759         return cp
760
761     @must_be_writable
762     @synchronized
763     def replace_contents(self, other):
764         """Replace segments of this file with segments from another `ArvadosFile` object."""
765
766         map_loc = {}
767         self._segments = []
768         for other_segment in other.segments():
769             new_loc = other_segment.locator
770             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
771                 if other_segment.locator not in map_loc:
772                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
773                     if bufferblock.state() != _BufferBlock.WRITABLE:
774                         map_loc[other_segment.locator] = bufferblock.locator()
775                     else:
776                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
777                 new_loc = map_loc[other_segment.locator]
778
779             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
780
781         self._committed = False
782
783     def __eq__(self, other):
784         if other is self:
785             return True
786         if not isinstance(other, ArvadosFile):
787             return False
788
789         othersegs = other.segments()
790         with self.lock:
791             if len(self._segments) != len(othersegs):
792                 return False
793             for i in xrange(0, len(othersegs)):
794                 seg1 = self._segments[i]
795                 seg2 = othersegs[i]
796                 loc1 = seg1.locator
797                 loc2 = seg2.locator
798
799                 if self.parent._my_block_manager().is_bufferblock(loc1):
800                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
801
802                 if other.parent._my_block_manager().is_bufferblock(loc2):
803                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
804
805                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
806                     seg1.range_start != seg2.range_start or
807                     seg1.range_size != seg2.range_size or
808                     seg1.segment_offset != seg2.segment_offset):
809                     return False
810
811         return True
812
813     def __ne__(self, other):
814         return not self.__eq__(other)
815
816     @synchronized
817     def set_segments(self, segs):
818         self._segments = segs
819
820     @synchronized
821     def set_committed(self):
822         """Set committed flag to True"""
823         self._committed = True
824
825     @synchronized
826     def committed(self):
827         """Get whether this is committed or not."""
828         return self._committed
829
830     @synchronized
831     def add_writer(self, writer):
832         """Add an ArvadosFileWriter reference to the list of writers"""
833         if isinstance(writer, ArvadosFileWriter):
834             self._writers.add(writer)
835
836     @synchronized
837     def remove_writer(self, writer, flush):
838         """
839         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
840         and do some block maintenance tasks.
841         """
842         self._writers.remove(writer)
843
844         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
845             # File writer closed, not small enough for repacking
846             self.flush()
847         elif self.closed():
848             # All writers closed and size is adequate for repacking
849             self.parent._my_block_manager().repack_small_blocks()
850
851     def closed(self):
852         """
853         Get whether this file is closed or not. When the writers list is empty, the file
854         is considered closed.
855         """
856         return len(self._writers) == 0
857
858     @must_be_writable
859     @synchronized
860     def truncate(self, size):
861         """Shrink the size of the file.
862
863         If `size` is less than the size of the file, the file contents after
864         `size` will be discarded.  If `size` is greater than the current size
865         of the file, an IOError will be raised.
866
867         """
868         if size < self.size():
869             new_segs = []
870             for r in self._segments:
871                 range_end = r.range_start+r.range_size
872                 if r.range_start >= size:
873                     # segment is past the truncate size, all done
874                     break
875                 elif size < range_end:
876                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
877                     nr.segment_offset = r.segment_offset
878                     new_segs.append(nr)
879                     break
880                 else:
881                     new_segs.append(r)
882
883             self._segments = new_segs
884             self._committed = False
885         elif size > self.size():
886             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
887
888     def readfrom(self, offset, size, num_retries, exact=False):
889         """Read up to `size` bytes from the file starting at `offset`.
890
891         :exact:
892          If False (default), return less data than requested if the read
893          crosses a block boundary and the next block isn't cached.  If True,
894          only return less data than requested when hitting EOF.
895         """
896
897         with self.lock:
898             if size == 0 or offset >= self.size():
899                 return ''
900             readsegs = locators_and_ranges(self._segments, offset, size)
901             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
902
903         locs = set()
904         data = []
905         for lr in readsegs:
906             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
907             if block:
908                 blockview = memoryview(block)
909                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
910                 locs.add(lr.locator)
911             else:
912                 break
913
914         for lr in prefetch:
915             if lr.locator not in locs:
916                 self.parent._my_block_manager().block_prefetch(lr.locator)
917                 locs.add(lr.locator)
918
919         return ''.join(data)
920
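    # Behavioural sketch of the `exact` flag on readfrom() above (values are
    # illustrative):
    #
    #   arvfile.readfrom(0, 1024, num_retries=0)               # may return < 1024 bytes
    #                                                           # if the next block is uncached
    #   arvfile.readfrom(0, 1024, num_retries=0, exact=True)   # returns < 1024 bytes only at EOF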
921     def _repack_writes(self, num_retries):
922         """Test if the buffer block has more data than actual segments.
923
924         This happens when a buffered write over-writes a file range written in
925         a previous buffered write.  Re-pack the buffer block for efficiency
926         and to avoid leaking information.
927
928         """
929         segs = self._segments
930
931         # Sum up the segments to get the total bytes of the file referencing
932         # into the buffer block.
933         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
934         write_total = sum([s.range_size for s in bufferblock_segs])
935
936         if write_total < self._current_bblock.size():
937             # There is more data in the buffer block than is actually accounted for by segments, so
938             # re-pack into a new buffer by copying over to a new buffer block.
939             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
940             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
941             for t in bufferblock_segs:
942                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
943                 t.segment_offset = new_bb.size() - t.range_size
944
945             self._current_bblock = new_bb
946
947     @must_be_writable
948     @synchronized
949     def writeto(self, offset, data, num_retries):
950         """Write `data` to the file starting at `offset`.
951
952         This will update existing bytes and/or extend the size of the file as
953         necessary.
954
955         """
956         if len(data) == 0:
957             return
958
959         if offset > self.size():
960             raise ArgumentError("Offset is past the end of the file")
961
962         if len(data) > config.KEEP_BLOCK_SIZE:
963             # Chunk it up into smaller writes
964             n = 0
965             dataview = memoryview(data)
966             while n < len(data):
967                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
968                 n += config.KEEP_BLOCK_SIZE
969             return
970
971         self._committed = False
972
973         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
974             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
975
976         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
977             self._repack_writes(num_retries)
978             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
979                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
980                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
981
982         self._current_bblock.append(data)
983
984         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
985
986         self.parent.notify(WRITE, self.parent, self.name, (self, self))
987
988         return len(data)
989
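    # Worked example of the chunking in writeto() above, assuming the default
    # KEEP_BLOCK_SIZE of 64 MiB: a single 150 MiB write at offset 0 recurses as
    #
    #   writeto(0,        data[0:64 MiB],        ...)
    #   writeto(64 MiB,   data[64 MiB:128 MiB],  ...)
    #   writeto(128 MiB,  data[128 MiB:150 MiB], ...)
    #
    # so no buffer block ever receives more than one full Keep block of data.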
990     @synchronized
991     def flush(self, sync=True, num_retries=0):
992         """Flush the current bufferblock to Keep.
993
994         :sync:
995           If True, commit block synchronously, wait until buffer block has been written.
996           If False, commit block asynchronously, return immediately after putting block into
997           the keep put queue.
998         """
999         if self.committed():
1000             return
1001
1002         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1003             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1004                 self._repack_writes(num_retries)
1005             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1006
1007         if sync:
1008             to_delete = set()
1009             for s in self._segments:
1010                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1011                 if bb:
1012                     if bb.state() != _BufferBlock.COMMITTED:
1013                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1014                     to_delete.add(s.locator)
1015                     s.locator = bb.locator()
1016             for s in to_delete:
1017                 self.parent._my_block_manager().delete_bufferblock(s)
1018
1019         self.parent.notify(MOD, self.parent, self.name, (self, self))
1020
1021     @must_be_writable
1022     @synchronized
1023     def add_segment(self, blocks, pos, size):
1024         """Add a segment to the end of the file.
1025
1026         `pos` and `size` reference a section of the stream described by
1027         `blocks` (a list of Range objects).
1028
1029         """
1030         self._add_segment(blocks, pos, size)
1031
1032     def _add_segment(self, blocks, pos, size):
1033         """Internal implementation of add_segment."""
1034         self._committed = False
1035         for lr in locators_and_ranges(blocks, pos, size):
1036             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1037             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1038             self._segments.append(r)
1039
1040     @synchronized
1041     def size(self):
1042         """Get the file size."""
1043         if self._segments:
1044             n = self._segments[-1]
1045             return n.range_start + n.range_size
1046         else:
1047             return 0
1048
1049     @synchronized
1050     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1051         buf = ""
1052         filestream = []
1053         for segment in self._segments:
1054             loc = segment.locator
1055             if self.parent._my_block_manager().is_bufferblock(loc):
1056                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1057             if portable_locators:
1058                 loc = KeepLocator(loc).stripped()
1059             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1060                                  segment.segment_offset, segment.range_size))
1061         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1062         buf += "\n"
1063         return buf
1064
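    # Example of the manifest text produced by manifest_text() above (locator and
    # sizes are illustrative): a single-segment 3-byte file "foo.txt" in stream
    # "." renders as
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # i.e. stream name, block locators, then position:size:filename tokens,
    # terminated by a newline.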
1065     @must_be_writable
1066     @synchronized
1067     def _reparent(self, newparent, newname):
1068         self._committed = False
1069         self.flush(sync=True)
1070         self.parent.remove(self.name)
1071         self.parent = newparent
1072         self.name = newname
1073         self.lock = self.parent.root_collection().lock
1074
1075
1076 class ArvadosFileReader(ArvadosFileReaderBase):
1077     """Wraps ArvadosFile in a file-like object supporting reading only.
1078
1079     Be aware that this class is NOT thread safe as there is no locking around
1080     updating the file pointer.
1081
1082     """
1083
1084     def __init__(self, arvadosfile, num_retries=None):
1085         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1086         self.arvadosfile = arvadosfile
1087
1088     def size(self):
1089         return self.arvadosfile.size()
1090
1091     def stream_name(self):
1092         return self.arvadosfile.parent.stream_name()
1093
1094     @_FileLikeObjectBase._before_close
1095     @retry_method
1096     def read(self, size=None, num_retries=None):
1097         """Read up to `size` bytes from the file and return the result.
1098
1099         Starts at the current file position.  If `size` is None, read the
1100         entire remainder of the file.
1101         """
1102         if size is None:
1103             data = []
1104             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1105             while rd:
1106                 data.append(rd)
1107                 self._filepos += len(rd)
1108                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1109             return ''.join(data)
1110         else:
1111             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1112             self._filepos += len(data)
1113             return data
1114
1115     @_FileLikeObjectBase._before_close
1116     @retry_method
1117     def readfrom(self, offset, size, num_retries=None):
1118         """Read up to `size` bytes from the stream, starting at the specified file offset.
1119
1120         This method does not change the file position.
1121         """
1122         return self.arvadosfile.readfrom(offset, size, num_retries)
1123
1124     def flush(self):
1125         pass
1126
1127
1128 class ArvadosFileWriter(ArvadosFileReader):
1129     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1130
1131     Be aware that this class is NOT thread safe as there is no locking around
1132     updating the file pointer.
1133
1134     """
1135
1136     def __init__(self, arvadosfile, mode, num_retries=None):
1137         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1138         self.mode = mode
1139         self.arvadosfile.add_writer(self)
1140
1141     @_FileLikeObjectBase._before_close
1142     @retry_method
1143     def write(self, data, num_retries=None):
1144         if self.mode[0] == "a":
1145             self.arvadosfile.writeto(self.size(), data, num_retries)
1146         else:
1147             self.arvadosfile.writeto(self._filepos, data, num_retries)
1148             self._filepos += len(data)
1149         return len(data)
1150
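    # Usage sketch for write() in append mode (hypothetical; writers are normally
    # obtained from a Collection rather than constructed directly):
    #
    #   with collection.open("stats.txt", "a") as f:  # f is an ArvadosFileWriter
    #       f.write("run complete\n")                 # appended at end-of-file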
1151     @_FileLikeObjectBase._before_close
1152     @retry_method
1153     def writelines(self, seq, num_retries=None):
1154         for s in seq:
1155             self.write(s, num_retries=num_retries)
1156
1157     @_FileLikeObjectBase._before_close
1158     def truncate(self, size=None):
1159         if size is None:
1160             size = self._filepos
1161         self.arvadosfile.truncate(size)
1162         if self._filepos > self.size():
1163             self._filepos = self.size()
1164
1165     @_FileLikeObjectBase._before_close
1166     def flush(self):
1167         self.arvadosfile.flush()
1168
1169     def close(self, flush=True):
1170         if not self.closed:
1171             self.arvadosfile.remove_writer(self, flush)
1172             super(ArvadosFileWriter, self).close()