9701: Added clarifying comments to the small block searching list comprehension.
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
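    Illustrative examples (behavior follows directly from the code below):

        >>> split('./subdir/file.txt')
        ('./subdir', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')
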
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub(r'\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # Older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep, its internal buffer has been
264       released, fetching the block will fetch it via keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
268     """
269
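    # Illustrative lifecycle sketch (values are placeholders, not executed here):
    #
    #   bb = _BufferBlock("example-blockid", starting_capacity=2**14, owner=None)
    #   bb.append("some data")              # WRITABLE: buffer expands as needed
    #   bb.set_state(_BufferBlock.PENDING)  # upload started; further appends raise
    #   bb.set_state(_BufferBlock.COMMITTED, "<keep locator returned by put>")
    #   bb.locator()                        # now returns the committed locator
    #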
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accommodate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
418         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
419         self.copies = copies
420
421     @synchronized
422     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
423         """Allocate a new, empty bufferblock in WRITABLE state and return it.
424
425         :blockid:
426           optional block identifier, otherwise one will be automatically assigned
427
428         :starting_capacity:
429           optional capacity, otherwise will use default capacity
430
431         :owner:
432           ArvadosFile that owns this block
433
434         """
435         if blockid is None:
436             blockid = "%s" % uuid.uuid4()
437         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
438         self._bufferblocks[bufferblock.blockid] = bufferblock
439         return bufferblock
440
441     @synchronized
442     def dup_block(self, block, owner):
443         """Create a new bufferblock initialized with the content of an existing bufferblock.
444
445         :block:
446           the buffer block to copy.
447
448         :owner:
449           ArvadosFile that owns the new block
450
451         """
452         new_blockid = "bufferblock%i" % len(self._bufferblocks)
453         bufferblock = block.clone(new_blockid, owner)
454         self._bufferblocks[bufferblock.blockid] = bufferblock
455         return bufferblock
456
457     @synchronized
458     def is_bufferblock(self, locator):
459         return locator in self._bufferblocks
460
461     def _commit_bufferblock_worker(self):
462         """Background uploader thread."""
463
464         while True:
465             try:
466                 bufferblock = self._put_queue.get()
467                 if bufferblock is None:
468                     return
469
470                 if self.copies is None:
471                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
472                 else:
473                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
474                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
475
476             except Exception as e:
477                 bufferblock.set_state(_BufferBlock.ERROR, e)
478             finally:
479                 if self._put_queue is not None:
480                     self._put_queue.task_done()
481
482     @synchronized
483     def start_put_threads(self):
484         if self._put_threads is None:
485             # Start uploader threads.
486
487             # If we don't limit the Queue size, the upload queue can quickly
488             # grow to take up gigabytes of RAM if the writing process is
489             # generating data more quickly than it can be sent to the Keep
490             # servers.
491             #
492             # With two upload threads and a queue size of 2, this means up to 4
493             # blocks pending.  If they are full 64 MiB blocks, that means up to
494             # 256 MiB of internal buffering, which is the same size as the
495             # default download block cache in KeepClient.
496             self._put_queue = Queue.Queue(maxsize=2)
497
498             self._put_threads = []
499             for i in xrange(0, self.num_put_threads):
500                 thread = threading.Thread(target=self._commit_bufferblock_worker)
501                 self._put_threads.append(thread)
502                 thread.daemon = True
503                 thread.start()
504
505     def _block_prefetch_worker(self):
506         """The background downloader thread."""
507         while True:
508             try:
509                 b = self._prefetch_queue.get()
510                 if b is None:
511                     return
512                 self._keep.get(b)
513             except Exception:
514                 pass
515
516     @synchronized
517     def start_get_threads(self):
518         if self._prefetch_threads is None:
519             self._prefetch_queue = Queue.Queue()
520             self._prefetch_threads = []
521             for i in xrange(0, self.num_get_threads):
522                 thread = threading.Thread(target=self._block_prefetch_worker)
523                 self._prefetch_threads.append(thread)
524                 thread.daemon = True
525                 thread.start()
526
527
528     @synchronized
529     def stop_threads(self):
530         """Shut down and wait for background upload and download threads to finish."""
531
532         if self._put_threads is not None:
533             for t in self._put_threads:
534                 self._put_queue.put(None)
535             for t in self._put_threads:
536                 t.join()
537         self._put_threads = None
538         self._put_queue = None
539
540         if self._prefetch_threads is not None:
541             for t in self._prefetch_threads:
542                 self._prefetch_queue.put(None)
543             for t in self._prefetch_threads:
544                 t.join()
545         self._prefetch_threads = None
546         self._prefetch_queue = None
547
548     def __enter__(self):
549         return self
550
551     def __exit__(self, exc_type, exc_value, traceback):
552         self.stop_threads()
553
554     @synchronized
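    # Worked example for repack_small_blocks() below (illustrative; assumes the
    # usual KEEP_BLOCK_SIZE of 64 MiB): if four closed files each hold a 20 MiB
    # WRITABLE buffer block, pending_write_size is 80 MiB >= KEEP_BLOCK_SIZE, so
    # repacking runs; the first three blocks (60 MiB) fit into one new block and
    # are committed together, while the fourth stays buffered for a later call.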
555     def repack_small_blocks(self, force=False, sync=False):
556         """Packs small blocks together before uploading"""
557         # Search for blocks that are ready to be packed together before being committed to Keep.
558         # A WRITABLE block always has an owner.
559         # A WRITABLE block whose owner is closed (owner.closed() is True) implies
560         # that its size is <= KEEP_BLOCK_SIZE/2.
561         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
562         if len(small_blocks) <= 1:
563             # Not enough small blocks for repacking
564             return
565
566         # Check if there is enough pending data in the small blocks to fill a full block
567         pending_write_size = sum([b.size() for b in small_blocks])
568         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
569             new_bb = _BufferBlock("%s" % uuid.uuid4(), 2**14, None)
570             self._bufferblocks[new_bb.blockid] = new_bb
571             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
572                 bb = small_blocks.pop(0)
573                 arvfile = bb.owner
574                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
575                 arvfile.set_segments([Range(new_bb.blockid,
576                                             0,
577                                             bb.size(),
578                                             new_bb.write_pointer - bb.size())])
579                 bb.clear()
580                 del self._bufferblocks[bb.blockid]
581             self.commit_bufferblock(new_bb, sync=sync)
582
583     def commit_bufferblock(self, block, sync):
584         """Initiate a background upload of a bufferblock.
585
586         :block:
587           The block object to upload
588
589         :sync:
590           If `sync` is True, upload the block synchronously.
591           If `sync` is False, upload the block asynchronously.  This will
592           return immediately unless the upload queue is at capacity, in
593           which case it will wait on an upload queue slot.
594
595         """
596         try:
597             # Mark the block as PENDING to disallow any further appends.
598             block.set_state(_BufferBlock.PENDING)
599         except StateChangeError as e:
600             if e.state == _BufferBlock.PENDING:
601                 if sync:
602                     block.wait_for_commit.wait()
603                 else:
604                     return
605             if block.state() == _BufferBlock.COMMITTED:
606                 return
607             elif block.state() == _BufferBlock.ERROR:
608                 raise block.error
609             else:
610                 raise
611
612         if sync:
613             try:
614                 if self.copies is None:
615                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
616                 else:
617                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
618                 block.set_state(_BufferBlock.COMMITTED, loc)
619             except Exception as e:
620                 block.set_state(_BufferBlock.ERROR, e)
621                 raise
622         else:
623             self.start_put_threads()
624             self._put_queue.put(block)
625
626     @synchronized
627     def get_bufferblock(self, locator):
628         return self._bufferblocks.get(locator)
629
630     @synchronized
631     def delete_bufferblock(self, locator):
632         bb = self._bufferblocks[locator]
633         bb.clear()
634         del self._bufferblocks[locator]
635
636     def get_block_contents(self, locator, num_retries, cache_only=False):
637         """Fetch a block.
638
639         First checks to see if the locator is a BufferBlock and returns its
640         contents if so; otherwise, passes the request through to KeepClient.get().
641
642         """
643         with self.lock:
644             if locator in self._bufferblocks:
645                 bufferblock = self._bufferblocks[locator]
646                 if bufferblock.state() != _BufferBlock.COMMITTED:
647                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
648                 else:
649                     locator = bufferblock._locator
650         if cache_only:
651             return self._keep.get_from_cache(locator)
652         else:
653             return self._keep.get(locator, num_retries=num_retries)
654
655     def commit_all(self):
656         """Commit all outstanding buffer blocks.
657
658         This is a synchronous call, and will not return until all buffer blocks
659         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
660
661         """
662         self.repack_small_blocks(force=True, sync=True)
663
664         with self.lock:
665             items = self._bufferblocks.items()
666
667         for k,v in items:
668             if v.state() != _BufferBlock.COMMITTED and v.owner:
669                 v.owner.flush(sync=False)
670
671         with self.lock:
672             if self._put_queue is not None:
673                 self._put_queue.join()
674
675                 err = []
676                 for k,v in items:
677                     if v.state() == _BufferBlock.ERROR:
678                         err.append((v.locator(), v.error))
679                 if err:
680                     raise KeepWriteError("Error writing some blocks", err, label="block")
681
682         for k,v in items:
683             # flush again with sync=True to remove committed bufferblocks from
684             # the segments.
685             if v.owner:
686                 v.owner.flush(sync=True)
687
688     def block_prefetch(self, locator):
689         """Initiate a background download of a block.
690
691         This assumes that the underlying KeepClient implements a block cache,
692         so repeated requests for the same block will not result in repeated
693         downloads (unless the block is evicted from the cache.)  This method
694         does not block.
695
696         """
697
698         if not self.prefetch_enabled:
699             return
700
701         if self._keep.get_from_cache(locator) is not None:
702             return
703
704         with self.lock:
705             if locator in self._bufferblocks:
706                 return
707
708         self.start_get_threads()
709         self._prefetch_queue.put(locator)
710
711
712 class ArvadosFile(object):
713     """Represent a file in a Collection.
714
715     ArvadosFile manages the underlying representation of a file in Keep as a
716     sequence of segments spanning a set of blocks, and implements random
717     read/write access.
718
719     This object may be accessed from multiple threads.
720
721     """
722
723     def __init__(self, parent, name, stream=[], segments=[]):
724         """
725         ArvadosFile constructor.
726
727         :stream:
728           a list of Range objects representing a block stream
729
730         :segments:
731           a list of Range objects representing segments
732         """
733         self.parent = parent
734         self.name = name
735         self._writers = set()
736         self._committed = False
737         self._segments = []
738         self.lock = parent.root_collection().lock
739         for s in segments:
740             self._add_segment(stream, s.locator, s.range_size)
741         self._current_bblock = None
742
743     def writable(self):
744         return self.parent.writable()
745
746     @synchronized
747     def segments(self):
748         return copy.copy(self._segments)
749
750     @synchronized
751     def clone(self, new_parent, new_name):
752         """Make a copy of this file."""
753         cp = ArvadosFile(new_parent, new_name)
754         cp.replace_contents(self)
755         return cp
756
757     @must_be_writable
758     @synchronized
759     def replace_contents(self, other):
760         """Replace segments of this file with segments from another `ArvadosFile` object."""
761
762         map_loc = {}
763         self._segments = []
764         for other_segment in other.segments():
765             new_loc = other_segment.locator
766             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
767                 if other_segment.locator not in map_loc:
768                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
769                     if bufferblock.state() != _BufferBlock.WRITABLE:
770                         map_loc[other_segment.locator] = bufferblock.locator()
771                     else:
772                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
773                 new_loc = map_loc[other_segment.locator]
774
775             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
776
777         self._committed = False
778
779     def __eq__(self, other):
780         if other is self:
781             return True
782         if not isinstance(other, ArvadosFile):
783             return False
784
785         othersegs = other.segments()
786         with self.lock:
787             if len(self._segments) != len(othersegs):
788                 return False
789             for i in xrange(0, len(othersegs)):
790                 seg1 = self._segments[i]
791                 seg2 = othersegs[i]
792                 loc1 = seg1.locator
793                 loc2 = seg2.locator
794
795                 if self.parent._my_block_manager().is_bufferblock(loc1):
796                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
797
798                 if other.parent._my_block_manager().is_bufferblock(loc2):
799                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
800
801                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
802                     seg1.range_start != seg2.range_start or
803                     seg1.range_size != seg2.range_size or
804                     seg1.segment_offset != seg2.segment_offset):
805                     return False
806
807         return True
808
809     def __ne__(self, other):
810         return not self.__eq__(other)
811
812     @synchronized
813     def set_segments(self, segs):
814         self._segments = segs
815
816     @synchronized
817     def set_committed(self):
818         """Set committed flag to True"""
819         self._committed = True
820
821     @synchronized
822     def committed(self):
823         """Get whether this is committed or not."""
824         return self._committed
825
826     @synchronized
827     def add_writer(self, writer):
828         """Add an ArvadosFileWriter reference to the list of writers"""
829         if isinstance(writer, ArvadosFileWriter):
830             self._writers.add(writer)
831
832     @synchronized
833     def remove_writer(self, writer):
834         """
835         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
836         and do some block maintenance tasks.
837         """
838         self._writers.remove(writer)
839
840         if self.size() > config.KEEP_BLOCK_SIZE / 2:
841             # File writer closed, not small enough for repacking
842             self.flush()
843         elif self.closed():
844             # All writers closed and size is adequate for repacking
845             self.parent._my_block_manager().repack_small_blocks()
846
847     def closed(self):
848         """
849         Determine whether this file is closed.  The file is considered closed
850         when the writers list is empty.
851         """
852         return len(self._writers) == 0
853
854     @must_be_writable
855     @synchronized
856     def truncate(self, size):
857         """Shrink the size of the file.
858
859         If `size` is less than the size of the file, the file contents after
860         `size` will be discarded.  If `size` is greater than the current size
861         of the file, an IOError will be raised.
862
863         """
864         if size < self.size():
865             new_segs = []
866             for r in self._segments:
867                 range_end = r.range_start+r.range_size
868                 if r.range_start >= size:
869                     # segment is past the truncate size, all done
870                     break
871                 elif size < range_end:
872                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
873                     nr.segment_offset = r.segment_offset
874                     new_segs.append(nr)
875                     break
876                 else:
877                     new_segs.append(r)
878
879             self._segments = new_segs
880             self._committed = False
881         elif size > self.size():
882             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
883
884     def readfrom(self, offset, size, num_retries, exact=False):
885         """Read up to `size` bytes from the file starting at `offset`.
886
887         :exact:
888          If False (default), return less data than requested if the read
889          crosses a block boundary and the next block isn't cached.  If True,
890          only return less data than requested when hitting EOF.
891         """
892
893         with self.lock:
894             if size == 0 or offset >= self.size():
895                 return ''
896             readsegs = locators_and_ranges(self._segments, offset, size)
897             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
898
899         locs = set()
900         data = []
901         for lr in readsegs:
902             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
903             if block:
904                 blockview = memoryview(block)
905                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
906                 locs.add(lr.locator)
907             else:
908                 break
909
910         for lr in prefetch:
911             if lr.locator not in locs:
912                 self.parent._my_block_manager().block_prefetch(lr.locator)
913                 locs.add(lr.locator)
914
915         return ''.join(data)
916
917     def _repack_writes(self, num_retries):
918         """Test if the buffer block has more data than actual segments.
919
920         This happens when a buffered write overwrites a file range written in
921         a previous buffered write.  Re-pack the buffer block for efficiency
922         and to avoid leaking information.
923
924         """
925         segs = self._segments
926
927         # Sum up the segments that reference the current buffer block to get
928         # the total number of bytes of the file actually stored in it.
929         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
930         write_total = sum([s.range_size for s in bufferblock_segs])
931
932         if write_total < self._current_bblock.size():
933             # There is more data in the buffer block than is actually accounted
934             # for by segments, so re-pack the referenced ranges into a new buffer block.
935             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
936             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
937             for t in bufferblock_segs:
938                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
939                 t.segment_offset = new_bb.size() - t.range_size
940
941             self._current_bblock = new_bb
942
943     @must_be_writable
944     @synchronized
945     def writeto(self, offset, data, num_retries):
946         """Write `data` to the file starting at `offset`.
947
948         This will update existing bytes and/or extend the size of the file as
949         necessary.
950
951         """
952         if len(data) == 0:
953             return
954
955         if offset > self.size():
956             raise ArgumentError("Offset is past the end of the file")
957
958         if len(data) > config.KEEP_BLOCK_SIZE:
959             # Chunk it up into smaller writes
960             n = 0
961             dataview = memoryview(data)
962             while n < len(data):
963                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
964                 n += config.KEEP_BLOCK_SIZE
965             return
966
967         self._committed = False
968
969         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
970             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
971
972         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
973             self._repack_writes(num_retries)
974             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
975                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
976                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
977
978         self._current_bblock.append(data)
979
980         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
981
982         self.parent.notify(WRITE, self.parent, self.name, (self, self))
983
984         return len(data)
985
986     @synchronized
987     def flush(self, sync=True, num_retries=0):
988         """Flush the current bufferblock to Keep.
989
990         :sync:
991           If True, commit block synchronously, wait until buffer block has been written.
992           If False, commit block asynchronously, return immediately after putting block into
993           the keep put queue.
994         """
995         if self.committed():
996             return
997
998         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
999             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1000                 self._repack_writes(num_retries)
1001             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1002
1003         if sync:
1004             to_delete = set()
1005             for s in self._segments:
1006                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1007                 if bb:
1008                     if bb.state() != _BufferBlock.COMMITTED:
1009                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1010                     to_delete.add(s.locator)
1011                     s.locator = bb.locator()
1012             for s in to_delete:
1013                 self.parent._my_block_manager().delete_bufferblock(s)
1014
1015         self.parent.notify(MOD, self.parent, self.name, (self, self))
1016
1017     @must_be_writable
1018     @synchronized
1019     def add_segment(self, blocks, pos, size):
1020         """Add a segment to the end of the file.
1021
1022         `pos` and `size` reference a section of the stream described by
1023         `blocks` (a list of Range objects)
1024
1025         """
1026         self._add_segment(blocks, pos, size)
1027
1028     def _add_segment(self, blocks, pos, size):
1029         """Internal implementation of add_segment."""
1030         self._committed = False
1031         for lr in locators_and_ranges(blocks, pos, size):
1032             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1033             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1034             self._segments.append(r)
1035
1036     @synchronized
1037     def size(self):
1038         """Get the file size."""
1039         if self._segments:
1040             n = self._segments[-1]
1041             return n.range_start + n.range_size
1042         else:
1043             return 0
1044
1045     @synchronized
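    # Illustrative output of manifest_text() below for a one-segment file named
    # "example.txt" (the locator is a placeholder):
    #
    #   ". 0cc175b9c0f1b6a831c399e269772661+1 0:1:example.txt\n"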
1046     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1047         buf = ""
1048         filestream = []
1049         for segment in self._segments:
1050             loc = segment.locator
1051             if self.parent._my_block_manager().is_bufferblock(loc):
1052                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1053             if portable_locators:
1054                 loc = KeepLocator(loc).stripped()
1055             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1056                                  segment.segment_offset, segment.range_size))
1057         buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1058         buf += "\n"
1059         return buf
1060
1061     @must_be_writable
1062     @synchronized
1063     def _reparent(self, newparent, newname):
1064         self._committed = False
1065         self.flush(sync=True)
1066         self.parent.remove(self.name)
1067         self.parent = newparent
1068         self.name = newname
1069         self.lock = self.parent.root_collection().lock
1070
1071
1072 class ArvadosFileReader(ArvadosFileReaderBase):
1073     """Wraps ArvadosFile in a file-like object supporting reading only.
1074
1075     Be aware that this class is NOT thread safe as there is no locking around
1076     updating the file pointer.
1077
1078     """
1079
1080     def __init__(self, arvadosfile, num_retries=None):
1081         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1082         self.arvadosfile = arvadosfile
1083
1084     def size(self):
1085         return self.arvadosfile.size()
1086
1087     def stream_name(self):
1088         return self.arvadosfile.parent.stream_name()
1089
1090     @_FileLikeObjectBase._before_close
1091     @retry_method
1092     def read(self, size=None, num_retries=None):
1093         """Read up to `size` bytes from the file and return the result.
1094
1095         Starts at the current file position.  If `size` is None, read the
1096         entire remainder of the file.
1097         """
1098         if size is None:
1099             data = []
1100             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1101             while rd:
1102                 data.append(rd)
1103                 self._filepos += len(rd)
1104                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1105             return ''.join(data)
1106         else:
1107             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1108             self._filepos += len(data)
1109             return data
1110
1111     @_FileLikeObjectBase._before_close
1112     @retry_method
1113     def readfrom(self, offset, size, num_retries=None):
1114         """Read up to `size` bytes from the stream, starting at the specified file offset.
1115
1116         This method does not change the file position.
1117         """
1118         return self.arvadosfile.readfrom(offset, size, num_retries)
1119
1120     def flush(self):
1121         pass
1122
1123
1124 class ArvadosFileWriter(ArvadosFileReader):
1125     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1126
1127     Be aware that this class is NOT thread safe as there is no locking around
1128     updating the file pointer.
1129
1130     """
1131
1132     def __init__(self, arvadosfile, mode, num_retries=None):
1133         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1134         self.mode = mode
1135         self.arvadosfile.add_writer(self)
1136
1137     @_FileLikeObjectBase._before_close
1138     @retry_method
1139     def write(self, data, num_retries=None):
1140         if self.mode[0] == "a":
1141             self.arvadosfile.writeto(self.size(), data, num_retries)
1142         else:
1143             self.arvadosfile.writeto(self._filepos, data, num_retries)
1144             self._filepos += len(data)
1145         return len(data)
1146
1147     @_FileLikeObjectBase._before_close
1148     @retry_method
1149     def writelines(self, seq, num_retries=None):
1150         for s in seq:
1151             self.write(s, num_retries=num_retries)
1152
1153     @_FileLikeObjectBase._before_close
1154     def truncate(self, size=None):
1155         if size is None:
1156             size = self._filepos
1157         self.arvadosfile.truncate(size)
1158         if self._filepos > self.size():
1159             self._filepos = self.size()
1160
1161     @_FileLikeObjectBase._before_close
1162     def flush(self):
1163         self.arvadosfile.flush()
1164
1165     def close(self):
1166         if not self.closed:
1167             self.arvadosfile.remove_writer(self)
1168             super(ArvadosFileWriter, self).close()
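
# Illustrative usage sketch.  This assumes arvados.collection.Collection.open(),
# which hands back the ArvadosFileWriter/ArvadosFileReader wrappers defined above
# (see the collection module for the exact API):
#
#   import arvados.collection
#   c = arvados.collection.Collection()
#   with c.open("subdir/example.txt", "w") as f:   # ArvadosFileWriter
#       f.write("hello\n")
#   with c.open("subdir/example.txt", "r") as f:   # ArvadosFileReader
#       print f.read()                             # -> "hello\n"
#
# Buffered data is committed to Keep as blocks fill, when writers are closed,
# or when the block manager's commit_all() runs (typically when the collection's
# manifest is generated or saved).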