[arvados.git] / sdk / python / arvados / arvfile.py
1 import functools
2 import os
3 import zlib
4 import bz2
5 import config
6 import hashlib
7 import threading
8 import Queue
9 import copy
10 import errno
11 import re
12 import logging
13 import collections
14 import uuid
15
16 from .errors import KeepWriteError, AssertionError, ArgumentError
17 from .keep import KeepLocator
18 from ._normalize_stream import normalize_stream
19 from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
20 from .retry import retry_method
21
22 MOD = "mod"
23 WRITE = "write"
24
25 _logger = logging.getLogger('arvados.arvfile')
26
27 def split(path):
28     """split(path) -> streamname, filename
29
30     Separate the stream name and file name in a /-separated stream path and
31     return a tuple (stream_name, file_name).  If no stream name is available,
32     assume '.'.
33
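    For example:

        >>> split('stream/dir/file.txt')
        ('stream/dir', 'file.txt')
        >>> split('file.txt')
        ('.', 'file.txt')
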
34     """
35     try:
36         stream_name, file_name = path.rsplit('/', 1)
37     except ValueError:  # No / in string
38         stream_name, file_name = '.', path
39     return stream_name, file_name
40
41
42 class UnownedBlockError(Exception):
43     """Raised when there's a writable block without an owner on the BlockManager."""
44     pass
45
46
47 class _FileLikeObjectBase(object):
48     def __init__(self, name, mode):
49         self.name = name
50         self.mode = mode
51         self.closed = False
52
53     @staticmethod
54     def _before_close(orig_func):
55         @functools.wraps(orig_func)
56         def before_close_wrapper(self, *args, **kwargs):
57             if self.closed:
58                 raise ValueError("I/O operation on closed stream file")
59             return orig_func(self, *args, **kwargs)
60         return before_close_wrapper
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_value, traceback):
66         try:
67             self.close()
68         except Exception:
69             if exc_type is None:
70                 raise
71
72     def close(self):
73         self.closed = True
74
75
76 class ArvadosFileReaderBase(_FileLikeObjectBase):
77     def __init__(self, name, mode, num_retries=None):
78         super(ArvadosFileReaderBase, self).__init__(name, mode)
79         self._filepos = 0L
80         self.num_retries = num_retries
81         self._readline_cache = (None, None)
82
83     def __iter__(self):
84         while True:
85             data = self.readline()
86             if not data:
87                 break
88             yield data
89
90     def decompressed_name(self):
91         return re.sub(r'\.(bz2|gz)$', '', self.name)
92
93     @_FileLikeObjectBase._before_close
94     def seek(self, pos, whence=os.SEEK_SET):
95         if whence == os.SEEK_CUR:
96             pos += self._filepos
97         elif whence == os.SEEK_END:
98             pos += self.size()
99         self._filepos = min(max(pos, 0L), self.size())
100
101     def tell(self):
102         return self._filepos
103
104     @_FileLikeObjectBase._before_close
105     @retry_method
106     def readall(self, size=2**20, num_retries=None):
107         while True:
108             data = self.read(size, num_retries=num_retries)
109             if data == '':
110                 break
111             yield data
112
113     @_FileLikeObjectBase._before_close
114     @retry_method
115     def readline(self, size=float('inf'), num_retries=None):
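        # If the file position hasn't moved since the previous readline() call,
        # start from the leftover bytes cached by that call instead of
        # re-reading them from the stream.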
116         cache_pos, cache_data = self._readline_cache
117         if self.tell() == cache_pos:
118             data = [cache_data]
119             self._filepos += len(cache_data)
120         else:
121             data = ['']
122         data_size = len(data[-1])
123         while (data_size < size) and ('\n' not in data[-1]):
124             next_read = self.read(2 ** 20, num_retries=num_retries)
125             if not next_read:
126                 break
127             data.append(next_read)
128             data_size += len(next_read)
129         data = ''.join(data)
130         try:
131             nextline_index = data.index('\n') + 1
132         except ValueError:
133             nextline_index = len(data)
134         nextline_index = min(nextline_index, size)
135         self._filepos -= len(data) - nextline_index
136         self._readline_cache = (self.tell(), data[nextline_index:])
137         return data[:nextline_index]
138
139     @_FileLikeObjectBase._before_close
140     @retry_method
141     def decompress(self, decompress, size, num_retries=None):
142         for segment in self.readall(size, num_retries=num_retries):
143             data = decompress(segment)
144             if data:
145                 yield data
146
147     @_FileLikeObjectBase._before_close
148     @retry_method
149     def readall_decompressed(self, size=2**20, num_retries=None):
150         self.seek(0)
151         if self.name.endswith('.bz2'):
152             dc = bz2.BZ2Decompressor()
153             return self.decompress(dc.decompress, size,
154                                    num_retries=num_retries)
155         elif self.name.endswith('.gz'):
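            # wbits of 16+MAX_WBITS tells zlib to expect and skip a gzip
            # header rather than a raw zlib stream.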
156             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
157             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
158                                    size, num_retries=num_retries)
159         else:
160             return self.readall(size, num_retries=num_retries)
161
162     @_FileLikeObjectBase._before_close
163     @retry_method
164     def readlines(self, sizehint=float('inf'), num_retries=None):
165         data = []
166         data_size = 0
167         for s in self.readall(num_retries=num_retries):
168             data.append(s)
169             data_size += len(s)
170             if data_size >= sizehint:
171                 break
172         return ''.join(data).splitlines(True)
173
174     def size(self):
175         raise NotImplementedError()
176
177     def read(self, size, num_retries=None):
178         raise NotImplementedError()
179
180     def readfrom(self, start, size, num_retries=None):
181         raise NotImplementedError()
182
183
184 class StreamFileReader(ArvadosFileReaderBase):
185     class _NameAttribute(str):
186         # The Python file API provides a plain .name attribute.
187         # Older SDK provided a name() method.
188         # This class provides both, for maximum compatibility.
189         def __call__(self):
190             return self
191
192     def __init__(self, stream, segments, name):
193         super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
194         self._stream = stream
195         self.segments = segments
196
197     def stream_name(self):
198         return self._stream.name()
199
200     def size(self):
201         n = self.segments[-1]
202         return n.range_start + n.range_size
203
204     @_FileLikeObjectBase._before_close
205     @retry_method
206     def read(self, size, num_retries=None):
207         """Read up to 'size' bytes from the stream, starting at the current file position"""
208         if size == 0:
209             return ''
210
211         data = ''
212         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
213         if available_chunks:
214             lr = available_chunks[0]
215             data = self._stream.readfrom(lr.locator+lr.segment_offset,
216                                           lr.segment_size,
217                                           num_retries=num_retries)
218
219         self._filepos += len(data)
220         return data
221
222     @_FileLikeObjectBase._before_close
223     @retry_method
224     def readfrom(self, start, size, num_retries=None):
225         """Read up to 'size' bytes from the stream, starting at 'start'"""
226         if size == 0:
227             return ''
228
229         data = []
230         for lr in locators_and_ranges(self.segments, start, size):
231             data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
232                                               num_retries=num_retries))
233         return ''.join(data)
234
235     def as_manifest(self):
236         segs = []
237         for r in self.segments:
238             segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
239         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
240
241
242 def synchronized(orig_func):
243     @functools.wraps(orig_func)
244     def synchronized_wrapper(self, *args, **kwargs):
245         with self.lock:
246             return orig_func(self, *args, **kwargs)
247     return synchronized_wrapper
248
249
250 class StateChangeError(Exception):
251     def __init__(self, message, state, nextstate):
252         super(StateChangeError, self).__init__(message)
253         self.state = state
254         self.nextstate = nextstate
255
256 class _BufferBlock(object):
257     """A stand-in for a Keep block that is in the process of being written.
258
259     Writers can append to it, get the size, and compute the Keep locator.
260     There are three valid states:
261
262     WRITABLE
263       Can append to block.
264
265     PENDING
266       Block is in the process of being uploaded to Keep, append is an error.
267
268     COMMITTED
269       The block has been written to Keep, its internal buffer has been
270       released, fetching the block will fetch it via keep client (since we
271       discarded the internal copy), and identifiers referring to the BufferBlock
272       can be replaced with the block locator.
273
274     """
275
276     WRITABLE = 0
277     PENDING = 1
278     COMMITTED = 2
279     ERROR = 3
280
281     def __init__(self, blockid, starting_capacity, owner):
282         """
283         :blockid:
284           the identifier for this block
285
286         :starting_capacity:
287           the initial buffer capacity
288
289         :owner:
290           ArvadosFile that owns this block
291
292         """
293         self.blockid = blockid
294         self.buffer_block = bytearray(starting_capacity)
295         self.buffer_view = memoryview(self.buffer_block)
296         self.write_pointer = 0
297         self._state = _BufferBlock.WRITABLE
298         self._locator = None
299         self.owner = owner
300         self.lock = threading.Lock()
301         self.wait_for_commit = threading.Event()
302         self.error = None
303
304     @synchronized
305     def append(self, data):
306         """Append some data to the buffer.
307
308         Only valid if the block is in WRITABLE state.  Implements an expanding
309         buffer, doubling capacity as needed to accommodate all the data.
310
311         """
312         if self._state == _BufferBlock.WRITABLE:
313             while (self.write_pointer+len(data)) > len(self.buffer_block):
314                 new_buffer_block = bytearray(len(self.buffer_block) * 2)
315                 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
316                 self.buffer_block = new_buffer_block
317                 self.buffer_view = memoryview(self.buffer_block)
318             self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
319             self.write_pointer += len(data)
320             self._locator = None
321         else:
322             raise AssertionError("Buffer block is not writable")
323
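    # Allowed state changes: a WRITABLE block may only move to PENDING; a
    # PENDING upload either succeeds (COMMITTED) or fails (ERROR); a failed
    # upload may be retried (ERROR back to PENDING).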
324     STATE_TRANSITIONS = frozenset([
325             (WRITABLE, PENDING),
326             (PENDING, COMMITTED),
327             (PENDING, ERROR),
328             (ERROR, PENDING)])
329
330     @synchronized
331     def set_state(self, nextstate, val=None):
332         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
333             raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
334         self._state = nextstate
335
336         if self._state == _BufferBlock.PENDING:
337             self.wait_for_commit.clear()
338
339         if self._state == _BufferBlock.COMMITTED:
340             self._locator = val
341             self.buffer_view = None
342             self.buffer_block = None
343             self.wait_for_commit.set()
344
345         if self._state == _BufferBlock.ERROR:
346             self.error = val
347             self.wait_for_commit.set()
348
349     @synchronized
350     def state(self):
351         return self._state
352
353     def size(self):
354         """The amount of data written to the buffer."""
355         return self.write_pointer
356
357     @synchronized
358     def locator(self):
359         """The Keep locator for this buffer's contents."""
360         if self._locator is None:
361             self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
362         return self._locator
363
364     @synchronized
365     def clone(self, new_blockid, owner):
366         if self._state == _BufferBlock.COMMITTED:
367             raise AssertionError("Cannot duplicate committed buffer block")
368         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
369         bufferblock.append(self.buffer_view[0:self.size()])
370         return bufferblock
371
372     @synchronized
373     def clear(self):
374         self.owner = None
375         self.buffer_block = None
376         self.buffer_view = None
377
378
379 class NoopLock(object):
380     def __enter__(self):
381         return self
382
383     def __exit__(self, exc_type, exc_value, traceback):
384         pass
385
386     def acquire(self, blocking=False):
387         pass
388
389     def release(self):
390         pass
391
392
393 def must_be_writable(orig_func):
394     @functools.wraps(orig_func)
395     def must_be_writable_wrapper(self, *args, **kwargs):
396         if not self.writable():
397             raise IOError(errno.EROFS, "Collection is read-only.")
398         return orig_func(self, *args, **kwargs)
399     return must_be_writable_wrapper
400
401
402 class _BlockManager(object):
403     """BlockManager handles buffer blocks.
404
405     Also handles background block uploads, and background block prefetch for a
406     Collection of ArvadosFiles.
407
408     """
409
410     DEFAULT_PUT_THREADS = 2
411     DEFAULT_GET_THREADS = 2
412
413     def __init__(self, keep, copies=None, put_threads=None):
414         """keep: KeepClient object to use"""
415         self._keep = keep
416         self._bufferblocks = collections.OrderedDict()
417         self._put_queue = None
418         self._put_threads = None
419         self._prefetch_queue = None
420         self._prefetch_threads = None
421         self.lock = threading.Lock()
422         self.prefetch_enabled = True
423         if put_threads:
424             self.num_put_threads = put_threads
425         else:
426             self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
427         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
428         self.copies = copies
429         self._pending_write_size = 0
430         self.threads_lock = threading.Lock()
431
432     @synchronized
433     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
434         """Allocate a new, empty bufferblock in WRITABLE state and return it.
435
436         :blockid:
437           optional block identifier, otherwise one will be automatically assigned
438
439         :starting_capacity:
440           optional capacity, otherwise will use default capacity
441
442         :owner:
443           ArvadosFile that owns this block
444
445         """
446         return self._alloc_bufferblock(blockid, starting_capacity, owner)
447
448     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
449         if blockid is None:
450             blockid = "%s" % uuid.uuid4()
451         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
452         self._bufferblocks[bufferblock.blockid] = bufferblock
453         return bufferblock
454
455     @synchronized
456     def dup_block(self, block, owner):
457         """Create a new bufferblock initialized with the content of an existing bufferblock.
458
459         :block:
460           the buffer block to copy.
461
462         :owner:
463           ArvadosFile that owns the new block
464
465         """
466         new_blockid = "bufferblock%i" % len(self._bufferblocks)
467         bufferblock = block.clone(new_blockid, owner)
468         self._bufferblocks[bufferblock.blockid] = bufferblock
469         return bufferblock
470
471     @synchronized
472     def is_bufferblock(self, locator):
473         return locator in self._bufferblocks
474
475     def _commit_bufferblock_worker(self):
476         """Background uploader thread."""
477
478         while True:
479             try:
480                 bufferblock = self._put_queue.get()
481                 if bufferblock is None:
482                     return
483
484                 if self.copies is None:
485                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
486                 else:
487                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
488                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
489
490             except Exception as e:
491                 bufferblock.set_state(_BufferBlock.ERROR, e)
492             finally:
493                 if self._put_queue is not None:
494                     self._put_queue.task_done()
495
496     def start_put_threads(self):
497         with self.threads_lock:
498             if self._put_threads is None:
499                 # Start uploader threads.
500
501                 # If we don't limit the Queue size, the upload queue can quickly
502                 # grow to take up gigabytes of RAM if the writing process is
503                 # generating data more quickly than it can be sent to the Keep
504                 # servers.
505                 #
506                 # With two upload threads and a queue size of 2, this means up to 4
507                 # blocks pending.  If they are full 64 MiB blocks, that means up to
508                 # 256 MiB of internal buffering, which is the same size as the
509                 # default download block cache in KeepClient.
510                 self._put_queue = Queue.Queue(maxsize=2)
511
512                 self._put_threads = []
513                 for i in xrange(0, self.num_put_threads):
514                     thread = threading.Thread(target=self._commit_bufferblock_worker)
515                     self._put_threads.append(thread)
516                     thread.daemon = True
517                     thread.start()
518
519     def _block_prefetch_worker(self):
520         """The background downloader thread."""
521         while True:
522             try:
523                 b = self._prefetch_queue.get()
524                 if b is None:
525                     return
526                 self._keep.get(b)
527             except Exception:
528                 _logger.exception("Exception doing block prefetch")
529
530     @synchronized
531     def start_get_threads(self):
532         if self._prefetch_threads is None:
533             self._prefetch_queue = Queue.Queue()
534             self._prefetch_threads = []
535             for i in xrange(0, self.num_get_threads):
536                 thread = threading.Thread(target=self._block_prefetch_worker)
537                 self._prefetch_threads.append(thread)
538                 thread.daemon = True
539                 thread.start()
540
541
542     @synchronized
543     def stop_threads(self):
544         """Shut down and wait for background upload and download threads to finish."""
545
546         if self._put_threads is not None:
547             for t in self._put_threads:
548                 self._put_queue.put(None)
549             for t in self._put_threads:
550                 t.join()
551         self._put_threads = None
552         self._put_queue = None
553
554         if self._prefetch_threads is not None:
555             for t in self._prefetch_threads:
556                 self._prefetch_queue.put(None)
557             for t in self._prefetch_threads:
558                 t.join()
559         self._prefetch_threads = None
560         self._prefetch_queue = None
561
562     def __enter__(self):
563         return self
564
565     def __exit__(self, exc_type, exc_value, traceback):
566         self.stop_threads()
567
568     @synchronized
569     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
570         """Packs small blocks together before uploading"""
571         self._pending_write_size += closed_file_size
572
573         # Check if there are enough small blocks for filling up one in full
574         if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
575
576             # Search for blocks that are ready to be packed together before being committed to Keep.
577             # A WRITABLE block always has an owner.
578             # A WRITABLE block whose owner is closed() implies that its
579             # size is <= KEEP_BLOCK_SIZE/2.
580             try:
581                 small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
582             except AttributeError:
583                 # Writable blocks without owner shouldn't exist.
584                 raise UnownedBlockError()
585
586             if len(small_blocks) <= 1:
587                 # Not enough small blocks for repacking
588                 return
589
590             # Update the pending write size count with its true value, just in case
591             # some small file was opened, written and closed several times.
592             self._pending_write_size = sum([b.size() for b in small_blocks])
593             if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
594                 return
595
596             new_bb = self._alloc_bufferblock()
597             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
598                 bb = small_blocks.pop(0)
599                 arvfile = bb.owner
600                 self._pending_write_size -= bb.size()
601                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
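                # The file now consists of a single segment pointing into the
                # packed block, starting where this block's bytes were just
                # appended (the current write pointer minus the block's size).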
602                 arvfile.set_segments([Range(new_bb.blockid,
603                                             0,
604                                             bb.size(),
605                                             new_bb.write_pointer - bb.size())])
606                 self._delete_bufferblock(bb.blockid)
607             self.commit_bufferblock(new_bb, sync=sync)
608
609     def commit_bufferblock(self, block, sync):
610         """Initiate a background upload of a bufferblock.
611
612         :block:
613           The block object to upload
614
615         :sync:
616           If `sync` is True, upload the block synchronously.
617           If `sync` is False, upload the block asynchronously.  This will
618           return immediately unless the upload queue is at capacity, in
619           which case it will wait on an upload queue slot.
620
621         """
622         try:
623             # Mark the block as PENDING so as to disallow any more appends.
624             block.set_state(_BufferBlock.PENDING)
625         except StateChangeError as e:
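            # The block could not be moved to PENDING, which means another
            # thread has already started (or finished) committing it; wait for
            # it if requested and report its final state.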
626             if e.state == _BufferBlock.PENDING:
627                 if sync:
628                     block.wait_for_commit.wait()
629                 else:
630                     return
631             if block.state() == _BufferBlock.COMMITTED:
632                 return
633             elif block.state() == _BufferBlock.ERROR:
634                 raise block.error
635             else:
636                 raise
637
638         if sync:
639             try:
640                 if self.copies is None:
641                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
642                 else:
643                     loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
644                 block.set_state(_BufferBlock.COMMITTED, loc)
645             except Exception as e:
646                 block.set_state(_BufferBlock.ERROR, e)
647                 raise
648         else:
649             self.start_put_threads()
650             self._put_queue.put(block)
651
652     @synchronized
653     def get_bufferblock(self, locator):
654         return self._bufferblocks.get(locator)
655
656     @synchronized
657     def delete_bufferblock(self, locator):
658         self._delete_bufferblock(locator)
659
660     def _delete_bufferblock(self, locator):
661         bb = self._bufferblocks[locator]
662         bb.clear()
663         del self._bufferblocks[locator]
664
665     def get_block_contents(self, locator, num_retries, cache_only=False):
666         """Fetch a block.
667
668         First checks to see if the locator is a BufferBlock and return that, if
669         not, passes the request through to KeepClient.get().
670
671         """
672         with self.lock:
673             if locator in self._bufferblocks:
674                 bufferblock = self._bufferblocks[locator]
675                 if bufferblock.state() != _BufferBlock.COMMITTED:
676                     return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
677                 else:
678                     locator = bufferblock._locator
679         if cache_only:
680             return self._keep.get_from_cache(locator)
681         else:
682             return self._keep.get(locator, num_retries=num_retries)
683
684     def commit_all(self):
685         """Commit all outstanding buffer blocks.
686
687         This is a synchronous call, and will not return until all buffer blocks
688         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
689
690         """
691         self.repack_small_blocks(force=True, sync=True)
692
693         with self.lock:
694             items = self._bufferblocks.items()
695
696         for k,v in items:
697             if v.state() != _BufferBlock.COMMITTED and v.owner:
698                 v.owner.flush(sync=False)
699
700         with self.lock:
701             if self._put_queue is not None:
702                 self._put_queue.join()
703
704                 err = []
705                 for k,v in items:
706                     if v.state() == _BufferBlock.ERROR:
707                         err.append((v.locator(), v.error))
708                 if err:
709                     raise KeepWriteError("Error writing some blocks", err, label="block")
710
711         for k,v in items:
712             # flush again with sync=True to remove committed bufferblocks from
713             # the segments.
714             if v.owner:
715                 v.owner.flush(sync=True)
716
717     def block_prefetch(self, locator):
718         """Initiate a background download of a block.
719
720         This assumes that the underlying KeepClient implements a block cache,
721         so repeated requests for the same block will not result in repeated
722         downloads (unless the block is evicted from the cache.)  This method
723         does not block.
724
725         """
726
727         if not self.prefetch_enabled:
728             return
729
730         if self._keep.get_from_cache(locator) is not None:
731             return
732
733         with self.lock:
734             if locator in self._bufferblocks:
735                 return
736
737         self.start_get_threads()
738         self._prefetch_queue.put(locator)
739
740
741 class ArvadosFile(object):
742     """Represent a file in a Collection.
743
744     ArvadosFile manages the underlying representation of a file in Keep as a
745     sequence of segments spanning a set of blocks, and implements random
746     read/write access.
747
748     This object may be accessed from multiple threads.
749
750     """
751
752     def __init__(self, parent, name, stream=[], segments=[]):
753         """
754         ArvadosFile constructor.
755
756         :stream:
757           a list of Range objects representing a block stream
758
759         :segments:
760           a list of Range objects representing segments
761         """
762         self.parent = parent
763         self.name = name
764         self._writers = set()
765         self._committed = False
766         self._segments = []
767         self.lock = parent.root_collection().lock
768         for s in segments:
769             self._add_segment(stream, s.locator, s.range_size)
770         self._current_bblock = None
771
772     def writable(self):
773         return self.parent.writable()
774
775     @synchronized
776     def permission_expired(self, as_of_dt=None):
777         """Returns True if any of the segments' locators has expired"""
778         for r in self._segments:
779             if KeepLocator(r.locator).permission_expired(as_of_dt):
780                 return True
781         return False
782
783     @synchronized
784     def segments(self):
785         return copy.copy(self._segments)
786
787     @synchronized
788     def clone(self, new_parent, new_name):
789         """Make a copy of this file."""
790         cp = ArvadosFile(new_parent, new_name)
791         cp.replace_contents(self)
792         return cp
793
794     @must_be_writable
795     @synchronized
796     def replace_contents(self, other):
797         """Replace segments of this file with segments from another `ArvadosFile` object."""
798
799         map_loc = {}
800         self._segments = []
801         for other_segment in other.segments():
802             new_loc = other_segment.locator
803             if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
804                 if other_segment.locator not in map_loc:
805                     bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
806                     if bufferblock.state() != _BufferBlock.WRITABLE:
807                         map_loc[other_segment.locator] = bufferblock.locator()
808                     else:
809                         map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
810                 new_loc = map_loc[other_segment.locator]
811
812             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
813
814         self.set_committed(False)
815
816     def __eq__(self, other):
817         if other is self:
818             return True
819         if not isinstance(other, ArvadosFile):
820             return False
821
822         othersegs = other.segments()
823         with self.lock:
824             if len(self._segments) != len(othersegs):
825                 return False
826             for i in xrange(0, len(othersegs)):
827                 seg1 = self._segments[i]
828                 seg2 = othersegs[i]
829                 loc1 = seg1.locator
830                 loc2 = seg2.locator
831
832                 if self.parent._my_block_manager().is_bufferblock(loc1):
833                     loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
834
835                 if other.parent._my_block_manager().is_bufferblock(loc2):
836                     loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
837
838                 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
839                     seg1.range_start != seg2.range_start or
840                     seg1.range_size != seg2.range_size or
841                     seg1.segment_offset != seg2.segment_offset):
842                     return False
843
844         return True
845
846     def __ne__(self, other):
847         return not self.__eq__(other)
848
849     @synchronized
850     def set_segments(self, segs):
851         self._segments = segs
852
853     @synchronized
854     def set_committed(self, value=True):
855         """Set committed flag.
856
857         If value is True, set committed to be True.
858
859         If value is False, set committed to be False for this and all parents.
860         """
861         if value == self._committed:
862             return
863         self._committed = value
864         if self._committed is False and self.parent is not None:
865             self.parent.set_committed(False)
866
867     @synchronized
868     def committed(self):
869         """Get whether this is committed or not."""
870         return self._committed
871
872     @synchronized
873     def add_writer(self, writer):
874         """Add an ArvadosFileWriter reference to the list of writers"""
875         if isinstance(writer, ArvadosFileWriter):
876             self._writers.add(writer)
877
878     @synchronized
879     def remove_writer(self, writer, flush):
880         """
881         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
882         and do some block maintenance tasks.
883         """
884         self._writers.remove(writer)
885
886         if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
887             # File writer closed, not small enough for repacking
888             self.flush()
889         elif self.closed():
890             # All writers closed and size is adequate for repacking
891             self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
892
893     def closed(self):
894         """
895         Get whether this file is closed. The file is considered closed once the
896         writers list is empty.
897         """
898         return len(self._writers) == 0
899
900     @must_be_writable
901     @synchronized
902     def truncate(self, size):
903         """Shrink the size of the file.
904
905         If `size` is less than the size of the file, the file contents after
906         `size` will be discarded.  If `size` is greater than the current size
907         of the file, an IOError will be raised.
908
909         """
910         if size < self.size():
911             new_segs = []
912             for r in self._segments:
913                 range_end = r.range_start+r.range_size
914                 if r.range_start >= size:
915                     # segment is past the truncate size, all done
916                     break
917                 elif size < range_end:
918                     nr = Range(r.locator, r.range_start, size - r.range_start, 0)
919                     nr.segment_offset = r.segment_offset
920                     new_segs.append(nr)
921                     break
922                 else:
923                     new_segs.append(r)
924
925             self._segments = new_segs
926             self.set_committed(False)
927         elif size > self.size():
928             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
929
930     def readfrom(self, offset, size, num_retries, exact=False):
931         """Read up to `size` bytes from the file starting at `offset`.
932
933         :exact:
934          If False (default), return less data than requested if the read
935          crosses a block boundary and the next block isn't cached.  If True,
936          only return less data than requested when hitting EOF.
937         """
938
939         with self.lock:
940             if size == 0 or offset >= self.size():
941                 return ''
942             readsegs = locators_and_ranges(self._segments, offset, size)
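            # Also look ahead one block's worth of data past the requested
            # range (at most 32 segments) so those blocks can be prefetched in
            # the background below.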
943             prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
944
945         locs = set()
946         data = []
947         for lr in readsegs:
948             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
949             if block:
950                 blockview = memoryview(block)
951                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
952                 locs.add(lr.locator)
953             else:
954                 break
955
956         for lr in prefetch:
957             if lr.locator not in locs:
958                 self.parent._my_block_manager().block_prefetch(lr.locator)
959                 locs.add(lr.locator)
960
961         return ''.join(data)
962
963     def _repack_writes(self, num_retries):
964         """Test if the buffer block has more data than actual segments.
965
966         This happens when a buffered write over-writes a file range written in
967         a previous buffered write.  Re-pack the buffer block for efficiency
968         and to avoid leaking information.
969
970         """
971         segs = self._segments
972
973         # Sum up the segments to get the total bytes of the file referencing
974         # into the buffer block.
975         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
976         write_total = sum([s.range_size for s in bufferblock_segs])
977
978         if write_total < self._current_bblock.size():
979             # There is more data in the buffer block than is actually accounted for by segments, so
980             # re-pack the referenced ranges into a new buffer block to reclaim the unused space.
981             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
982             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
983             for t in bufferblock_segs:
984                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
985                 t.segment_offset = new_bb.size() - t.range_size
986
987             self._current_bblock = new_bb
988
989     @must_be_writable
990     @synchronized
991     def writeto(self, offset, data, num_retries):
992         """Write `data` to the file starting at `offset`.
993
994         This will update existing bytes and/or extend the size of the file as
995         necessary.
996
997         """
998         if len(data) == 0:
999             return
1000
1001         if offset > self.size():
1002             raise ArgumentError("Offset is past the end of the file")
1003
1004         if len(data) > config.KEEP_BLOCK_SIZE:
1005             # Chunk it up into smaller writes
1006             n = 0
1007             dataview = memoryview(data)
1008             while n < len(data):
1009                 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1010                 n += config.KEEP_BLOCK_SIZE
1011             return
1012
1013         self.set_committed(False)
1014
1015         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1016             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1017
1018         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1019             self._repack_writes(num_retries)
1020             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1021                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1022                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1023
1024         self._current_bblock.append(data)
1025
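        # Record that this byte range of the file now lives in the current
        # buffer block, at the position where the data was just appended.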
1026         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1027
1028         self.parent.notify(WRITE, self.parent, self.name, (self, self))
1029
1030         return len(data)
1031
1032     @synchronized
1033     def flush(self, sync=True, num_retries=0):
1034         """Flush the current bufferblock to Keep.
1035
1036         :sync:
1037           If True, commit block synchronously, wait until buffer block has been written.
1038           If False, commit block asynchronously, return immediately after putting block into
1039           the keep put queue.
1040         """
1041         if self.committed():
1042             return
1043
1044         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1045             if self._current_bblock.state() == _BufferBlock.WRITABLE:
1046                 self._repack_writes(num_retries)
1047             self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1048
1049         if sync:
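            # Commit any remaining buffer blocks synchronously, rewrite the
            # segment locators to use their permanent Keep locators, and then
            # drop the now-unreferenced buffer blocks.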
1050             to_delete = set()
1051             for s in self._segments:
1052                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1053                 if bb:
1054                     if bb.state() != _BufferBlock.COMMITTED:
1055                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1056                     to_delete.add(s.locator)
1057                     s.locator = bb.locator()
1058             for s in to_delete:
1059                self.parent._my_block_manager().delete_bufferblock(s)
1060
1061         self.parent.notify(MOD, self.parent, self.name, (self, self))
1062
1063     @must_be_writable
1064     @synchronized
1065     def add_segment(self, blocks, pos, size):
1066         """Add a segment to the end of the file.
1067
1068         `pos` and `size` reference a section of the stream described by
1069         `blocks` (a list of Range objects).
1070
1071         """
1072         self._add_segment(blocks, pos, size)
1073
1074     def _add_segment(self, blocks, pos, size):
1075         """Internal implementation of add_segment."""
1076         self.set_committed(False)
1077         for lr in locators_and_ranges(blocks, pos, size):
1078             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1079             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1080             self._segments.append(r)
1081
1082     @synchronized
1083     def size(self):
1084         """Get the file size."""
1085         if self._segments:
1086             n = self._segments[-1]
1087             return n.range_start + n.range_size
1088         else:
1089             return 0
1090
1091     @synchronized
1092     def manifest_text(self, stream_name=".", portable_locators=False,
1093                       normalize=False, only_committed=False):
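        """Return a manifest text fragment describing this file as a single stream."""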
1094         buf = ""
1095         filestream = []
1096         for segment in self._segments:
1097             loc = segment.locator
1098             if self.parent._my_block_manager().is_bufferblock(loc):
1099                 if only_committed:
1100                     continue
1101                 loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1102             if portable_locators:
1103                 loc = KeepLocator(loc).stripped()
1104             filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1105                                  segment.segment_offset, segment.range_size))
1106         buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1107         buf += "\n"
1108         return buf
1109
1110     @must_be_writable
1111     @synchronized
1112     def _reparent(self, newparent, newname):
1113         self.set_committed(False)
1114         self.flush(sync=True)
1115         self.parent.remove(self.name)
1116         self.parent = newparent
1117         self.name = newname
1118         self.lock = self.parent.root_collection().lock
1119
1120
1121 class ArvadosFileReader(ArvadosFileReaderBase):
1122     """Wraps ArvadosFile in a file-like object supporting reading only.
1123
1124     Be aware that this class is NOT thread safe as there is no locking around
1125     updates to the file pointer.
1126
1127     """
1128
1129     def __init__(self, arvadosfile, num_retries=None):
1130         super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1131         self.arvadosfile = arvadosfile
1132
1133     def size(self):
1134         return self.arvadosfile.size()
1135
1136     def stream_name(self):
1137         return self.arvadosfile.parent.stream_name()
1138
1139     @_FileLikeObjectBase._before_close
1140     @retry_method
1141     def read(self, size=None, num_retries=None):
1142         """Read up to `size` bytes from the file and return the result.
1143
1144         Starts at the current file position.  If `size` is None, read the
1145         entire remainder of the file.
1146         """
1147         if size is None:
1148             data = []
1149             rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1150             while rd:
1151                 data.append(rd)
1152                 self._filepos += len(rd)
1153                 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1154             return ''.join(data)
1155         else:
1156             data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1157             self._filepos += len(data)
1158             return data
1159
1160     @_FileLikeObjectBase._before_close
1161     @retry_method
1162     def readfrom(self, offset, size, num_retries=None):
1163         """Read up to `size` bytes from the stream, starting at the specified file offset.
1164
1165         This method does not change the file position.
1166         """
1167         return self.arvadosfile.readfrom(offset, size, num_retries)
1168
1169     def flush(self):
1170         pass
1171
1172
1173 class ArvadosFileWriter(ArvadosFileReader):
1174     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1175
1176     Be aware that this class is NOT thread safe as there is no locking around
1177     updating file pointer.
1178
1179     """
1180
1181     def __init__(self, arvadosfile, mode, num_retries=None):
1182         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1183         self.mode = mode
1184         self.arvadosfile.add_writer(self)
1185
1186     @_FileLikeObjectBase._before_close
1187     @retry_method
1188     def write(self, data, num_retries=None):
1189         if self.mode[0] == "a":
1190             self.arvadosfile.writeto(self.size(), data, num_retries)
1191         else:
1192             self.arvadosfile.writeto(self._filepos, data, num_retries)
1193             self._filepos += len(data)
1194         return len(data)
1195
1196     @_FileLikeObjectBase._before_close
1197     @retry_method
1198     def writelines(self, seq, num_retries=None):
1199         for s in seq:
1200             self.write(s, num_retries=num_retries)
1201
1202     @_FileLikeObjectBase._before_close
1203     def truncate(self, size=None):
1204         if size is None:
1205             size = self._filepos
1206         self.arvadosfile.truncate(size)
1207         if self._filepos > self.size():
1208             self._filepos = self.size()
1209
1210     @_FileLikeObjectBase._before_close
1211     def flush(self):
1212         self.arvadosfile.flush()
1213
1214     def close(self, flush=True):
1215         if not self.closed:
1216             self.arvadosfile.remove_writer(self, flush)
1217             super(ArvadosFileWriter, self).close()
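

# A minimal usage sketch, assuming a writable arvados.collection.Collection
# object; the file name "example.txt" is illustrative:
#
#   import arvados.collection
#   coll = arvados.collection.Collection()
#   with coll.open("example.txt", "w") as f:   # returns an ArvadosFileWriter
#       f.write("hello\n")
#   with coll.open("example.txt", "r") as f:   # returns an ArvadosFileReader
#       data = f.read(1024)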