10813: Merge branch 'master' into 10813-arv-put-six-threads
[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41 class _FileLikeObjectBase(object):
42     def __init__(self, name, mode):
43         self.name = name
44         self.mode = mode
45         self.closed = False
46
47     @staticmethod
48     def _before_close(orig_func):
49         @functools.wraps(orig_func)
50         def before_close_wrapper(self, *args, **kwargs):
51             if self.closed:
52                 raise ValueError("I/O operation on closed stream file")
53             return orig_func(self, *args, **kwargs)
54         return before_close_wrapper
55
56     def __enter__(self):
57         return self
58
59     def __exit__(self, exc_type, exc_value, traceback):
60         try:
61             self.close()
62         except Exception:
63             if exc_type is None:
64                 raise
65
66     def close(self):
67         self.closed = True
68
69
70 class ArvadosFileReaderBase(_FileLikeObjectBase):
71     def __init__(self, name, mode, num_retries=None):
72         super(ArvadosFileReaderBase, self).__init__(name, mode)
73         self._filepos = 0L
74         self.num_retries = num_retries
75         self._readline_cache = (None, None)
76
77     def __iter__(self):
78         while True:
79             data = self.readline()
80             if not data:
81                 break
82             yield data
83
84     def decompressed_name(self):
85         return re.sub('\.(bz2|gz)$', '', self.name)
86
87     @_FileLikeObjectBase._before_close
88     def seek(self, pos, whence=os.SEEK_SET):
89         if whence == os.SEEK_CUR:
90             pos += self._filepos
91         elif whence == os.SEEK_END:
92             pos += self.size()
93         self._filepos = min(max(pos, 0L), self.size())
94
95     def tell(self):
96         return self._filepos
97
98     @_FileLikeObjectBase._before_close
99     @retry_method
100     def readall(self, size=2**20, num_retries=None):
101         while True:
102             data = self.read(size, num_retries=num_retries)
103             if data == '':
104                 break
105             yield data
106
107     @_FileLikeObjectBase._before_close
108     @retry_method
109     def readline(self, size=float('inf'), num_retries=None):
110         cache_pos, cache_data = self._readline_cache
111         if self.tell() == cache_pos:
112             data = [cache_data]
113             self._filepos += len(cache_data)
114         else:
115             data = ['']
116         data_size = len(data[-1])
117         while (data_size < size) and ('\n' not in data[-1]):
118             next_read = self.read(2 ** 20, num_retries=num_retries)
119             if not next_read:
120                 break
121             data.append(next_read)
122             data_size += len(next_read)
123         data = ''.join(data)
124         try:
125             nextline_index = data.index('\n') + 1
126         except ValueError:
127             nextline_index = len(data)
128         nextline_index = min(nextline_index, size)
129         self._filepos -= len(data) - nextline_index
130         self._readline_cache = (self.tell(), data[nextline_index:])
131         return data[:nextline_index]
132
133     @_FileLikeObjectBase._before_close
134     @retry_method
135     def decompress(self, decompress, size, num_retries=None):
136         for segment in self.readall(size, num_retries=num_retries):
137             data = decompress(segment)
138             if data:
139                 yield data
140
141     @_FileLikeObjectBase._before_close
142     @retry_method
143     def readall_decompressed(self, size=2**20, num_retries=None):
144         self.seek(0)
145         if self.name.endswith('.bz2'):
146             dc = bz2.BZ2Decompressor()
147             return self.decompress(dc.decompress, size,
148                                    num_retries=num_retries)
149         elif self.name.endswith('.gz'):
150             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
151             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
152                                    size, num_retries=num_retries)
153         else:
154             return self.readall(size, num_retries=num_retries)
155
156     @_FileLikeObjectBase._before_close
157     @retry_method
158     def readlines(self, sizehint=float('inf'), num_retries=None):
159         data = []
160         data_size = 0
161         for s in self.readall(num_retries=num_retries):
162             data.append(s)
163             data_size += len(s)
164             if data_size >= sizehint:
165                 break
166         return ''.join(data).splitlines(True)
167
168     def size(self):
169         raise NotImplementedError()
170
171     def read(self, size, num_retries=None):
172         raise NotImplementedError()
173
174     def readfrom(self, start, size, num_retries=None):
175         raise NotImplementedError()
176
177
178 class StreamFileReader(ArvadosFileReaderBase):
179     class _NameAttribute(str):
180         # The Python file API provides a plain .name attribute.
181         # Older SDK provided a name() method.
182         # This class provides both, for maximum compatibility.
183         def __call__(self):
184             return self
185
186     def __init__(self, stream, segments, name):
187         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
188         self._stream = stream
189         self.segments = segments
190
191     def stream_name(self):
192         return self._stream.name()
193
194     def size(self):
195         n = self.segments[-1]
196         return n.range_start + n.range_size
197
198     @_FileLikeObjectBase._before_close
199     @retry_method
200     def read(self, size, num_retries=None):
201         """Read up to 'size' bytes from the stream, starting at the current file position"""
202         if size == 0:
203             return ''
204
205         data = ''
206         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
207         if available_chunks:
208             lr = available_chunks[0]
209             data = self._stream.readfrom(lr.locator+lr.segment_offset,
210                                           lr.segment_size,
211                                           num_retries=num_retries)
212
213         self._filepos += len(data)
214         return data
215
216     @_FileLikeObjectBase._before_close
217     @retry_method
218     def readfrom(self, start, size, num_retries=None):
219         """Read up to 'size' bytes from the stream, starting at 'start'"""
220         if size == 0:
221             return ''
222
223         data = []
224         for lr in locators_and_ranges(self.segments, start, size):
225             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
226                                               num_retries=num_retries))
227         return ''.join(data)
228
229     def as_manifest(self):
230         segs = []
231         for r in self.segments:
232             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
233         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
234
235
236 def synchronized(orig_func):
237     @functools.wraps(orig_func)
238     def synchronized_wrapper(self, *args, **kwargs):
239         with self.lock:
240             return orig_func(self, *args, **kwargs)
241     return synchronized_wrapper
242
243
244 class StateChangeError(Exception):
245     def __init__(self, message, state, nextstate):
246         super(StateChangeError, self).__init__(message)
247         self.state = state
248         self.nextstate = nextstate
249
250 class _BufferBlock(object):
251     """A stand-in for a Keep block that is in the process of being written.
252
253     Writers can append to it, get the size, and compute the Keep locator.
254     There are three valid states:
255
256     WRITABLE
257       Can append to block.
258
259     PENDING
260       Block is in the process of being uploaded to Keep, append is an error.
261
262     COMMITTED
263       The block has been written to Keep, its internal buffer has been
264       released, fetching the block will fetch it via keep client (since we
265       discarded the internal copy), and identifiers referring to the BufferBlock
266       can be replaced with the block locator.
267
268     """
269
270     WRITABLE = 0
271     PENDING = 1
272     COMMITTED = 2
273     ERROR = 3
274
275     def __init__(self, blockid, starting_capacity, owner):
276         """
277         :blockid:
278           the identifier for this block
279
280         :starting_capacity:
281           the initial buffer capacity
282
283         :owner:
284           ArvadosFile that owns this block
285
286         """
287         self.blockid = blockid
288         self.buffer_block = bytearray(starting_capacity)
289         self.buffer_view = memoryview(self.buffer_block)
290         self.write_pointer = 0
291         self._state = _BufferBlock.WRITABLE
292         self._locator = None
293         self.owner = owner
294         self.lock = threading.Lock()
295         self.wait_for_commit = threading.Event()
296         self.error = None
297
298     @synchronized
299     def append(self, data):
300         """Append some data to the buffer.
301
302         Only valid if the block is in WRITABLE state.  Implements an expanding
303         buffer, doubling capacity as needed to accomdate all the data.
304
305         """
306         if self._state == _BufferBlock.WRITABLE:
307             while (self.write_pointer+len(data)) > len(self.buffer_block):
308                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
309                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
310                 self.buffer_block = new_buffer_block
311                 self.buffer_view = memoryview(self.buffer_block)
312             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
313             self.write_pointer += len(data)
314             self._locator = None
315         else:
316             raise AssertionError("Buffer block is not writable")
317
318     STATE_TRANSITIONS = frozenset([
319             (WRITABLE, PENDING),
320             (PENDING, COMMITTED),
321             (PENDING, ERROR),
322             (ERROR, PENDING)])
323
324     @synchronized
325     def set_state(self, nextstate, val=None):
326         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
327             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
328         self._state = nextstate
329
330         if self._state == _BufferBlock.PENDING:
331             self.wait_for_commit.clear()
332
333         if self._state == _BufferBlock.COMMITTED:
334             self._locator = val
335             self.buffer_view = None
336             self.buffer_block = None
337             self.wait_for_commit.set()
338
339         if self._state == _BufferBlock.ERROR:
340             self.error = val
341             self.wait_for_commit.set()
342
343     @synchronized
344     def state(self):
345         return self._state
346
347     def size(self):
348         """The amount of data written to the buffer."""
349         return self.write_pointer
350
351     @synchronized
352     def locator(self):
353         """The Keep locator for this buffer's contents."""
354         if self._locator is None:
355             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
356         return self._locator
357
358     @synchronized
359     def clone(self, new_blockid, owner):
360         if self._state == _BufferBlock.COMMITTED:
361             raise AssertionError("Cannot duplicate committed buffer block")
362         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
363         bufferblock.append(self.buffer_view[0:self.size()])
364         return bufferblock
365
366     @synchronized
367     def clear(self):
368         self.owner = None
369         self.buffer_block = None
370         self.buffer_view = None
371
372
373 class NoopLock(object):
374     def __enter__(self):
375         return self
376
377     def __exit__(self, exc_type, exc_value, traceback):
378         pass
379
380     def acquire(self, blocking=False):
381         pass
382
383     def release(self):
384         pass
385
386
387 def must_be_writable(orig_func):
388     @functools.wraps(orig_func)
389     def must_be_writable_wrapper(self, *args, **kwargs):
390         if not self.writable():
391             raise IOError(errno.EROFS, "Collection is read-only.")
392         return orig_func(self, *args, **kwargs)
393     return must_be_writable_wrapper
394
395
396 class _BlockManager(object):
397     """BlockManager handles buffer blocks.
398
399     Also handles background block uploads, and background block prefetch for a
400     Collection of ArvadosFiles.
401
402     """
403
404     DEFAULT_PUT_THREADS = 2
405     DEFAULT_GET_THREADS = 2
406
407     def __init__(self, keep, copies=None, put_threads=None):
408         """keep: KeepClient object to use"""
409         self._keep = keep
410         self._bufferblocks = collections.OrderedDict()
411         self._put_queue = None
412         self._put_threads = None
413         self._prefetch_queue = None
414         self._prefetch_threads = None
415         self.lock = threading.Lock()
416         self.prefetch_enabled = True
417         if put_threads:
418             self.num_put_threads = put_threads
419         else:
420             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
421         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
422         self.copies = copies
423         self._pending_write_size = 0
424         self.threads_lock = threading.Lock()
425
426     @synchronized
427     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
428         """Allocate a new, empty bufferblock in WRITABLE state and return it.
429
430         :blockid:
431           optional block identifier, otherwise one will be automatically assigned
432
433         :starting_capacity:
434           optional capacity, otherwise will use default capacity
435
436         :owner:
437           ArvadosFile that owns this block
438
439         """
440         return self._alloc_bufferblock(blockid, starting_capacity, owner)
441
442     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
443         if blockid is None:
444             blockid = "%s" % uuid.uuid4()
445         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
446         self._bufferblocks[bufferblock.blockid] = bufferblock
447         return bufferblock
448
449     @synchronized
450     def dup_block(self, block, owner):
451         """Create a new bufferblock initialized with the content of an existing bufferblock.
452
453         :block:
454           the buffer block to copy.
455
456         :owner:
457           ArvadosFile that owns the new block
458
459         """
460         new_blockid = "bufferblock%i" % len(self._bufferblocks)
461         bufferblock = block.clone(new_blockid, owner)
462         self._bufferblocks[bufferblock.blockid] = bufferblock
463         return bufferblock
464
465     @synchronized
466     def is_bufferblock(self, locator):
467         return locator in self._bufferblocks
468
469     def _commit_bufferblock_worker(self):
470         """Background uploader thread."""
471
472         while True:
473             try:
474                 bufferblock = self._put_queue.get()
475                 if bufferblock is None:
476                     return
477
478                 if self.copies is None:
479                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
480                 else:
481                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
482                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
483
484             except Exception as e:
485                 bufferblock.set_state(_BufferBlock.ERROR, e)
486             finally:
487                 if self._put_queue is not None:
488                     self._put_queue.task_done()
489
490     def start_put_threads(self):
491         with self.threads_lock:
492             if self._put_threads is None:
493                 # Start uploader threads.
494
495                 # If we don't limit the Queue size, the upload queue can quickly
496                 # grow to take up gigabytes of RAM if the writing process is
497                 # generating data more quickly than it can be send to the Keep
498                 # servers.
499                 #
500                 # With two upload threads and a queue size of 2, this means up to 4
501                 # blocks pending.  If they are full 64 MiB blocks, that means up to
502                 # 256 MiB of internal buffering, which is the same size as the
503                 # default download block cache in KeepClient.
504                 self._put_queue = Queue.Queue(maxsize=2)
505
506                 self._put_threads = []
507                 for i in xrange(0, self.num_put_threads):
508                     thread = threading.Thread(target=self._commit_bufferblock_worker)
509                     self._put_threads.append(thread)
510                     thread.daemon = True
511                     thread.start()
512
513     def _block_prefetch_worker(self):
514         """The background downloader thread."""
515         while True:
516             try:
517                 b = self._prefetch_queue.get()
518                 if b is None:
519                     return
520                 self._keep.get(b)
521             except Exception:
522                 _logger.exception("Exception doing block prefetch")
523
524     @synchronized
525     def start_get_threads(self):
526         if self._prefetch_threads is None:
527             self._prefetch_queue = Queue.Queue()
528             self._prefetch_threads = []
529             for i in xrange(0, self.num_get_threads):
530                 thread = threading.Thread(target=self._block_prefetch_worker)
531                 self._prefetch_threads.append(thread)
532                 thread.daemon = True
533                 thread.start()
534
535
536     @synchronized
537     def stop_threads(self):
538         """Shut down and wait for background upload and download threads to finish."""
539
540         if self._put_threads is not None:
541             for t in self._put_threads:
542                 self._put_queue.put(None)
543             for t in self._put_threads:
544                 t.join()
545         self._put_threads = None
546         self._put_queue = None
547
548         if self._prefetch_threads is not None:
549             for t in self._prefetch_threads:
550                 self._prefetch_queue.put(None)
551             for t in self._prefetch_threads:
552                 t.join()
553         self._prefetch_threads = None
554         self._prefetch_queue = None
555
556     def __enter__(self):
557         return self
558
559     def __exit__(self, exc_type, exc_value, traceback):
560         self.stop_threads()
561
562     @synchronized
563     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
564         """Packs small blocks together before uploading"""
565         self._pending_write_size += closed_file_size
566
567         # Check if there are enough small blocks for filling up one in full
568         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
569
570             # Search blocks ready for getting packed together before being committed to Keep.
571             # A WRITABLE block always has an owner.
572             # A WRITABLE block with its owner.closed() implies that it's
573             # size is <= KEEP_BLOCK_SIZE/2.
574             small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
575
576             if len(small_blocks) <= 1:
577                 # Not enough small blocks for repacking
578                 return
579
580             # Update the pending write size count with its true value, just in case
581             # some small file was opened, written and closed several times.
582             self._pending_write_size = sum([b.size() for b in small_blocks])
583             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
584                 return
585
586             new_bb = self._alloc_bufferblock()
587             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
588                 bb = small_blocks.pop(0)
589                 arvfile = bb.owner
590                 self._pending_write_size -= bb.size()
591                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
592                 arvfile.set_segments([Range(new_bb.blockid,
593                                             0,
594                                             bb.size(),
595                                             new_bb.write_pointer - bb.size())])
596                 self._delete_bufferblock(bb.blockid)
597             self.commit_bufferblock(new_bb, sync=sync)
598
599     def commit_bufferblock(self, block, sync):
600         """Initiate a background upload of a bufferblock.
601
602         :block:
603           The block object to upload
604
605         :sync:
606           If `sync` is True, upload the block synchronously.
607           If `sync` is False, upload the block asynchronously.  This will
608           return immediately unless the upload queue is at capacity, in
609           which case it will wait on an upload queue slot.
610
611         """
612         try:
613             # Mark the block as PENDING so to disallow any more appends.
614             block.set_state(_BufferBlock.PENDING)
615         except StateChangeError as e:
616             if e.state == _BufferBlock.PENDING:
617                 if sync:
618                     block.wait_for_commit.wait()
619                 else:
620                     return
621             if block.state() == _BufferBlock.COMMITTED:
622                 return
623             elif block.state() == _BufferBlock.ERROR:
624                 raise block.error
625             else:
626                 raise
627
628         if sync:
629             try:
630                 if self.copies is None:
631                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
632                 else:
633                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
634                 block.set_state(_BufferBlock.COMMITTED, loc)
635             except Exception as e:
636                 block.set_state(_BufferBlock.ERROR, e)
637                 raise
638         else:
639             self.start_put_threads()
640             self._put_queue.put(block)
641
642     @synchronized
643     def get_bufferblock(self, locator):
644         return self._bufferblocks.get(locator)
645
646     @synchronized
647     def delete_bufferblock(self, locator):
648         self._delete_bufferblock(locator)
649
650     def _delete_bufferblock(self, locator):
651         bb = self._bufferblocks[locator]
652         bb.clear()
653         del self._bufferblocks[locator]
654
655     def get_block_contents(self, locator, num_retries, cache_only=False):
656         """Fetch a block.
657
658         First checks to see if the locator is a BufferBlock and return that, if
659         not, passes the request through to KeepClient.get().
660
661         """
662         with self.lock:
663             if locator in self._bufferblocks:
664                 bufferblock = self._bufferblocks[locator]
665                 if bufferblock.state() != _BufferBlock.COMMITTED:
666                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
667                 else:
668                     locator = bufferblock._locator
669         if cache_only:
670             return self._keep.get_from_cache(locator)
671         else:
672             return self._keep.get(locator, num_retries=num_retries)
673
674     def commit_all(self):
675         """Commit all outstanding buffer blocks.
676
677         This is a synchronous call, and will not return until all buffer blocks
678         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
679
680         """
681         self.repack_small_blocks(force=True, sync=True)
682
683         with self.lock:
684             items = self._bufferblocks.items()
685
686         for k,v in items:
687             if v.state() != _BufferBlock.COMMITTED and v.owner:
688                 v.owner.flush(sync=False)
689
690         with self.lock:
691             if self._put_queue is not None:
692                 self._put_queue.join()
693
694                 err = []
695                 for k,v in items:
696                     if v.state() == _BufferBlock.ERROR:
697                         err.append((v.locator(), v.error))
698                 if err:
699                     raise KeepWriteError("Error writing some blocks", err, label="block")
700
701         for k,v in items:
702             # flush again with sync=True to remove committed bufferblocks from
703             # the segments.
704             if v.owner:
705                 v.owner.flush(sync=True)
706
707     def block_prefetch(self, locator):
708         """Initiate a background download of a block.
709
710         This assumes that the underlying KeepClient implements a block cache,
711         so repeated requests for the same block will not result in repeated
712         downloads (unless the block is evicted from the cache.)  This method
713         does not block.
714
715         """
716
717         if not self.prefetch_enabled:
718             return
719
720         if self._keep.get_from_cache(locator) is not None:
721             return
722
723         with self.lock:
724             if locator in self._bufferblocks:
725                 return
726
727         self.start_get_threads()
728         self._prefetch_queue.put(locator)
729
730
731 class ArvadosFile(object):
732     """Represent a file in a Collection.
733
734     ArvadosFile manages the underlying representation of a file in Keep as a
735     sequence of segments spanning a set of blocks, and implements random
736     read/write access.
737
738     This object may be accessed from multiple threads.
739
740     """
741
742     def __init__(self, parent, name, stream=[], segments=[]):
743         """
744         ArvadosFile constructor.
745
746         :stream:
747           a list of Range objects representing a block stream
748
749         :segments:
750           a list of Range objects representing segments
751         """
752         self.parent = parent
753         self.name = name
754         self._writers = set()
755         self._committed = False
756         self._segments = []
757         self.lock = parent.root_collection().lock
758         for s in segments:
759             self._add_segment(stream, s.locator, s.range_size)
760         self._current_bblock = None
761
762     def writable(self):
763         return self.parent.writable()
764
765     @synchronized
766     def permission_expired(self, as_of_dt=None):
767         """Returns True if any of the segment's locators is expired"""
768         for r in self._segments:
769             if KeepLocator(r.locator).permission_expired(as_of_dt):
770                 return True
771         return False
772
773     @synchronized
774     def segments(self):
775         return copy.copy(self._segments)
776
777     @synchronized
778     def clone(self, new_parent, new_name):
779         """Make a copy of this file."""
780         cp = ArvadosFile(new_parent, new_name)
781         cp.replace_contents(self)
782         return cp
783
784     @must_be_writable
785     @synchronized
786     def replace_contents(self, other):
787         """Replace segments of this file with segments from another `ArvadosFile` object."""
788
789         map_loc = {}
790         self._segments = []
791         for other_segment in other.segments():
792             new_loc = other_segment.locator
793             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
794                 if other_segment.locator not in map_loc:
795                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
796                     if bufferblock.state() != _BufferBlock.WRITABLE:
797                         map_loc[other_segment.locator] = bufferblock.locator()
798                     else:
799                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
800                 new_loc = map_loc[other_segment.locator]
801
802             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
803
804         self._committed = False
805
806     def __eq__(self, other):
807         if other is self:
808             return True
809         if not isinstance(other, ArvadosFile):
810             return False
811
812         othersegs = other.segments()
813         with self.lock:
814             if len(self._segments) != len(othersegs):
815                 return False
816             for i in xrange(0, len(othersegs)):
817                 seg1 = self._segments[i]
818                 seg2 = othersegs[i]
819                 loc1 = seg1.locator
820                 loc2 = seg2.locator
821
822                 if self.parent._my_block_manager().is_bufferblock(loc1):
823                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
824
825                 if other.parent._my_block_manager().is_bufferblock(loc2):
826                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
827
828                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
829                     seg1.range_start != seg2.range_start or
830                     seg1.range_size != seg2.range_size or
831                     seg1.segment_offset != seg2.segment_offset):
832                     return False
833
834         return True
835
836     def __ne__(self, other):
837         return not self.__eq__(other)
838
839     @synchronized
840     def set_segments(self, segs):
841         self._segments = segs
842
843     @synchronized
844     def set_committed(self):
845         """Set committed flag to True"""
846         self._committed = True
847
848     @synchronized
849     def committed(self):
850         """Get whether this is committed or not."""
851         return self._committed
852
853     @synchronized
854     def add_writer(self, writer):
855         """Add an ArvadosFileWriter reference to the list of writers"""
856         if isinstance(writer, ArvadosFileWriter):
857             self._writers.add(writer)
858
859     @synchronized
860     def remove_writer(self, writer, flush):
861         """
862         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
863         and do some block maintenance tasks.
864         """
865         self._writers.remove(writer)
866
867         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
868             # File writer closed, not small enough for repacking
869             self.flush()
870         elif self.closed():
871             # All writers closed and size is adequate for repacking
872             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
873
874     def closed(self):
875         """
876         Get whether this is closed or not. When the writers list is empty, the file
877         is supposed to be closed.
878         """
879         return len(self._writers) == 0
880
881     @must_be_writable
882     @synchronized
883     def truncate(self, size):
884         """Shrink the size of the file.
885
886         If `size` is less than the size of the file, the file contents after
887         `size` will be discarded.  If `size` is greater than the current size
888         of the file, an IOError will be raised.
889
890         """
891         if size < self.size():
892             new_segs = []
893             for r in self._segments:
894                 range_end = r.range_start+r.range_size
895                 if r.range_start >= size:
896                     # segment is past the trucate size, all done
897                     break
898                 elif size < range_end:
899                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
900                     nr.segment_offset = r.segment_offset
901                     new_segs.append(nr)
902                     break
903                 else:
904                     new_segs.append(r)
905
906             self._segments = new_segs
907             self._committed = False
908         elif size > self.size():
909             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
910
911     def readfrom(self, offset, size, num_retries, exact=False):
912         """Read up to `size` bytes from the file starting at `offset`.
913
914         :exact:
915          If False (default), return less data than requested if the read
916          crosses a block boundary and the next block isn't cached.  If True,
917          only return less data than requested when hitting EOF.
918         """
919
920         with self.lock:
921             if size == 0 or offset >= self.size():
922                 return ''
923             readsegs = locators_and_ranges(self._segments, offset, size)
924             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
925
926         locs = set()
927         data = []
928         for lr in readsegs:
929             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
930             if block:
931                 blockview = memoryview(block)
932                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
933                 locs.add(lr.locator)
934             else:
935                 break
936
937         for lr in prefetch:
938             if lr.locator not in locs:
939                 self.parent._my_block_manager().block_prefetch(lr.locator)
940                 locs.add(lr.locator)
941
942         return ''.join(data)
943
944     def _repack_writes(self, num_retries):
945         """Test if the buffer block has more data than actual segments.
946
947         This happens when a buffered write over-writes a file range written in
948         a previous buffered write.  Re-pack the buffer block for efficiency
949         and to avoid leaking information.
950
951         """
952         segs = self._segments
953
954         # Sum up the segments to get the total bytes of the file referencing
955         # into the buffer block.
956         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
957         write_total = sum([s.range_size for s in bufferblock_segs])
958
959         if write_total < self._current_bblock.size():
960             # There is more data in the buffer block than is actually accounted for by segments, so
961             # re-pack into a new buffer by copying over to a new buffer block.
962             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
963             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
964             for t in bufferblock_segs:
965                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
966                 t.segment_offset = new_bb.size() - t.range_size
967
968             self._current_bblock = new_bb
969
970     @must_be_writable
971     @synchronized
972     def writeto(self, offset, data, num_retries):
973         """Write `data` to the file starting at `offset`.
974
975         This will update existing bytes and/or extend the size of the file as
976         necessary.
977
978         """
979         if len(data) == 0:
980             return
981
982         if offset > self.size():
983             raise ArgumentError("Offset is past the end of the file")
984
985         if len(data) > config.KEEP_BLOCK_SIZE:
986             # Chunk it up into smaller writes
987             n = 0
988             dataview = memoryview(data)
989             while n < len(data):
990                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
991                 n += config.KEEP_BLOCK_SIZE
992             return
993
994         self._committed = False
995
996         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
997             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
998
999         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1000             self._repack_writes(num_retries)
1001             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1002                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1003                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1004
1005         self._current_bblock.append(data)
1006
1007         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1008
1009         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1010
1011         return len(data)
1012
1013     @synchronized
1014     def flush(self, sync=True, num_retries=0):
1015         """Flush the current bufferblock to Keep.
1016
1017         :sync:
1018           If True, commit block synchronously, wait until buffer block has been written.
1019           If False, commit block asynchronously, return immediately after putting block into
1020           the keep put queue.
1021         """
1022         if self.committed():
1023             return
1024
1025         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1026             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1027                 self._repack_writes(num_retries)
1028             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1029
1030         if sync:
1031             to_delete = set()
1032             for s in self._segments:
1033                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1034                 if bb:
1035                     if bb.state() != _BufferBlock.COMMITTED:
1036                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1037                     to_delete.add(s.locator)
1038                     s.locator = bb.locator()
1039             for s in to_delete:
1040                self.parent._my_block_manager().delete_bufferblock(s)
1041
1042         self.parent.notify(MOD, self.parent, self.name, (self, self))
1043
1044     @must_be_writable
1045     @synchronized
1046     def add_segment(self, blocks, pos, size):
1047         """Add a segment to the end of the file.
1048
1049         `pos` and `offset` reference a section of the stream described by
1050         `blocks` (a list of Range objects)
1051
1052         """
1053         self._add_segment(blocks, pos, size)
1054
1055     def _add_segment(self, blocks, pos, size):
1056         """Internal implementation of add_segment."""
1057         self._committed = False
1058         for lr in locators_and_ranges(blocks, pos, size):
1059             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1060             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1061             self._segments.append(r)
1062
1063     @synchronized
1064     def size(self):
1065         """Get the file size."""
1066         if self._segments:
1067             n = self._segments[-1]
1068             return n.range_start + n.range_size
1069         else:
1070             return 0
1071
1072     @synchronized
1073     def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
1074         buf = ""
1075         filestream = []
1076         for segment in self.segments:
1077             loc = segment.locator
1078             if loc.startswith("bufferblock"):
1079                 loc = self._bufferblocks[loc].calculate_locator()
1080             if portable_locators:
1081                 loc = KeepLocator(loc).stripped()
1082             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1083                                  segment.segment_offset, segment.range_size))
1084         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1085         buf += "\n"
1086         return buf
1087
1088     @must_be_writable
1089     @synchronized
1090     def _reparent(self, newparent, newname):
1091         self._committed = False
1092         self.flush(sync=True)
1093         self.parent.remove(self.name)
1094         self.parent = newparent
1095         self.name = newname
1096         self.lock = self.parent.root_collection().lock
1097
1098
1099 class ArvadosFileReader(ArvadosFileReaderBase):
1100     """Wraps ArvadosFile in a file-like object supporting reading only.
1101
1102     Be aware that this class is NOT thread safe as there is no locking around
1103     updating file pointer.
1104
1105     """
1106
1107     def __init__(self, arvadosfile, num_retries=None):
1108         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1109         self.arvadosfile = arvadosfile
1110
1111     def size(self):
1112         return self.arvadosfile.size()
1113
1114     def stream_name(self):
1115         return self.arvadosfile.parent.stream_name()
1116
1117     @_FileLikeObjectBase._before_close
1118     @retry_method
1119     def read(self, size=None, num_retries=None):
1120         """Read up to `size` bytes from the file and return the result.
1121
1122         Starts at the current file position.  If `size` is None, read the
1123         entire remainder of the file.
1124         """
1125         if size is None:
1126             data = []
1127             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1128             while rd:
1129                 data.append(rd)
1130                 self._filepos += len(rd)
1131                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1132             return ''.join(data)
1133         else:
1134             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1135             self._filepos += len(data)
1136             return data
1137
1138     @_FileLikeObjectBase._before_close
1139     @retry_method
1140     def readfrom(self, offset, size, num_retries=None):
1141         """Read up to `size` bytes from the stream, starting at the specified file offset.
1142
1143         This method does not change the file position.
1144         """
1145         return self.arvadosfile.readfrom(offset, size, num_retries)
1146
1147     def flush(self):
1148         pass
1149
1150
1151 class ArvadosFileWriter(ArvadosFileReader):
1152     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1153
1154     Be aware that this class is NOT thread safe as there is no locking around
1155     updating file pointer.
1156
1157     """
1158
1159     def __init__(self, arvadosfile, mode, num_retries=None):
1160         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1161         self.mode = mode
1162         self.arvadosfile.add_writer(self)
1163
1164     @_FileLikeObjectBase._before_close
1165     @retry_method
1166     def write(self, data, num_retries=None):
1167         if self.mode[0] == "a":
1168             self.arvadosfile.writeto(self.size(), data, num_retries)
1169         else:
1170             self.arvadosfile.writeto(self._filepos, data, num_retries)
1171             self._filepos += len(data)
1172         return len(data)
1173
1174     @_FileLikeObjectBase._before_close
1175     @retry_method
1176     def writelines(self, seq, num_retries=None):
1177         for s in seq:
1178             self.write(s, num_retries=num_retries)
1179
1180     @_FileLikeObjectBase._before_close
1181     def truncate(self, size=None):
1182         if size is None:
1183             size = self._filepos
1184         self.arvadosfile.truncate(size)
1185         if self._filepos > self.size():
1186             self._filepos = self.size()
1187
1188     @_FileLikeObjectBase._before_close
1189     def flush(self):
1190         self.arvadosfile.flush()
1191
1192     def close(self, flush=True):
1193         if not self.closed:
1194             self.arvadosfile.remove_writer(self, flush)
1195             super(ArvadosFileWriter, self).close()