Merge branch '14259-pysdk-remote-block-copy'
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import datetime
17 import ciso8601
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
32 import arvados.util
33 import arvados.events as events
34 from arvados.retry import retry_method
35
36 _logger = logging.getLogger('arvados.collection')
37
38 class CollectionBase(object):
39     """Abstract base class for Collection classes."""
40
41     def __enter__(self):
42         return self
43
44     def __exit__(self, exc_type, exc_value, traceback):
45         pass
46
47     def _my_keep(self):
48         if self._keep_client is None:
49             self._keep_client = KeepClient(api_client=self._api_client,
50                                            num_retries=self.num_retries)
51         return self._keep_client
52
53     def stripped_manifest(self):
54         """Get the manifest with locator hints stripped.
55
56         Return the manifest for the current collection with all
57         non-portable hints (i.e., permission signatures and other
58         hints other than size hints) removed from the locators.
59         """
60         raw = self.manifest_text()
61         clean = []
62         for line in raw.split("\n"):
63             fields = line.split()
64             if fields:
65                 clean_fields = fields[:1] + [
66                     (re.sub(r'\+[^\d][^\+]*', '', x)
67                      if re.match(arvados.util.keep_locator_pattern, x)
68                      else x)
69                     for x in fields[1:]]
70                 clean += [' '.join(clean_fields), "\n"]
71         return ''.join(clean)
72
73
74 class _WriterFile(_FileLikeObjectBase):
75     def __init__(self, coll_writer, name):
76         super(_WriterFile, self).__init__(name, 'wb')
77         self.dest = coll_writer
78
79     def close(self):
80         super(_WriterFile, self).close()
81         self.dest.finish_current_file()
82
83     @_FileLikeObjectBase._before_close
84     def write(self, data):
85         self.dest.write(data)
86
87     @_FileLikeObjectBase._before_close
88     def writelines(self, seq):
89         for data in seq:
90             self.write(data)
91
92     @_FileLikeObjectBase._before_close
93     def flush(self):
94         self.dest.flush_data()
95
96
97 class CollectionWriter(CollectionBase):
98     """Deprecated, use Collection instead."""
99
100     def __init__(self, api_client=None, num_retries=0, replication=None):
101         """Instantiate a CollectionWriter.
102
103         CollectionWriter lets you build a new Arvados Collection from scratch.
104         Write files to it.  The CollectionWriter will upload data to Keep as
105         appropriate, and provide you with the Collection manifest text when
106         you're finished.
107
108         Arguments:
109         * api_client: The API client to use to look up Collections.  If not
110           provided, CollectionReader will build one from available Arvados
111           configuration.
112         * num_retries: The default number of times to retry failed
113           service requests.  Default 0.  You may change this value
114           after instantiation, but note those changes may not
115           propagate to related objects like the Keep client.
116         * replication: The number of copies of each block to store.
117           If this argument is None or not supplied, replication is
118           the server-provided default if available, otherwise 2.
119         """
120         self._api_client = api_client
121         self.num_retries = num_retries
122         self.replication = (2 if replication is None else replication)
123         self._keep_client = None
124         self._data_buffer = []
125         self._data_buffer_len = 0
126         self._current_stream_files = []
127         self._current_stream_length = 0
128         self._current_stream_locators = []
129         self._current_stream_name = '.'
130         self._current_file_name = None
131         self._current_file_pos = 0
132         self._finished_streams = []
133         self._close_file = None
134         self._queued_file = None
135         self._queued_dirents = deque()
136         self._queued_trees = deque()
137         self._last_open = None
138
139     def __exit__(self, exc_type, exc_value, traceback):
140         if exc_type is None:
141             self.finish()
142
143     def do_queued_work(self):
144         # The work queue consists of three pieces:
145         # * _queued_file: The file object we're currently writing to the
146         #   Collection.
147         # * _queued_dirents: Entries under the current directory
148         #   (_queued_trees[0]) that we want to write or recurse through.
149         #   This may contain files from subdirectories if
150         #   max_manifest_depth == 0 for this directory.
151         # * _queued_trees: Directories that should be written as separate
152         #   streams to the Collection.
153         # This function handles the smallest piece of work currently queued
154         # (current file, then current directory, then next directory) until
155         # no work remains.  The _work_THING methods each do a unit of work on
156         # THING.  _queue_THING methods add a THING to the work queue.
157         while True:
158             if self._queued_file:
159                 self._work_file()
160             elif self._queued_dirents:
161                 self._work_dirents()
162             elif self._queued_trees:
163                 self._work_trees()
164             else:
165                 break
166
167     def _work_file(self):
168         while True:
169             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
170             if not buf:
171                 break
172             self.write(buf)
173         self.finish_current_file()
174         if self._close_file:
175             self._queued_file.close()
176         self._close_file = None
177         self._queued_file = None
178
179     def _work_dirents(self):
180         path, stream_name, max_manifest_depth = self._queued_trees[0]
181         if stream_name != self.current_stream_name():
182             self.start_new_stream(stream_name)
183         while self._queued_dirents:
184             dirent = self._queued_dirents.popleft()
185             target = os.path.join(path, dirent)
186             if os.path.isdir(target):
187                 self._queue_tree(target,
188                                  os.path.join(stream_name, dirent),
189                                  max_manifest_depth - 1)
190             else:
191                 self._queue_file(target, dirent)
192                 break
193         if not self._queued_dirents:
194             self._queued_trees.popleft()
195
196     def _work_trees(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         d = arvados.util.listdir_recursive(
199             path, max_depth = (None if max_manifest_depth == 0 else 0))
200         if d:
201             self._queue_dirents(stream_name, d)
202         else:
203             self._queued_trees.popleft()
204
205     def _queue_file(self, source, filename=None):
206         assert (self._queued_file is None), "tried to queue more than one file"
207         if not hasattr(source, 'read'):
208             source = open(source, 'rb')
209             self._close_file = True
210         else:
211             self._close_file = False
212         if filename is None:
213             filename = os.path.basename(source.name)
214         self.start_new_file(filename)
215         self._queued_file = source
216
217     def _queue_dirents(self, stream_name, dirents):
218         assert (not self._queued_dirents), "tried to queue more than one tree"
219         self._queued_dirents = deque(sorted(dirents))
220
221     def _queue_tree(self, path, stream_name, max_manifest_depth):
222         self._queued_trees.append((path, stream_name, max_manifest_depth))
223
224     def write_file(self, source, filename=None):
225         self._queue_file(source, filename)
226         self.do_queued_work()
227
228     def write_directory_tree(self,
229                              path, stream_name='.', max_manifest_depth=-1):
230         self._queue_tree(path, stream_name, max_manifest_depth)
231         self.do_queued_work()
232
233     def write(self, newdata):
234         if isinstance(newdata, bytes):
235             pass
236         elif isinstance(newdata, str):
237             newdata = newdata.encode()
238         elif hasattr(newdata, '__iter__'):
239             for s in newdata:
240                 self.write(s)
241             return
242         self._data_buffer.append(newdata)
243         self._data_buffer_len += len(newdata)
244         self._current_stream_length += len(newdata)
245         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
246             self.flush_data()
247
248     def open(self, streampath, filename=None):
249         """open(streampath[, filename]) -> file-like object
250
251         Pass in the path of a file to write to the Collection, either as a
252         single string or as two separate stream name and file name arguments.
253         This method returns a file-like object you can write to add it to the
254         Collection.
255
256         You may only have one file object from the Collection open at a time,
257         so be sure to close the object when you're done.  Using the object in
258         a with statement makes that easy::
259
260           with cwriter.open('./doc/page1.txt') as outfile:
261               outfile.write(page1_data)
262           with cwriter.open('./doc/page2.txt') as outfile:
263               outfile.write(page2_data)
264         """
265         if filename is None:
266             streampath, filename = split(streampath)
267         if self._last_open and not self._last_open.closed:
268             raise errors.AssertionError(
269                 "can't open '{}' when '{}' is still open".format(
270                     filename, self._last_open.name))
271         if streampath != self.current_stream_name():
272             self.start_new_stream(streampath)
273         self.set_current_file_name(filename)
274         self._last_open = _WriterFile(self, filename)
275         return self._last_open
276
277     def flush_data(self):
278         data_buffer = b''.join(self._data_buffer)
279         if data_buffer:
280             self._current_stream_locators.append(
281                 self._my_keep().put(
282                     data_buffer[0:config.KEEP_BLOCK_SIZE],
283                     copies=self.replication))
284             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285             self._data_buffer_len = len(self._data_buffer[0])
286
287     def start_new_file(self, newfilename=None):
288         self.finish_current_file()
289         self.set_current_file_name(newfilename)
290
291     def set_current_file_name(self, newfilename):
292         if re.search(r'[\t\n]', newfilename):
293             raise errors.AssertionError(
294                 "Manifest filenames cannot contain whitespace: %s" %
295                 newfilename)
296         elif re.search(r'\x00', newfilename):
297             raise errors.AssertionError(
298                 "Manifest filenames cannot contain NUL characters: %s" %
299                 newfilename)
300         self._current_file_name = newfilename
301
302     def current_file_name(self):
303         return self._current_file_name
304
305     def finish_current_file(self):
306         if self._current_file_name is None:
307             if self._current_file_pos == self._current_stream_length:
308                 return
309             raise errors.AssertionError(
310                 "Cannot finish an unnamed file " +
311                 "(%d bytes at offset %d in '%s' stream)" %
312                 (self._current_stream_length - self._current_file_pos,
313                  self._current_file_pos,
314                  self._current_stream_name))
315         self._current_stream_files.append([
316                 self._current_file_pos,
317                 self._current_stream_length - self._current_file_pos,
318                 self._current_file_name])
319         self._current_file_pos = self._current_stream_length
320         self._current_file_name = None
321
322     def start_new_stream(self, newstreamname='.'):
323         self.finish_current_stream()
324         self.set_current_stream_name(newstreamname)
325
326     def set_current_stream_name(self, newstreamname):
327         if re.search(r'[\t\n]', newstreamname):
328             raise errors.AssertionError(
329                 "Manifest stream names cannot contain whitespace: '%s'" %
330                 (newstreamname))
331         self._current_stream_name = '.' if newstreamname=='' else newstreamname
332
333     def current_stream_name(self):
334         return self._current_stream_name
335
336     def finish_current_stream(self):
337         self.finish_current_file()
338         self.flush_data()
339         if not self._current_stream_files:
340             pass
341         elif self._current_stream_name is None:
342             raise errors.AssertionError(
343                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344                 (self._current_stream_length, len(self._current_stream_files)))
345         else:
346             if not self._current_stream_locators:
347                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348             self._finished_streams.append([self._current_stream_name,
349                                            self._current_stream_locators,
350                                            self._current_stream_files])
351         self._current_stream_files = []
352         self._current_stream_length = 0
353         self._current_stream_locators = []
354         self._current_stream_name = None
355         self._current_file_pos = 0
356         self._current_file_name = None
357
358     def finish(self):
359         """Store the manifest in Keep and return its locator.
360
361         This is useful for storing manifest fragments (task outputs)
362         temporarily in Keep during a Crunch job.
363
364         In other cases you should make a collection instead, by
365         sending manifest_text() to the API server's "create
366         collection" endpoint.
367         """
368         return self._my_keep().put(self.manifest_text().encode(),
369                                    copies=self.replication)
370
371     def portable_data_hash(self):
372         stripped = self.stripped_manifest().encode()
373         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
374
375     def manifest_text(self):
376         self.finish_current_stream()
377         manifest = ''
378
379         for stream in self._finished_streams:
380             if not re.search(r'^\.(/.*)?$', stream[0]):
381                 manifest += './'
382             manifest += stream[0].replace(' ', '\\040')
383             manifest += ' ' + ' '.join(stream[1])
384             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
385             manifest += "\n"
386
387         return manifest
388
389     def data_locators(self):
390         ret = []
391         for name, locators, files in self._finished_streams:
392             ret += locators
393         return ret
394
395     def save_new(self, name=None):
396         return self._api_client.collections().create(
397             ensure_unique_name=True,
398             body={
399                 'name': name,
400                 'manifest_text': self.manifest_text(),
401             }).execute(num_retries=self.num_retries)
402
403
404 class ResumableCollectionWriter(CollectionWriter):
405     """Deprecated, use Collection instead."""
406
407     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408                    '_current_stream_locators', '_current_stream_name',
409                    '_current_file_name', '_current_file_pos', '_close_file',
410                    '_data_buffer', '_dependencies', '_finished_streams',
411                    '_queued_dirents', '_queued_trees']
412
413     def __init__(self, api_client=None, **kwargs):
414         self._dependencies = {}
415         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
416
417     @classmethod
418     def from_state(cls, state, *init_args, **init_kwargs):
419         # Try to build a new writer from scratch with the given state.
420         # If the state is not suitable to resume (because files have changed,
421         # been deleted, aren't predictable, etc.), raise a
422         # StaleWriterStateError.  Otherwise, return the initialized writer.
423         # The caller is responsible for calling writer.do_queued_work()
424         # appropriately after it's returned.
425         writer = cls(*init_args, **init_kwargs)
426         for attr_name in cls.STATE_PROPS:
427             attr_value = state[attr_name]
428             attr_class = getattr(writer, attr_name).__class__
429             # Coerce the value into the same type as the initial value, if
430             # needed.
431             if attr_class not in (type(None), attr_value.__class__):
432                 attr_value = attr_class(attr_value)
433             setattr(writer, attr_name, attr_value)
434         # Check dependencies before we try to resume anything.
435         if any(KeepLocator(ls).permission_expired()
436                for ls in writer._current_stream_locators):
437             raise errors.StaleWriterStateError(
438                 "locators include expired permission hint")
439         writer.check_dependencies()
440         if state['_current_file'] is not None:
441             path, pos = state['_current_file']
442             try:
443                 writer._queued_file = open(path, 'rb')
444                 writer._queued_file.seek(pos)
445             except IOError as error:
446                 raise errors.StaleWriterStateError(
447                     "failed to reopen active file {}: {}".format(path, error))
448         return writer
449
450     def check_dependencies(self):
451         for path, orig_stat in listitems(self._dependencies):
452             if not S_ISREG(orig_stat[ST_MODE]):
453                 raise errors.StaleWriterStateError("{} not file".format(path))
454             try:
455                 now_stat = tuple(os.stat(path))
456             except OSError as error:
457                 raise errors.StaleWriterStateError(
458                     "failed to stat {}: {}".format(path, error))
459             if ((not S_ISREG(now_stat[ST_MODE])) or
460                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462                 raise errors.StaleWriterStateError("{} changed".format(path))
463
464     def dump_state(self, copy_func=lambda x: x):
465         state = {attr: copy_func(getattr(self, attr))
466                  for attr in self.STATE_PROPS}
467         if self._queued_file is None:
468             state['_current_file'] = None
469         else:
470             state['_current_file'] = (os.path.realpath(self._queued_file.name),
471                                       self._queued_file.tell())
472         return state
473
474     def _queue_file(self, source, filename=None):
475         try:
476             src_path = os.path.realpath(source)
477         except Exception:
478             raise errors.AssertionError("{} not a file path".format(source))
479         try:
480             path_stat = os.stat(src_path)
481         except OSError as stat_error:
482             path_stat = None
483         super(ResumableCollectionWriter, self)._queue_file(source, filename)
484         fd_stat = os.fstat(self._queued_file.fileno())
485         if not S_ISREG(fd_stat.st_mode):
486             # We won't be able to resume from this cache anyway, so don't
487             # worry about further checks.
488             self._dependencies[source] = tuple(fd_stat)
489         elif path_stat is None:
490             raise errors.AssertionError(
491                 "could not stat {}: {}".format(source, stat_error))
492         elif path_stat.st_ino != fd_stat.st_ino:
493             raise errors.AssertionError(
494                 "{} changed between open and stat calls".format(source))
495         else:
496             self._dependencies[src_path] = tuple(fd_stat)
497
498     def write(self, data):
499         if self._queued_file is None:
500             raise errors.AssertionError(
501                 "resumable writer can't accept unsourced data")
502         return super(ResumableCollectionWriter, self).write(data)
503
504
505 ADD = "add"
506 DEL = "del"
507 MOD = "mod"
508 TOK = "tok"
509 FILE = "file"
510 COLLECTION = "collection"
511
512 class RichCollectionBase(CollectionBase):
513     """Base class for Collections and Subcollections.
514
515     Implements the majority of functionality relating to accessing items in the
516     Collection.
517
518     """
519
520     def __init__(self, parent=None):
521         self.parent = parent
522         self._committed = False
523         self._has_remote_blocks = False
524         self._callback = None
525         self._items = {}
526
527     def _my_api(self):
528         raise NotImplementedError()
529
530     def _my_keep(self):
531         raise NotImplementedError()
532
533     def _my_block_manager(self):
534         raise NotImplementedError()
535
536     def writable(self):
537         raise NotImplementedError()
538
539     def root_collection(self):
540         raise NotImplementedError()
541
542     def notify(self, event, collection, name, item):
543         raise NotImplementedError()
544
545     def stream_name(self):
546         raise NotImplementedError()
547
548     @synchronized
549     def has_remote_blocks(self):
550         """Recursively check for a +R segment locator signature."""
551
552         if self._has_remote_blocks:
553             return True
554         for item in self:
555             if self[item].has_remote_blocks():
556                 return True
557         return False
558
559     @synchronized
560     def set_has_remote_blocks(self, val):
561         self._has_remote_blocks = val
562         if self.parent:
563             self.parent.set_has_remote_blocks(val)
564
565     @must_be_writable
566     @synchronized
567     def find_or_create(self, path, create_type):
568         """Recursively search the specified file path.
569
570         May return either a `Collection` or `ArvadosFile`.  If not found, will
571         create a new item at the specified path based on `create_type`.  Will
572         create intermediate subcollections needed to contain the final item in
573         the path.
574
575         :create_type:
576           One of `arvados.collection.FILE` or
577           `arvados.collection.COLLECTION`.  If the path is not found, and value
578           of create_type is FILE then create and return a new ArvadosFile for
579           the last path component.  If COLLECTION, then create and return a new
580           Collection for the last path component.
581
582         """
583
584         pathcomponents = path.split("/", 1)
585         if pathcomponents[0]:
586             item = self._items.get(pathcomponents[0])
587             if len(pathcomponents) == 1:
588                 if item is None:
589                     # create new file
590                     if create_type == COLLECTION:
591                         item = Subcollection(self, pathcomponents[0])
592                     else:
593                         item = ArvadosFile(self, pathcomponents[0])
594                     self._items[pathcomponents[0]] = item
595                     self.set_committed(False)
596                     self.notify(ADD, self, pathcomponents[0], item)
597                 return item
598             else:
599                 if item is None:
600                     # create new collection
601                     item = Subcollection(self, pathcomponents[0])
602                     self._items[pathcomponents[0]] = item
603                     self.set_committed(False)
604                     self.notify(ADD, self, pathcomponents[0], item)
605                 if isinstance(item, RichCollectionBase):
606                     return item.find_or_create(pathcomponents[1], create_type)
607                 else:
608                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
609         else:
610             return self
611
612     @synchronized
613     def find(self, path):
614         """Recursively search the specified file path.
615
616         May return either a Collection or ArvadosFile. Return None if not
617         found.
618         If path is invalid (ex: starts with '/'), an IOError exception will be
619         raised.
620
621         """
622         if not path:
623             raise errors.ArgumentError("Parameter 'path' is empty.")
624
625         pathcomponents = path.split("/", 1)
626         if pathcomponents[0] == '':
627             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
628
629         item = self._items.get(pathcomponents[0])
630         if item is None:
631             return None
632         elif len(pathcomponents) == 1:
633             return item
634         else:
635             if isinstance(item, RichCollectionBase):
636                 if pathcomponents[1]:
637                     return item.find(pathcomponents[1])
638                 else:
639                     return item
640             else:
641                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
642
643     @synchronized
644     def mkdirs(self, path):
645         """Recursive subcollection create.
646
647         Like `os.makedirs()`.  Will create intermediate subcollections needed
648         to contain the leaf subcollection path.
649
650         """
651
652         if self.find(path) != None:
653             raise IOError(errno.EEXIST, "Directory or file exists", path)
654
655         return self.find_or_create(path, COLLECTION)
656
657     def open(self, path, mode="r"):
658         """Open a file-like object for access.
659
660         :path:
661           path to a file in the collection
662         :mode:
663           a string consisting of "r", "w", or "a", optionally followed
664           by "b" or "t", optionally followed by "+".
665           :"b":
666             binary mode: write() accepts bytes, read() returns bytes.
667           :"t":
668             text mode (default): write() accepts strings, read() returns strings.
669           :"r":
670             opens for reading
671           :"r+":
672             opens for reading and writing.  Reads/writes share a file pointer.
673           :"w", "w+":
674             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
675           :"a", "a+":
676             opens for reading and writing.  All writes are appended to
677             the end of the file.  Writing does not affect the file pointer for
678             reading.
679         """
680
681         if not re.search(r'^[rwa][bt]?\+?$', mode):
682             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
683
684         if mode[0] == 'r' and '+' not in mode:
685             fclass = ArvadosFileReader
686             arvfile = self.find(path)
687         elif not self.writable():
688             raise IOError(errno.EROFS, "Collection is read only")
689         else:
690             fclass = ArvadosFileWriter
691             arvfile = self.find_or_create(path, FILE)
692
693         if arvfile is None:
694             raise IOError(errno.ENOENT, "File not found", path)
695         if not isinstance(arvfile, ArvadosFile):
696             raise IOError(errno.EISDIR, "Is a directory", path)
697
698         if mode[0] == 'w':
699             arvfile.truncate(0)
700
701         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
702
703     def modified(self):
704         """Determine if the collection has been modified since last commited."""
705         return not self.committed()
706
707     @synchronized
708     def committed(self):
709         """Determine if the collection has been committed to the API server."""
710         return self._committed
711
712     @synchronized
713     def set_committed(self, value=True):
714         """Recursively set committed flag.
715
716         If value is True, set committed to be True for this and all children.
717
718         If value is False, set committed to be False for this and all parents.
719         """
720         if value == self._committed:
721             return
722         if value:
723             for k,v in listitems(self._items):
724                 v.set_committed(True)
725             self._committed = True
726         else:
727             self._committed = False
728             if self.parent is not None:
729                 self.parent.set_committed(False)
730
731     @synchronized
732     def __iter__(self):
733         """Iterate over names of files and collections contained in this collection."""
734         return iter(viewkeys(self._items))
735
736     @synchronized
737     def __getitem__(self, k):
738         """Get a file or collection that is directly contained by this collection.
739
740         If you want to search a path, use `find()` instead.
741
742         """
743         return self._items[k]
744
745     @synchronized
746     def __contains__(self, k):
747         """Test if there is a file or collection a directly contained by this collection."""
748         return k in self._items
749
750     @synchronized
751     def __len__(self):
752         """Get the number of items directly contained in this collection."""
753         return len(self._items)
754
755     @must_be_writable
756     @synchronized
757     def __delitem__(self, p):
758         """Delete an item by name which is directly contained by this collection."""
759         del self._items[p]
760         self.set_committed(False)
761         self.notify(DEL, self, p, None)
762
763     @synchronized
764     def keys(self):
765         """Get a list of names of files and collections directly contained in this collection."""
766         return self._items.keys()
767
768     @synchronized
769     def values(self):
770         """Get a list of files and collection objects directly contained in this collection."""
771         return listvalues(self._items)
772
773     @synchronized
774     def items(self):
775         """Get a list of (name, object) tuples directly contained in this collection."""
776         return listitems(self._items)
777
778     def exists(self, path):
779         """Test if there is a file or collection at `path`."""
780         return self.find(path) is not None
781
782     @must_be_writable
783     @synchronized
784     def remove(self, path, recursive=False):
785         """Remove the file or subcollection (directory) at `path`.
786
787         :recursive:
788           Specify whether to remove non-empty subcollections (True), or raise an error (False).
789         """
790
791         if not path:
792             raise errors.ArgumentError("Parameter 'path' is empty.")
793
794         pathcomponents = path.split("/", 1)
795         item = self._items.get(pathcomponents[0])
796         if item is None:
797             raise IOError(errno.ENOENT, "File not found", path)
798         if len(pathcomponents) == 1:
799             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
800                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
801             deleteditem = self._items[pathcomponents[0]]
802             del self._items[pathcomponents[0]]
803             self.set_committed(False)
804             self.notify(DEL, self, pathcomponents[0], deleteditem)
805         else:
806             item.remove(pathcomponents[1])
807
808     def _clonefrom(self, source):
809         for k,v in listitems(source):
810             self._items[k] = v.clone(self, k)
811
812     def clone(self):
813         raise NotImplementedError()
814
815     @must_be_writable
816     @synchronized
817     def add(self, source_obj, target_name, overwrite=False, reparent=False):
818         """Copy or move a file or subcollection to this collection.
819
820         :source_obj:
821           An ArvadosFile, or Subcollection object
822
823         :target_name:
824           Destination item name.  If the target name already exists and is a
825           file, this will raise an error unless you specify `overwrite=True`.
826
827         :overwrite:
828           Whether to overwrite target file if it already exists.
829
830         :reparent:
831           If True, source_obj will be moved from its parent collection to this collection.
832           If False, source_obj will be copied and the parent collection will be
833           unmodified.
834
835         """
836
837         if target_name in self and not overwrite:
838             raise IOError(errno.EEXIST, "File already exists", target_name)
839
840         modified_from = None
841         if target_name in self:
842             modified_from = self[target_name]
843
844         # Actually make the move or copy.
845         if reparent:
846             source_obj._reparent(self, target_name)
847             item = source_obj
848         else:
849             item = source_obj.clone(self, target_name)
850
851         self._items[target_name] = item
852         self.set_committed(False)
853         if not self._has_remote_blocks and source_obj.has_remote_blocks():
854             self.set_has_remote_blocks(True)
855
856         if modified_from:
857             self.notify(MOD, self, target_name, (modified_from, item))
858         else:
859             self.notify(ADD, self, target_name, item)
860
861     def _get_src_target(self, source, target_path, source_collection, create_dest):
862         if source_collection is None:
863             source_collection = self
864
865         # Find the object
866         if isinstance(source, basestring):
867             source_obj = source_collection.find(source)
868             if source_obj is None:
869                 raise IOError(errno.ENOENT, "File not found", source)
870             sourcecomponents = source.split("/")
871         else:
872             source_obj = source
873             sourcecomponents = None
874
875         # Find parent collection the target path
876         targetcomponents = target_path.split("/")
877
878         # Determine the name to use.
879         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
880
881         if not target_name:
882             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
883
884         if create_dest:
885             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
886         else:
887             if len(targetcomponents) > 1:
888                 target_dir = self.find("/".join(targetcomponents[0:-1]))
889             else:
890                 target_dir = self
891
892         if target_dir is None:
893             raise IOError(errno.ENOENT, "Target directory not found", target_name)
894
895         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
896             target_dir = target_dir[target_name]
897             target_name = sourcecomponents[-1]
898
899         return (source_obj, target_dir, target_name)
900
901     @must_be_writable
902     @synchronized
903     def copy(self, source, target_path, source_collection=None, overwrite=False):
904         """Copy a file or subcollection to a new path in this collection.
905
906         :source:
907           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
908
909         :target_path:
910           Destination file or path.  If the target path already exists and is a
911           subcollection, the item will be placed inside the subcollection.  If
912           the target path already exists and is a file, this will raise an error
913           unless you specify `overwrite=True`.
914
915         :source_collection:
916           Collection to copy `source_path` from (default `self`)
917
918         :overwrite:
919           Whether to overwrite target file if it already exists.
920         """
921
922         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
923         target_dir.add(source_obj, target_name, overwrite, False)
924
925     @must_be_writable
926     @synchronized
927     def rename(self, source, target_path, source_collection=None, overwrite=False):
928         """Move a file or subcollection from `source_collection` to a new path in this collection.
929
930         :source:
931           A string with a path to source file or subcollection.
932
933         :target_path:
934           Destination file or path.  If the target path already exists and is a
935           subcollection, the item will be placed inside the subcollection.  If
936           the target path already exists and is a file, this will raise an error
937           unless you specify `overwrite=True`.
938
939         :source_collection:
940           Collection to copy `source_path` from (default `self`)
941
942         :overwrite:
943           Whether to overwrite target file if it already exists.
944         """
945
946         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
947         if not source_obj.writable():
948             raise IOError(errno.EROFS, "Source collection is read only", source)
949         target_dir.add(source_obj, target_name, overwrite, True)
950
951     def portable_manifest_text(self, stream_name="."):
952         """Get the manifest text for this collection, sub collections and files.
953
954         This method does not flush outstanding blocks to Keep.  It will return
955         a normalized manifest with access tokens stripped.
956
957         :stream_name:
958           Name to use for this stream (directory)
959
960         """
961         return self._get_manifest_text(stream_name, True, True)
962
963     @synchronized
964     def manifest_text(self, stream_name=".", strip=False, normalize=False,
965                       only_committed=False):
966         """Get the manifest text for this collection, sub collections and files.
967
968         This method will flush outstanding blocks to Keep.  By default, it will
969         not normalize an unmodified manifest or strip access tokens.
970
971         :stream_name:
972           Name to use for this stream (directory)
973
974         :strip:
975           If True, remove signing tokens from block locators if present.
976           If False (default), block locators are left unchanged.
977
978         :normalize:
979           If True, always export the manifest text in normalized form
980           even if the Collection is not modified.  If False (default) and the collection
981           is not modified, return the original manifest text even if it is not
982           in normalized form.
983
984         :only_committed:
985           If True, don't commit pending blocks.
986
987         """
988
989         if not only_committed:
990             self._my_block_manager().commit_all()
991         return self._get_manifest_text(stream_name, strip, normalize,
992                                        only_committed=only_committed)
993
994     @synchronized
995     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
996         """Get the manifest text for this collection, sub collections and files.
997
998         :stream_name:
999           Name to use for this stream (directory)
1000
1001         :strip:
1002           If True, remove signing tokens from block locators if present.
1003           If False (default), block locators are left unchanged.
1004
1005         :normalize:
1006           If True, always export the manifest text in normalized form
1007           even if the Collection is not modified.  If False (default) and the collection
1008           is not modified, return the original manifest text even if it is not
1009           in normalized form.
1010
1011         :only_committed:
1012           If True, only include blocks that were already committed to Keep.
1013
1014         """
1015
1016         if not self.committed() or self._manifest_text is None or normalize:
1017             stream = {}
1018             buf = []
1019             sorted_keys = sorted(self.keys())
1020             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1021                 # Create a stream per file `k`
1022                 arvfile = self[filename]
1023                 filestream = []
1024                 for segment in arvfile.segments():
1025                     loc = segment.locator
1026                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1027                         if only_committed:
1028                             continue
1029                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1030                     if strip:
1031                         loc = KeepLocator(loc).stripped()
1032                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1033                                          segment.segment_offset, segment.range_size))
1034                 stream[filename] = filestream
1035             if stream:
1036                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1037             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1038                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1039             return "".join(buf)
1040         else:
1041             if strip:
1042                 return self.stripped_manifest()
1043             else:
1044                 return self._manifest_text
1045
1046     @synchronized
1047     def _copy_remote_blocks(self, remote_blocks={}):
1048         """Scan through the entire collection and ask Keep to copy remote blocks.
1049
1050         When accessing a remote collection, blocks will have a remote signature
1051         (+R instead of +A). Collect these signatures and request Keep to copy the
1052         blocks to the local cluster, returning local (+A) signatures.
1053
1054         :remote_blocks:
1055           Shared cache of remote to local block mappings. This is used to avoid
1056           doing extra work when blocks are shared by more than one file in
1057           different subdirectories.
1058
1059         """
1060         for item in self:
1061             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1062         return remote_blocks
1063
1064     @synchronized
1065     def diff(self, end_collection, prefix=".", holding_collection=None):
1066         """Generate list of add/modify/delete actions.
1067
1068         When given to `apply`, will change `self` to match `end_collection`
1069
1070         """
1071         changes = []
1072         if holding_collection is None:
1073             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1074         for k in self:
1075             if k not in end_collection:
1076                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1077         for k in end_collection:
1078             if k in self:
1079                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1080                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1081                 elif end_collection[k] != self[k]:
1082                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1083                 else:
1084                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1085             else:
1086                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1087         return changes
1088
1089     @must_be_writable
1090     @synchronized
1091     def apply(self, changes):
1092         """Apply changes from `diff`.
1093
1094         If a change conflicts with a local change, it will be saved to an
1095         alternate path indicating the conflict.
1096
1097         """
1098         if changes:
1099             self.set_committed(False)
1100         for change in changes:
1101             event_type = change[0]
1102             path = change[1]
1103             initial = change[2]
1104             local = self.find(path)
1105             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1106                                                                     time.gmtime()))
1107             if event_type == ADD:
1108                 if local is None:
1109                     # No local file at path, safe to copy over new file
1110                     self.copy(initial, path)
1111                 elif local is not None and local != initial:
1112                     # There is already local file and it is different:
1113                     # save change to conflict file.
1114                     self.copy(initial, conflictpath)
1115             elif event_type == MOD or event_type == TOK:
1116                 final = change[3]
1117                 if local == initial:
1118                     # Local matches the "initial" item so it has not
1119                     # changed locally and is safe to update.
1120                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1121                         # Replace contents of local file with new contents
1122                         local.replace_contents(final)
1123                     else:
1124                         # Overwrite path with new item; this can happen if
1125                         # path was a file and is now a collection or vice versa
1126                         self.copy(final, path, overwrite=True)
1127                 else:
1128                     # Local is missing (presumably deleted) or local doesn't
1129                     # match the "start" value, so save change to conflict file
1130                     self.copy(final, conflictpath)
1131             elif event_type == DEL:
1132                 if local == initial:
1133                     # Local item matches "initial" value, so it is safe to remove.
1134                     self.remove(path, recursive=True)
1135                 # else, the file is modified or already removed, in either
1136                 # case we don't want to try to remove it.
1137
1138     def portable_data_hash(self):
1139         """Get the portable data hash for this collection's manifest."""
1140         if self._manifest_locator and self.committed():
1141             # If the collection is already saved on the API server, and it's committed
1142             # then return API server's PDH response.
1143             return self._portable_data_hash
1144         else:
1145             stripped = self.portable_manifest_text().encode()
1146             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1147
1148     @synchronized
1149     def subscribe(self, callback):
1150         if self._callback is None:
1151             self._callback = callback
1152         else:
1153             raise errors.ArgumentError("A callback is already set on this collection.")
1154
1155     @synchronized
1156     def unsubscribe(self):
1157         if self._callback is not None:
1158             self._callback = None
1159
1160     @synchronized
1161     def notify(self, event, collection, name, item):
1162         if self._callback:
1163             self._callback(event, collection, name, item)
1164         self.root_collection().notify(event, collection, name, item)
1165
1166     @synchronized
1167     def __eq__(self, other):
1168         if other is self:
1169             return True
1170         if not isinstance(other, RichCollectionBase):
1171             return False
1172         if len(self._items) != len(other):
1173             return False
1174         for k in self._items:
1175             if k not in other:
1176                 return False
1177             if self._items[k] != other[k]:
1178                 return False
1179         return True
1180
1181     def __ne__(self, other):
1182         return not self.__eq__(other)
1183
1184     @synchronized
1185     def flush(self):
1186         """Flush bufferblocks to Keep."""
1187         for e in listvalues(self):
1188             e.flush()
1189
1190
1191 class Collection(RichCollectionBase):
1192     """Represents the root of an Arvados Collection.
1193
1194     This class is threadsafe.  The root collection object, all subcollections
1195     and files are protected by a single lock (i.e. each access locks the entire
1196     collection).
1197
1198     Brief summary of
1199     useful methods:
1200
1201     :To read an existing file:
1202       `c.open("myfile", "r")`
1203
1204     :To write a new file:
1205       `c.open("myfile", "w")`
1206
1207     :To determine if a file exists:
1208       `c.find("myfile") is not None`
1209
1210     :To copy a file:
1211       `c.copy("source", "dest")`
1212
1213     :To delete a file:
1214       `c.remove("myfile")`
1215
1216     :To save to an existing collection record:
1217       `c.save()`
1218
1219     :To save a new collection record:
1220     `c.save_new()`
1221
1222     :To merge remote changes into this object:
1223       `c.update()`
1224
1225     Must be associated with an API server Collection record (during
1226     initialization, or using `save_new`) to use `save` or `update`
1227
1228     """
1229
1230     def __init__(self, manifest_locator_or_text=None,
1231                  api_client=None,
1232                  keep_client=None,
1233                  num_retries=None,
1234                  parent=None,
1235                  apiconfig=None,
1236                  block_manager=None,
1237                  replication_desired=None,
1238                  put_threads=None):
1239         """Collection constructor.
1240
1241         :manifest_locator_or_text:
1242           An Arvados collection UUID, portable data hash, raw manifest
1243           text, or (if creating an empty collection) None.
1244
1245         :parent:
1246           the parent Collection, may be None.
1247
1248         :apiconfig:
1249           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1250           Prefer this over supplying your own api_client and keep_client (except in testing).
1251           Will use default config settings if not specified.
1252
1253         :api_client:
1254           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1255
1256         :keep_client:
1257           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1258
1259         :num_retries:
1260           the number of retries for API and Keep requests.
1261
1262         :block_manager:
1263           the block manager to use.  If not specified, create one.
1264
1265         :replication_desired:
1266           How many copies should Arvados maintain. If None, API server default
1267           configuration applies. If not None, this value will also be used
1268           for determining the number of block copies being written.
1269
1270         """
1271         super(Collection, self).__init__(parent)
1272         self._api_client = api_client
1273         self._keep_client = keep_client
1274         self._block_manager = block_manager
1275         self.replication_desired = replication_desired
1276         self.put_threads = put_threads
1277
1278         if apiconfig:
1279             self._config = apiconfig
1280         else:
1281             self._config = config.settings()
1282
1283         self.num_retries = num_retries if num_retries is not None else 0
1284         self._manifest_locator = None
1285         self._manifest_text = None
1286         self._portable_data_hash = None
1287         self._api_response = None
1288         self._past_versions = set()
1289
1290         self.lock = threading.RLock()
1291         self.events = None
1292
1293         if manifest_locator_or_text:
1294             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1295                 self._manifest_locator = manifest_locator_or_text
1296             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1297                 self._manifest_locator = manifest_locator_or_text
1298                 if not self._has_local_collection_uuid():
1299                     self._has_remote_blocks = True
1300             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1301                 self._manifest_text = manifest_locator_or_text
1302                 if '+R' in self._manifest_text:
1303                     self._has_remote_blocks = True
1304             else:
1305                 raise errors.ArgumentError(
1306                     "Argument to CollectionReader is not a manifest or a collection UUID")
1307
1308             try:
1309                 self._populate()
1310             except (IOError, errors.SyntaxError) as e:
1311                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1312
1313     def root_collection(self):
1314         return self
1315
1316     def get_properties(self):
1317         if self._api_response and self._api_response["properties"]:
1318             return self._api_response["properties"]
1319         else:
1320             return {}
1321
1322     def get_trash_at(self):
1323         if self._api_response and self._api_response["trash_at"]:
1324             return ciso8601.parse_datetime(self._api_response["trash_at"])
1325         else:
1326             return None
1327
1328     def stream_name(self):
1329         return "."
1330
1331     def writable(self):
1332         return True
1333
1334     @synchronized
1335     def known_past_version(self, modified_at_and_portable_data_hash):
1336         return modified_at_and_portable_data_hash in self._past_versions
1337
1338     @synchronized
1339     @retry_method
1340     def update(self, other=None, num_retries=None):
1341         """Merge the latest collection on the API server with the current collection."""
1342
1343         if other is None:
1344             if self._manifest_locator is None:
1345                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1346             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1347             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1348                 response.get("portable_data_hash") != self.portable_data_hash()):
1349                 # The record on the server is different from our current one, but we've seen it before,
1350                 # so ignore it because it's already been merged.
1351                 # However, if it's the same as our current record, proceed with the update, because we want to update
1352                 # our tokens.
1353                 return
1354             else:
1355                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1356             other = CollectionReader(response["manifest_text"])
1357         baseline = CollectionReader(self._manifest_text)
1358         self.apply(baseline.diff(other))
1359         self._manifest_text = self.manifest_text()
1360
1361     @synchronized
1362     def _my_api(self):
1363         if self._api_client is None:
1364             self._api_client = ThreadSafeApiCache(self._config)
1365             if self._keep_client is None:
1366                 self._keep_client = self._api_client.keep
1367         return self._api_client
1368
1369     @synchronized
1370     def _my_keep(self):
1371         if self._keep_client is None:
1372             if self._api_client is None:
1373                 self._my_api()
1374             else:
1375                 self._keep_client = KeepClient(api_client=self._api_client)
1376         return self._keep_client
1377
1378     @synchronized
1379     def _my_block_manager(self):
1380         if self._block_manager is None:
1381             copies = (self.replication_desired or
1382                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1383                                                    2))
1384             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1385         return self._block_manager
1386
1387     def _remember_api_response(self, response):
1388         self._api_response = response
1389         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1390
1391     def _populate_from_api_server(self):
1392         # As in KeepClient itself, we must wait until the last
1393         # possible moment to instantiate an API client, in order to
1394         # avoid tripping up clients that don't have access to an API
1395         # server.  If we do build one, make sure our Keep client uses
1396         # it.  If instantiation fails, we'll fall back to the except
1397         # clause, just like any other Collection lookup
1398         # failure. Return an exception, or None if successful.
1399         self._remember_api_response(self._my_api().collections().get(
1400             uuid=self._manifest_locator).execute(
1401                 num_retries=self.num_retries))
1402         self._manifest_text = self._api_response['manifest_text']
1403         self._portable_data_hash = self._api_response['portable_data_hash']
1404         # If not overriden via kwargs, we should try to load the
1405         # replication_desired from the API server
1406         if self.replication_desired is None:
1407             self.replication_desired = self._api_response.get('replication_desired', None)
1408
1409     def _populate(self):
1410         if self._manifest_text is None:
1411             if self._manifest_locator is None:
1412                 return
1413             else:
1414                 self._populate_from_api_server()
1415         self._baseline_manifest = self._manifest_text
1416         self._import_manifest(self._manifest_text)
1417
1418     def _has_collection_uuid(self):
1419         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1420
1421     def _has_local_collection_uuid(self):
1422         return self._has_collection_uuid and \
1423             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1424
1425     def __enter__(self):
1426         return self
1427
1428     def __exit__(self, exc_type, exc_value, traceback):
1429         """Support scoped auto-commit in a with: block."""
1430         if exc_type is None:
1431             if self.writable() and self._has_collection_uuid():
1432                 self.save()
1433         self.stop_threads()
1434
1435     def stop_threads(self):
1436         if self._block_manager is not None:
1437             self._block_manager.stop_threads()
1438
1439     @synchronized
1440     def manifest_locator(self):
1441         """Get the manifest locator, if any.
1442
1443         The manifest locator will be set when the collection is loaded from an
1444         API server record or the portable data hash of a manifest.
1445
1446         The manifest locator will be None if the collection is newly created or
1447         was created directly from manifest text.  The method `save_new()` will
1448         assign a manifest locator.
1449
1450         """
1451         return self._manifest_locator
1452
1453     @synchronized
1454     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1455         if new_config is None:
1456             new_config = self._config
1457         if readonly:
1458             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1459         else:
1460             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1461
1462         newcollection._clonefrom(self)
1463         return newcollection
1464
1465     @synchronized
1466     def api_response(self):
1467         """Returns information about this Collection fetched from the API server.
1468
1469         If the Collection exists in Keep but not the API server, currently
1470         returns None.  Future versions may provide a synthetic response.
1471
1472         """
1473         return self._api_response
1474
1475     def find_or_create(self, path, create_type):
1476         """See `RichCollectionBase.find_or_create`"""
1477         if path == ".":
1478             return self
1479         else:
1480             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1481
1482     def find(self, path):
1483         """See `RichCollectionBase.find`"""
1484         if path == ".":
1485             return self
1486         else:
1487             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1488
1489     def remove(self, path, recursive=False):
1490         """See `RichCollectionBase.remove`"""
1491         if path == ".":
1492             raise errors.ArgumentError("Cannot remove '.'")
1493         else:
1494             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1495
1496     @must_be_writable
1497     @synchronized
1498     @retry_method
1499     def save(self,
1500              properties=None,
1501              storage_classes=None,
1502              trash_at=None,
1503              merge=True,
1504              num_retries=None):
1505         """Save collection to an existing collection record.
1506
1507         Commit pending buffer blocks to Keep, merge with remote record (if
1508         merge=True, the default), and update the collection record. Returns
1509         the current manifest text.
1510
1511         Will raise AssertionError if not associated with a collection record on
1512         the API server.  If you want to save a manifest to Keep only, see
1513         `save_new()`.
1514
1515         :properties:
1516           Additional properties of collection. This value will replace any existing
1517           properties of collection.
1518
1519         :storage_classes:
1520           Specify desirable storage classes to be used when writing data to Keep.
1521
1522         :trash_at:
1523           A collection is *expiring* when it has a *trash_at* time in the future.
1524           An expiring collection can be accessed as normal,
1525           but is scheduled to be trashed automatically at the *trash_at* time.
1526
1527         :merge:
1528           Update and merge remote changes before saving.  Otherwise, any
1529           remote changes will be ignored and overwritten.
1530
1531         :num_retries:
1532           Retry count on API calls (if None,  use the collection default)
1533
1534         """
1535         if properties and type(properties) is not dict:
1536             raise errors.ArgumentError("properties must be dictionary type.")
1537
1538         if storage_classes and type(storage_classes) is not list:
1539             raise errors.ArgumentError("storage_classes must be list type.")
1540
1541         if trash_at and type(trash_at) is not datetime.datetime:
1542             raise errors.ArgumentError("trash_at must be datetime type.")
1543
1544         body={}
1545         if properties:
1546             body["properties"] = properties
1547         if storage_classes:
1548             body["storage_classes_desired"] = storage_classes
1549         if trash_at:
1550             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1551             body["trash_at"] = t
1552
1553         if not self.committed():
1554             if self._has_remote_blocks:
1555                 # Copy any remote blocks to the local cluster.
1556                 self._copy_remote_blocks(remote_blocks={})
1557                 self._has_remote_blocks = False
1558             if not self._has_collection_uuid():
1559                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1560             elif not self._has_local_collection_uuid():
1561                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1562
1563             self._my_block_manager().commit_all()
1564
1565             if merge:
1566                 self.update()
1567
1568             text = self.manifest_text(strip=False)
1569             body['manifest_text'] = text
1570
1571             self._remember_api_response(self._my_api().collections().update(
1572                 uuid=self._manifest_locator,
1573                 body=body
1574                 ).execute(num_retries=num_retries))
1575             self._manifest_text = self._api_response["manifest_text"]
1576             self._portable_data_hash = self._api_response["portable_data_hash"]
1577             self.set_committed(True)
1578         elif body:
1579             self._remember_api_response(self._my_api().collections().update(
1580                 uuid=self._manifest_locator,
1581                 body=body
1582                 ).execute(num_retries=num_retries))
1583
1584         return self._manifest_text
1585
1586
1587     @must_be_writable
1588     @synchronized
1589     @retry_method
1590     def save_new(self, name=None,
1591                  create_collection_record=True,
1592                  owner_uuid=None,
1593                  properties=None,
1594                  storage_classes=None,
1595                  trash_at=None,
1596                  ensure_unique_name=False,
1597                  num_retries=None):
1598         """Save collection to a new collection record.
1599
1600         Commit pending buffer blocks to Keep and, when create_collection_record
1601         is True (default), create a new collection record.  After creating a
1602         new collection record, this Collection object will be associated with
1603         the new record used by `save()`. Returns the current manifest text.
1604
1605         :name:
1606           The collection name.
1607
1608         :create_collection_record:
1609            If True, create a collection record on the API server.
1610            If False, only commit blocks to Keep and return the manifest text.
1611
1612         :owner_uuid:
1613           the user, or project uuid that will own this collection.
1614           If None, defaults to the current user.
1615
1616         :properties:
1617           Additional properties of collection. This value will replace any existing
1618           properties of collection.
1619
1620         :storage_classes:
1621           Specify desirable storage classes to be used when writing data to Keep.
1622
1623         :trash_at:
1624           A collection is *expiring* when it has a *trash_at* time in the future.
1625           An expiring collection can be accessed as normal,
1626           but is scheduled to be trashed automatically at the *trash_at* time.
1627
1628         :ensure_unique_name:
1629           If True, ask the API server to rename the collection
1630           if it conflicts with a collection with the same name and owner.  If
1631           False, a name conflict will result in an error.
1632
1633         :num_retries:
1634           Retry count on API calls (if None,  use the collection default)
1635
1636         """
1637         if properties and type(properties) is not dict:
1638             raise errors.ArgumentError("properties must be dictionary type.")
1639
1640         if storage_classes and type(storage_classes) is not list:
1641             raise errors.ArgumentError("storage_classes must be list type.")
1642
1643         if trash_at and type(trash_at) is not datetime.datetime:
1644             raise errors.ArgumentError("trash_at must be datetime type.")
1645
1646         if self._has_remote_blocks:
1647             # Copy any remote blocks to the local cluster.
1648             self._copy_remote_blocks(remote_blocks={})
1649             self._has_remote_blocks = False
1650
1651         self._my_block_manager().commit_all()
1652         text = self.manifest_text(strip=False)
1653
1654         if create_collection_record:
1655             if name is None:
1656                 name = "New collection"
1657                 ensure_unique_name = True
1658
1659             body = {"manifest_text": text,
1660                     "name": name,
1661                     "replication_desired": self.replication_desired}
1662             if owner_uuid:
1663                 body["owner_uuid"] = owner_uuid
1664             if properties:
1665                 body["properties"] = properties
1666             if storage_classes:
1667                 body["storage_classes_desired"] = storage_classes
1668             if trash_at:
1669                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1670                 body["trash_at"] = t
1671
1672             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1673             text = self._api_response["manifest_text"]
1674
1675             self._manifest_locator = self._api_response["uuid"]
1676             self._portable_data_hash = self._api_response["portable_data_hash"]
1677
1678             self._manifest_text = text
1679             self.set_committed(True)
1680
1681         return text
1682
1683     _token_re = re.compile(r'(\S+)(\s+|$)')
1684     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1685     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1686
1687     def _unescape_manifest_path(self, path):
1688         return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1689
1690     @synchronized
1691     def _import_manifest(self, manifest_text):
1692         """Import a manifest into a `Collection`.
1693
1694         :manifest_text:
1695           The manifest text to import from.
1696
1697         """
1698         if len(self) > 0:
1699             raise ArgumentError("Can only import manifest into an empty collection")
1700
1701         STREAM_NAME = 0
1702         BLOCKS = 1
1703         SEGMENTS = 2
1704
1705         stream_name = None
1706         state = STREAM_NAME
1707
1708         for token_and_separator in self._token_re.finditer(manifest_text):
1709             tok = token_and_separator.group(1)
1710             sep = token_and_separator.group(2)
1711
1712             if state == STREAM_NAME:
1713                 # starting a new stream
1714                 stream_name = self._unescape_manifest_path(tok)
1715                 blocks = []
1716                 segments = []
1717                 streamoffset = 0
1718                 state = BLOCKS
1719                 self.find_or_create(stream_name, COLLECTION)
1720                 continue
1721
1722             if state == BLOCKS:
1723                 block_locator = self._block_re.match(tok)
1724                 if block_locator:
1725                     blocksize = int(block_locator.group(1))
1726                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1727                     streamoffset += blocksize
1728                 else:
1729                     state = SEGMENTS
1730
1731             if state == SEGMENTS:
1732                 file_segment = self._segment_re.match(tok)
1733                 if file_segment:
1734                     pos = int(file_segment.group(1))
1735                     size = int(file_segment.group(2))
1736                     name = self._unescape_manifest_path(file_segment.group(3))
1737                     if name.split('/')[-1] == '.':
1738                         # placeholder for persisting an empty directory, not a real file
1739                         if len(name) > 2:
1740                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1741                     else:
1742                         filepath = os.path.join(stream_name, name)
1743                         afile = self.find_or_create(filepath, FILE)
1744                         if isinstance(afile, ArvadosFile):
1745                             afile.add_segment(blocks, pos, size)
1746                         else:
1747                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1748                 else:
1749                     # error!
1750                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1751
1752             if sep == "\n":
1753                 stream_name = None
1754                 state = STREAM_NAME
1755
1756         self.set_committed(True)
1757
1758     @synchronized
1759     def notify(self, event, collection, name, item):
1760         if self._callback:
1761             self._callback(event, collection, name, item)
1762
1763
1764 class Subcollection(RichCollectionBase):
1765     """This is a subdirectory within a collection that doesn't have its own API
1766     server record.
1767
1768     Subcollection locking falls under the umbrella lock of its root collection.
1769
1770     """
1771
1772     def __init__(self, parent, name):
1773         super(Subcollection, self).__init__(parent)
1774         self.lock = self.root_collection().lock
1775         self._manifest_text = None
1776         self.name = name
1777         self.num_retries = parent.num_retries
1778
1779     def root_collection(self):
1780         return self.parent.root_collection()
1781
1782     def writable(self):
1783         return self.root_collection().writable()
1784
1785     def _my_api(self):
1786         return self.root_collection()._my_api()
1787
1788     def _my_keep(self):
1789         return self.root_collection()._my_keep()
1790
1791     def _my_block_manager(self):
1792         return self.root_collection()._my_block_manager()
1793
1794     def stream_name(self):
1795         return os.path.join(self.parent.stream_name(), self.name)
1796
1797     @synchronized
1798     def clone(self, new_parent, new_name):
1799         c = Subcollection(new_parent, new_name)
1800         c._clonefrom(self)
1801         return c
1802
1803     @must_be_writable
1804     @synchronized
1805     def _reparent(self, newparent, newname):
1806         self.set_committed(False)
1807         self.flush()
1808         self.parent.remove(self.name, recursive=True)
1809         self.parent = newparent
1810         self.name = newname
1811         self.lock = self.parent.root_collection().lock
1812
1813
1814 class CollectionReader(Collection):
1815     """A read-only collection object.
1816
1817     Initialize from a collection UUID or portable data hash, or raw
1818     manifest text.  See `Collection` constructor for detailed options.
1819
1820     """
1821     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1822         self._in_init = True
1823         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1824         self._in_init = False
1825
1826         # Forego any locking since it should never change once initialized.
1827         self.lock = NoopLock()
1828
1829         # Backwards compatability with old CollectionReader
1830         # all_streams() and all_files()
1831         self._streams = None
1832
1833     def writable(self):
1834         return self._in_init
1835
1836     def _populate_streams(orig_func):
1837         @functools.wraps(orig_func)
1838         def populate_streams_wrapper(self, *args, **kwargs):
1839             # Defer populating self._streams until needed since it creates a copy of the manifest.
1840             if self._streams is None:
1841                 if self._manifest_text:
1842                     self._streams = [sline.split()
1843                                      for sline in self._manifest_text.split("\n")
1844                                      if sline]
1845                 else:
1846                     self._streams = []
1847             return orig_func(self, *args, **kwargs)
1848         return populate_streams_wrapper
1849
1850     @_populate_streams
1851     def normalize(self):
1852         """Normalize the streams returned by `all_streams`.
1853
1854         This method is kept for backwards compatability and only affects the
1855         behavior of `all_streams()` and `all_files()`
1856
1857         """
1858
1859         # Rearrange streams
1860         streams = {}
1861         for s in self.all_streams():
1862             for f in s.all_files():
1863                 streamname, filename = split(s.name() + "/" + f.name())
1864                 if streamname not in streams:
1865                     streams[streamname] = {}
1866                 if filename not in streams[streamname]:
1867                     streams[streamname][filename] = []
1868                 for r in f.segments:
1869                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1870
1871         self._streams = [normalize_stream(s, streams[s])
1872                          for s in sorted(streams)]
1873     @_populate_streams
1874     def all_streams(self):
1875         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1876                 for s in self._streams]
1877
1878     @_populate_streams
1879     def all_files(self):
1880         for s in self.all_streams():
1881             for f in s.all_files():
1882                 yield f