Merge branch 'master' into 14259-pysdk-remote-block-copy
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import datetime
17 import ciso8601
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
32 import arvados.util
33 import arvados.events as events
34 from arvados.retry import retry_method
35
36 _logger = logging.getLogger('arvados.collection')
37
38 class CollectionBase(object):
39     """Abstract base class for Collection classes."""
40
41     def __enter__(self):
42         return self
43
44     def __exit__(self, exc_type, exc_value, traceback):
45         pass
46
47     def _my_keep(self):
48         if self._keep_client is None:
49             self._keep_client = KeepClient(api_client=self._api_client,
50                                            num_retries=self.num_retries)
51         return self._keep_client
52
53     def stripped_manifest(self):
54         """Get the manifest with locator hints stripped.
55
56         Return the manifest for the current collection with all
57         non-portable hints (i.e., permission signatures and other
58         hints other than size hints) removed from the locators.
59         """
60         raw = self.manifest_text()
61         clean = []
62         for line in raw.split("\n"):
63             fields = line.split()
64             if fields:
65                 clean_fields = fields[:1] + [
66                     (re.sub(r'\+[^\d][^\+]*', '', x)
67                      if re.match(arvados.util.keep_locator_pattern, x)
68                      else x)
69                     for x in fields[1:]]
70                 clean += [' '.join(clean_fields), "\n"]
71         return ''.join(clean)
72
73
74 class _WriterFile(_FileLikeObjectBase):
75     def __init__(self, coll_writer, name):
76         super(_WriterFile, self).__init__(name, 'wb')
77         self.dest = coll_writer
78
79     def close(self):
80         super(_WriterFile, self).close()
81         self.dest.finish_current_file()
82
83     @_FileLikeObjectBase._before_close
84     def write(self, data):
85         self.dest.write(data)
86
87     @_FileLikeObjectBase._before_close
88     def writelines(self, seq):
89         for data in seq:
90             self.write(data)
91
92     @_FileLikeObjectBase._before_close
93     def flush(self):
94         self.dest.flush_data()
95
96
97 class CollectionWriter(CollectionBase):
98     """Deprecated, use Collection instead."""
99
100     def __init__(self, api_client=None, num_retries=0, replication=None):
101         """Instantiate a CollectionWriter.
102
103         CollectionWriter lets you build a new Arvados Collection from scratch.
104         Write files to it.  The CollectionWriter will upload data to Keep as
105         appropriate, and provide you with the Collection manifest text when
106         you're finished.
107
108         Arguments:
109         * api_client: The API client to use to look up Collections.  If not
110           provided, CollectionReader will build one from available Arvados
111           configuration.
112         * num_retries: The default number of times to retry failed
113           service requests.  Default 0.  You may change this value
114           after instantiation, but note those changes may not
115           propagate to related objects like the Keep client.
116         * replication: The number of copies of each block to store.
117           If this argument is None or not supplied, replication is
118           the server-provided default if available, otherwise 2.
119         """
120         self._api_client = api_client
121         self.num_retries = num_retries
122         self.replication = (2 if replication is None else replication)
123         self._keep_client = None
124         self._data_buffer = []
125         self._data_buffer_len = 0
126         self._current_stream_files = []
127         self._current_stream_length = 0
128         self._current_stream_locators = []
129         self._current_stream_name = '.'
130         self._current_file_name = None
131         self._current_file_pos = 0
132         self._finished_streams = []
133         self._close_file = None
134         self._queued_file = None
135         self._queued_dirents = deque()
136         self._queued_trees = deque()
137         self._last_open = None
138
139     def __exit__(self, exc_type, exc_value, traceback):
140         if exc_type is None:
141             self.finish()
142
143     def do_queued_work(self):
144         # The work queue consists of three pieces:
145         # * _queued_file: The file object we're currently writing to the
146         #   Collection.
147         # * _queued_dirents: Entries under the current directory
148         #   (_queued_trees[0]) that we want to write or recurse through.
149         #   This may contain files from subdirectories if
150         #   max_manifest_depth == 0 for this directory.
151         # * _queued_trees: Directories that should be written as separate
152         #   streams to the Collection.
153         # This function handles the smallest piece of work currently queued
154         # (current file, then current directory, then next directory) until
155         # no work remains.  The _work_THING methods each do a unit of work on
156         # THING.  _queue_THING methods add a THING to the work queue.
157         while True:
158             if self._queued_file:
159                 self._work_file()
160             elif self._queued_dirents:
161                 self._work_dirents()
162             elif self._queued_trees:
163                 self._work_trees()
164             else:
165                 break
166
167     def _work_file(self):
168         while True:
169             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
170             if not buf:
171                 break
172             self.write(buf)
173         self.finish_current_file()
174         if self._close_file:
175             self._queued_file.close()
176         self._close_file = None
177         self._queued_file = None
178
179     def _work_dirents(self):
180         path, stream_name, max_manifest_depth = self._queued_trees[0]
181         if stream_name != self.current_stream_name():
182             self.start_new_stream(stream_name)
183         while self._queued_dirents:
184             dirent = self._queued_dirents.popleft()
185             target = os.path.join(path, dirent)
186             if os.path.isdir(target):
187                 self._queue_tree(target,
188                                  os.path.join(stream_name, dirent),
189                                  max_manifest_depth - 1)
190             else:
191                 self._queue_file(target, dirent)
192                 break
193         if not self._queued_dirents:
194             self._queued_trees.popleft()
195
196     def _work_trees(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         d = arvados.util.listdir_recursive(
199             path, max_depth = (None if max_manifest_depth == 0 else 0))
200         if d:
201             self._queue_dirents(stream_name, d)
202         else:
203             self._queued_trees.popleft()
204
205     def _queue_file(self, source, filename=None):
206         assert (self._queued_file is None), "tried to queue more than one file"
207         if not hasattr(source, 'read'):
208             source = open(source, 'rb')
209             self._close_file = True
210         else:
211             self._close_file = False
212         if filename is None:
213             filename = os.path.basename(source.name)
214         self.start_new_file(filename)
215         self._queued_file = source
216
217     def _queue_dirents(self, stream_name, dirents):
218         assert (not self._queued_dirents), "tried to queue more than one tree"
219         self._queued_dirents = deque(sorted(dirents))
220
221     def _queue_tree(self, path, stream_name, max_manifest_depth):
222         self._queued_trees.append((path, stream_name, max_manifest_depth))
223
224     def write_file(self, source, filename=None):
225         self._queue_file(source, filename)
226         self.do_queued_work()
227
228     def write_directory_tree(self,
229                              path, stream_name='.', max_manifest_depth=-1):
230         self._queue_tree(path, stream_name, max_manifest_depth)
231         self.do_queued_work()
232
233     def write(self, newdata):
234         if isinstance(newdata, bytes):
235             pass
236         elif isinstance(newdata, str):
237             newdata = newdata.encode()
238         elif hasattr(newdata, '__iter__'):
239             for s in newdata:
240                 self.write(s)
241             return
242         self._data_buffer.append(newdata)
243         self._data_buffer_len += len(newdata)
244         self._current_stream_length += len(newdata)
245         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
246             self.flush_data()
247
248     def open(self, streampath, filename=None):
249         """open(streampath[, filename]) -> file-like object
250
251         Pass in the path of a file to write to the Collection, either as a
252         single string or as two separate stream name and file name arguments.
253         This method returns a file-like object you can write to add it to the
254         Collection.
255
256         You may only have one file object from the Collection open at a time,
257         so be sure to close the object when you're done.  Using the object in
258         a with statement makes that easy::
259
260           with cwriter.open('./doc/page1.txt') as outfile:
261               outfile.write(page1_data)
262           with cwriter.open('./doc/page2.txt') as outfile:
263               outfile.write(page2_data)
264         """
265         if filename is None:
266             streampath, filename = split(streampath)
267         if self._last_open and not self._last_open.closed:
268             raise errors.AssertionError(
269                 "can't open '{}' when '{}' is still open".format(
270                     filename, self._last_open.name))
271         if streampath != self.current_stream_name():
272             self.start_new_stream(streampath)
273         self.set_current_file_name(filename)
274         self._last_open = _WriterFile(self, filename)
275         return self._last_open
276
277     def flush_data(self):
278         data_buffer = b''.join(self._data_buffer)
279         if data_buffer:
280             self._current_stream_locators.append(
281                 self._my_keep().put(
282                     data_buffer[0:config.KEEP_BLOCK_SIZE],
283                     copies=self.replication))
284             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285             self._data_buffer_len = len(self._data_buffer[0])
286
287     def start_new_file(self, newfilename=None):
288         self.finish_current_file()
289         self.set_current_file_name(newfilename)
290
291     def set_current_file_name(self, newfilename):
292         if re.search(r'[\t\n]', newfilename):
293             raise errors.AssertionError(
294                 "Manifest filenames cannot contain whitespace: %s" %
295                 newfilename)
296         elif re.search(r'\x00', newfilename):
297             raise errors.AssertionError(
298                 "Manifest filenames cannot contain NUL characters: %s" %
299                 newfilename)
300         self._current_file_name = newfilename
301
302     def current_file_name(self):
303         return self._current_file_name
304
305     def finish_current_file(self):
306         if self._current_file_name is None:
307             if self._current_file_pos == self._current_stream_length:
308                 return
309             raise errors.AssertionError(
310                 "Cannot finish an unnamed file " +
311                 "(%d bytes at offset %d in '%s' stream)" %
312                 (self._current_stream_length - self._current_file_pos,
313                  self._current_file_pos,
314                  self._current_stream_name))
315         self._current_stream_files.append([
316                 self._current_file_pos,
317                 self._current_stream_length - self._current_file_pos,
318                 self._current_file_name])
319         self._current_file_pos = self._current_stream_length
320         self._current_file_name = None
321
322     def start_new_stream(self, newstreamname='.'):
323         self.finish_current_stream()
324         self.set_current_stream_name(newstreamname)
325
326     def set_current_stream_name(self, newstreamname):
327         if re.search(r'[\t\n]', newstreamname):
328             raise errors.AssertionError(
329                 "Manifest stream names cannot contain whitespace: '%s'" %
330                 (newstreamname))
331         self._current_stream_name = '.' if newstreamname=='' else newstreamname
332
333     def current_stream_name(self):
334         return self._current_stream_name
335
336     def finish_current_stream(self):
337         self.finish_current_file()
338         self.flush_data()
339         if not self._current_stream_files:
340             pass
341         elif self._current_stream_name is None:
342             raise errors.AssertionError(
343                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344                 (self._current_stream_length, len(self._current_stream_files)))
345         else:
346             if not self._current_stream_locators:
347                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348             self._finished_streams.append([self._current_stream_name,
349                                            self._current_stream_locators,
350                                            self._current_stream_files])
351         self._current_stream_files = []
352         self._current_stream_length = 0
353         self._current_stream_locators = []
354         self._current_stream_name = None
355         self._current_file_pos = 0
356         self._current_file_name = None
357
358     def finish(self):
359         """Store the manifest in Keep and return its locator.
360
361         This is useful for storing manifest fragments (task outputs)
362         temporarily in Keep during a Crunch job.
363
364         In other cases you should make a collection instead, by
365         sending manifest_text() to the API server's "create
366         collection" endpoint.
367         """
368         return self._my_keep().put(self.manifest_text().encode(),
369                                    copies=self.replication)
370
371     def portable_data_hash(self):
372         stripped = self.stripped_manifest().encode()
373         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
374
375     def manifest_text(self):
376         self.finish_current_stream()
377         manifest = ''
378
379         for stream in self._finished_streams:
380             if not re.search(r'^\.(/.*)?$', stream[0]):
381                 manifest += './'
382             manifest += stream[0].replace(' ', '\\040')
383             manifest += ' ' + ' '.join(stream[1])
384             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
385             manifest += "\n"
386
387         return manifest
388
389     def data_locators(self):
390         ret = []
391         for name, locators, files in self._finished_streams:
392             ret += locators
393         return ret
394
395     def save_new(self, name=None):
396         return self._api_client.collections().create(
397             ensure_unique_name=True,
398             body={
399                 'name': name,
400                 'manifest_text': self.manifest_text(),
401             }).execute(num_retries=self.num_retries)
402
403
404 class ResumableCollectionWriter(CollectionWriter):
405     """Deprecated, use Collection instead."""
406
407     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408                    '_current_stream_locators', '_current_stream_name',
409                    '_current_file_name', '_current_file_pos', '_close_file',
410                    '_data_buffer', '_dependencies', '_finished_streams',
411                    '_queued_dirents', '_queued_trees']
412
413     def __init__(self, api_client=None, **kwargs):
414         self._dependencies = {}
415         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
416
417     @classmethod
418     def from_state(cls, state, *init_args, **init_kwargs):
419         # Try to build a new writer from scratch with the given state.
420         # If the state is not suitable to resume (because files have changed,
421         # been deleted, aren't predictable, etc.), raise a
422         # StaleWriterStateError.  Otherwise, return the initialized writer.
423         # The caller is responsible for calling writer.do_queued_work()
424         # appropriately after it's returned.
425         writer = cls(*init_args, **init_kwargs)
426         for attr_name in cls.STATE_PROPS:
427             attr_value = state[attr_name]
428             attr_class = getattr(writer, attr_name).__class__
429             # Coerce the value into the same type as the initial value, if
430             # needed.
431             if attr_class not in (type(None), attr_value.__class__):
432                 attr_value = attr_class(attr_value)
433             setattr(writer, attr_name, attr_value)
434         # Check dependencies before we try to resume anything.
435         if any(KeepLocator(ls).permission_expired()
436                for ls in writer._current_stream_locators):
437             raise errors.StaleWriterStateError(
438                 "locators include expired permission hint")
439         writer.check_dependencies()
440         if state['_current_file'] is not None:
441             path, pos = state['_current_file']
442             try:
443                 writer._queued_file = open(path, 'rb')
444                 writer._queued_file.seek(pos)
445             except IOError as error:
446                 raise errors.StaleWriterStateError(
447                     "failed to reopen active file {}: {}".format(path, error))
448         return writer
449
450     def check_dependencies(self):
451         for path, orig_stat in listitems(self._dependencies):
452             if not S_ISREG(orig_stat[ST_MODE]):
453                 raise errors.StaleWriterStateError("{} not file".format(path))
454             try:
455                 now_stat = tuple(os.stat(path))
456             except OSError as error:
457                 raise errors.StaleWriterStateError(
458                     "failed to stat {}: {}".format(path, error))
459             if ((not S_ISREG(now_stat[ST_MODE])) or
460                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462                 raise errors.StaleWriterStateError("{} changed".format(path))
463
464     def dump_state(self, copy_func=lambda x: x):
465         state = {attr: copy_func(getattr(self, attr))
466                  for attr in self.STATE_PROPS}
467         if self._queued_file is None:
468             state['_current_file'] = None
469         else:
470             state['_current_file'] = (os.path.realpath(self._queued_file.name),
471                                       self._queued_file.tell())
472         return state
473
474     def _queue_file(self, source, filename=None):
475         try:
476             src_path = os.path.realpath(source)
477         except Exception:
478             raise errors.AssertionError("{} not a file path".format(source))
479         try:
480             path_stat = os.stat(src_path)
481         except OSError as stat_error:
482             path_stat = None
483         super(ResumableCollectionWriter, self)._queue_file(source, filename)
484         fd_stat = os.fstat(self._queued_file.fileno())
485         if not S_ISREG(fd_stat.st_mode):
486             # We won't be able to resume from this cache anyway, so don't
487             # worry about further checks.
488             self._dependencies[source] = tuple(fd_stat)
489         elif path_stat is None:
490             raise errors.AssertionError(
491                 "could not stat {}: {}".format(source, stat_error))
492         elif path_stat.st_ino != fd_stat.st_ino:
493             raise errors.AssertionError(
494                 "{} changed between open and stat calls".format(source))
495         else:
496             self._dependencies[src_path] = tuple(fd_stat)
497
498     def write(self, data):
499         if self._queued_file is None:
500             raise errors.AssertionError(
501                 "resumable writer can't accept unsourced data")
502         return super(ResumableCollectionWriter, self).write(data)
503
504
505 ADD = "add"
506 DEL = "del"
507 MOD = "mod"
508 TOK = "tok"
509 FILE = "file"
510 COLLECTION = "collection"
511
512 class RichCollectionBase(CollectionBase):
513     """Base class for Collections and Subcollections.
514
515     Implements the majority of functionality relating to accessing items in the
516     Collection.
517
518     """
519
520     def __init__(self, parent=None):
521         self.parent = parent
522         self._committed = False
523         self._has_remote_blocks = False
524         self._callback = None
525         self._items = {}
526
527     def _my_api(self):
528         raise NotImplementedError()
529
530     def _my_keep(self):
531         raise NotImplementedError()
532
533     def _my_block_manager(self):
534         raise NotImplementedError()
535
536     def writable(self):
537         raise NotImplementedError()
538
539     def root_collection(self):
540         raise NotImplementedError()
541
542     def notify(self, event, collection, name, item):
543         raise NotImplementedError()
544
545     def stream_name(self):
546         raise NotImplementedError()
547
548     @synchronized
549     def has_remote_blocks(self):
550         """Recursively check for a +R segment locator signature."""
551
552         for item in self:
553             if self[item].has_remote_blocks():
554                 return True
555         return False
556
557     @must_be_writable
558     @synchronized
559     def find_or_create(self, path, create_type):
560         """Recursively search the specified file path.
561
562         May return either a `Collection` or `ArvadosFile`.  If not found, will
563         create a new item at the specified path based on `create_type`.  Will
564         create intermediate subcollections needed to contain the final item in
565         the path.
566
567         :create_type:
568           One of `arvados.collection.FILE` or
569           `arvados.collection.COLLECTION`.  If the path is not found, and value
570           of create_type is FILE then create and return a new ArvadosFile for
571           the last path component.  If COLLECTION, then create and return a new
572           Collection for the last path component.
573
574         """
575
576         pathcomponents = path.split("/", 1)
577         if pathcomponents[0]:
578             item = self._items.get(pathcomponents[0])
579             if len(pathcomponents) == 1:
580                 if item is None:
581                     # create new file
582                     if create_type == COLLECTION:
583                         item = Subcollection(self, pathcomponents[0])
584                     else:
585                         item = ArvadosFile(self, pathcomponents[0])
586                     self._items[pathcomponents[0]] = item
587                     self.set_committed(False)
588                     self.notify(ADD, self, pathcomponents[0], item)
589                 return item
590             else:
591                 if item is None:
592                     # create new collection
593                     item = Subcollection(self, pathcomponents[0])
594                     self._items[pathcomponents[0]] = item
595                     self.set_committed(False)
596                     self.notify(ADD, self, pathcomponents[0], item)
597                 if isinstance(item, RichCollectionBase):
598                     return item.find_or_create(pathcomponents[1], create_type)
599                 else:
600                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
601         else:
602             return self
603
604     @synchronized
605     def find(self, path):
606         """Recursively search the specified file path.
607
608         May return either a Collection or ArvadosFile. Return None if not
609         found.
610         If path is invalid (ex: starts with '/'), an IOError exception will be
611         raised.
612
613         """
614         if not path:
615             raise errors.ArgumentError("Parameter 'path' is empty.")
616
617         pathcomponents = path.split("/", 1)
618         if pathcomponents[0] == '':
619             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
620
621         item = self._items.get(pathcomponents[0])
622         if item is None:
623             return None
624         elif len(pathcomponents) == 1:
625             return item
626         else:
627             if isinstance(item, RichCollectionBase):
628                 if pathcomponents[1]:
629                     return item.find(pathcomponents[1])
630                 else:
631                     return item
632             else:
633                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
634
635     @synchronized
636     def mkdirs(self, path):
637         """Recursive subcollection create.
638
639         Like `os.makedirs()`.  Will create intermediate subcollections needed
640         to contain the leaf subcollection path.
641
642         """
643
644         if self.find(path) != None:
645             raise IOError(errno.EEXIST, "Directory or file exists", path)
646
647         return self.find_or_create(path, COLLECTION)
648
649     def open(self, path, mode="r"):
650         """Open a file-like object for access.
651
652         :path:
653           path to a file in the collection
654         :mode:
655           a string consisting of "r", "w", or "a", optionally followed
656           by "b" or "t", optionally followed by "+".
657           :"b":
658             binary mode: write() accepts bytes, read() returns bytes.
659           :"t":
660             text mode (default): write() accepts strings, read() returns strings.
661           :"r":
662             opens for reading
663           :"r+":
664             opens for reading and writing.  Reads/writes share a file pointer.
665           :"w", "w+":
666             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
667           :"a", "a+":
668             opens for reading and writing.  All writes are appended to
669             the end of the file.  Writing does not affect the file pointer for
670             reading.
671         """
672
673         if not re.search(r'^[rwa][bt]?\+?$', mode):
674             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
675
676         if mode[0] == 'r' and '+' not in mode:
677             fclass = ArvadosFileReader
678             arvfile = self.find(path)
679         elif not self.writable():
680             raise IOError(errno.EROFS, "Collection is read only")
681         else:
682             fclass = ArvadosFileWriter
683             arvfile = self.find_or_create(path, FILE)
684
685         if arvfile is None:
686             raise IOError(errno.ENOENT, "File not found", path)
687         if not isinstance(arvfile, ArvadosFile):
688             raise IOError(errno.EISDIR, "Is a directory", path)
689
690         if mode[0] == 'w':
691             arvfile.truncate(0)
692
693         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
694
695     def modified(self):
696         """Determine if the collection has been modified since last commited."""
697         return not self.committed()
698
699     @synchronized
700     def committed(self):
701         """Determine if the collection has been committed to the API server."""
702         return self._committed
703
704     @synchronized
705     def set_committed(self, value=True):
706         """Recursively set committed flag.
707
708         If value is True, set committed to be True for this and all children.
709
710         If value is False, set committed to be False for this and all parents.
711         """
712         if value == self._committed:
713             return
714         if value:
715             for k,v in listitems(self._items):
716                 v.set_committed(True)
717             self._committed = True
718         else:
719             self._committed = False
720             if self.parent is not None:
721                 self.parent.set_committed(False)
722
723     @synchronized
724     def __iter__(self):
725         """Iterate over names of files and collections contained in this collection."""
726         return iter(viewkeys(self._items))
727
728     @synchronized
729     def __getitem__(self, k):
730         """Get a file or collection that is directly contained by this collection.
731
732         If you want to search a path, use `find()` instead.
733
734         """
735         return self._items[k]
736
737     @synchronized
738     def __contains__(self, k):
739         """Test if there is a file or collection a directly contained by this collection."""
740         return k in self._items
741
742     @synchronized
743     def __len__(self):
744         """Get the number of items directly contained in this collection."""
745         return len(self._items)
746
747     @must_be_writable
748     @synchronized
749     def __delitem__(self, p):
750         """Delete an item by name which is directly contained by this collection."""
751         del self._items[p]
752         self.set_committed(False)
753         self.notify(DEL, self, p, None)
754
755     @synchronized
756     def keys(self):
757         """Get a list of names of files and collections directly contained in this collection."""
758         return self._items.keys()
759
760     @synchronized
761     def values(self):
762         """Get a list of files and collection objects directly contained in this collection."""
763         return listvalues(self._items)
764
765     @synchronized
766     def items(self):
767         """Get a list of (name, object) tuples directly contained in this collection."""
768         return listitems(self._items)
769
770     def exists(self, path):
771         """Test if there is a file or collection at `path`."""
772         return self.find(path) is not None
773
774     @must_be_writable
775     @synchronized
776     def remove(self, path, recursive=False):
777         """Remove the file or subcollection (directory) at `path`.
778
779         :recursive:
780           Specify whether to remove non-empty subcollections (True), or raise an error (False).
781         """
782
783         if not path:
784             raise errors.ArgumentError("Parameter 'path' is empty.")
785
786         pathcomponents = path.split("/", 1)
787         item = self._items.get(pathcomponents[0])
788         if item is None:
789             raise IOError(errno.ENOENT, "File not found", path)
790         if len(pathcomponents) == 1:
791             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
792                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
793             deleteditem = self._items[pathcomponents[0]]
794             del self._items[pathcomponents[0]]
795             self.set_committed(False)
796             self.notify(DEL, self, pathcomponents[0], deleteditem)
797         else:
798             item.remove(pathcomponents[1])
799
800     def _clonefrom(self, source):
801         for k,v in listitems(source):
802             self._items[k] = v.clone(self, k)
803
804     def clone(self):
805         raise NotImplementedError()
806
807     @must_be_writable
808     @synchronized
809     def add(self, source_obj, target_name, overwrite=False, reparent=False):
810         """Copy or move a file or subcollection to this collection.
811
812         :source_obj:
813           An ArvadosFile, or Subcollection object
814
815         :target_name:
816           Destination item name.  If the target name already exists and is a
817           file, this will raise an error unless you specify `overwrite=True`.
818
819         :overwrite:
820           Whether to overwrite target file if it already exists.
821
822         :reparent:
823           If True, source_obj will be moved from its parent collection to this collection.
824           If False, source_obj will be copied and the parent collection will be
825           unmodified.
826
827         """
828
829         if target_name in self and not overwrite:
830             raise IOError(errno.EEXIST, "File already exists", target_name)
831
832         modified_from = None
833         if target_name in self:
834             modified_from = self[target_name]
835
836         # Actually make the move or copy.
837         if reparent:
838             source_obj._reparent(self, target_name)
839             item = source_obj
840         else:
841             item = source_obj.clone(self, target_name)
842
843         self._items[target_name] = item
844         self.set_committed(False)
845
846         if modified_from:
847             self.notify(MOD, self, target_name, (modified_from, item))
848         else:
849             self.notify(ADD, self, target_name, item)
850
851     def _get_src_target(self, source, target_path, source_collection, create_dest):
852         if source_collection is None:
853             source_collection = self
854
855         # Find the object
856         if isinstance(source, basestring):
857             source_obj = source_collection.find(source)
858             if source_obj is None:
859                 raise IOError(errno.ENOENT, "File not found", source)
860             sourcecomponents = source.split("/")
861         else:
862             source_obj = source
863             sourcecomponents = None
864
865         # Find parent collection the target path
866         targetcomponents = target_path.split("/")
867
868         # Determine the name to use.
869         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
870
871         if not target_name:
872             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
873
874         if create_dest:
875             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
876         else:
877             if len(targetcomponents) > 1:
878                 target_dir = self.find("/".join(targetcomponents[0:-1]))
879             else:
880                 target_dir = self
881
882         if target_dir is None:
883             raise IOError(errno.ENOENT, "Target directory not found", target_name)
884
885         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
886             target_dir = target_dir[target_name]
887             target_name = sourcecomponents[-1]
888
889         return (source_obj, target_dir, target_name)
890
891     @must_be_writable
892     @synchronized
893     def copy(self, source, target_path, source_collection=None, overwrite=False):
894         """Copy a file or subcollection to a new path in this collection.
895
896         :source:
897           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
898
899         :target_path:
900           Destination file or path.  If the target path already exists and is a
901           subcollection, the item will be placed inside the subcollection.  If
902           the target path already exists and is a file, this will raise an error
903           unless you specify `overwrite=True`.
904
905         :source_collection:
906           Collection to copy `source_path` from (default `self`)
907
908         :overwrite:
909           Whether to overwrite target file if it already exists.
910         """
911
912         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
913         target_dir.add(source_obj, target_name, overwrite, False)
914         if not self._has_remote_blocks and source_obj.has_remote_blocks():
915             self._has_remote_blocks = True
916
917     @must_be_writable
918     @synchronized
919     def rename(self, source, target_path, source_collection=None, overwrite=False):
920         """Move a file or subcollection from `source_collection` to a new path in this collection.
921
922         :source:
923           A string with a path to source file or subcollection.
924
925         :target_path:
926           Destination file or path.  If the target path already exists and is a
927           subcollection, the item will be placed inside the subcollection.  If
928           the target path already exists and is a file, this will raise an error
929           unless you specify `overwrite=True`.
930
931         :source_collection:
932           Collection to copy `source_path` from (default `self`)
933
934         :overwrite:
935           Whether to overwrite target file if it already exists.
936         """
937
938         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
939         if not source_obj.writable():
940             raise IOError(errno.EROFS, "Source collection is read only", source)
941         target_dir.add(source_obj, target_name, overwrite, True)
942         if not self._has_remote_blocks and source_obj.has_remote_blocks():
943             self._has_remote_blocks = True
944
945     def portable_manifest_text(self, stream_name="."):
946         """Get the manifest text for this collection, sub collections and files.
947
948         This method does not flush outstanding blocks to Keep.  It will return
949         a normalized manifest with access tokens stripped.
950
951         :stream_name:
952           Name to use for this stream (directory)
953
954         """
955         return self._get_manifest_text(stream_name, True, True)
956
957     @synchronized
958     def manifest_text(self, stream_name=".", strip=False, normalize=False,
959                       only_committed=False):
960         """Get the manifest text for this collection, sub collections and files.
961
962         This method will flush outstanding blocks to Keep.  By default, it will
963         not normalize an unmodified manifest or strip access tokens.
964
965         :stream_name:
966           Name to use for this stream (directory)
967
968         :strip:
969           If True, remove signing tokens from block locators if present.
970           If False (default), block locators are left unchanged.
971
972         :normalize:
973           If True, always export the manifest text in normalized form
974           even if the Collection is not modified.  If False (default) and the collection
975           is not modified, return the original manifest text even if it is not
976           in normalized form.
977
978         :only_committed:
979           If True, don't commit pending blocks.
980
981         """
982
983         if not only_committed:
984             self._my_block_manager().commit_all()
985         return self._get_manifest_text(stream_name, strip, normalize,
986                                        only_committed=only_committed)
987
988     @synchronized
989     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
990         """Get the manifest text for this collection, sub collections and files.
991
992         :stream_name:
993           Name to use for this stream (directory)
994
995         :strip:
996           If True, remove signing tokens from block locators if present.
997           If False (default), block locators are left unchanged.
998
999         :normalize:
1000           If True, always export the manifest text in normalized form
1001           even if the Collection is not modified.  If False (default) and the collection
1002           is not modified, return the original manifest text even if it is not
1003           in normalized form.
1004
1005         :only_committed:
1006           If True, only include blocks that were already committed to Keep.
1007
1008         """
1009
1010         if not self.committed() or self._manifest_text is None or normalize:
1011             stream = {}
1012             buf = []
1013             sorted_keys = sorted(self.keys())
1014             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1015                 # Create a stream per file `k`
1016                 arvfile = self[filename]
1017                 filestream = []
1018                 for segment in arvfile.segments():
1019                     loc = segment.locator
1020                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1021                         if only_committed:
1022                             continue
1023                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1024                     if strip:
1025                         loc = KeepLocator(loc).stripped()
1026                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1027                                          segment.segment_offset, segment.range_size))
1028                 stream[filename] = filestream
1029             if stream:
1030                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1031             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1032                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1033             return "".join(buf)
1034         else:
1035             if strip:
1036                 return self.stripped_manifest()
1037             else:
1038                 return self._manifest_text
1039
1040     @synchronized
1041     def _copy_remote_blocks(self, remote_blocks={}):
1042         """Scan through the entire collection and ask Keep to copy remote blocks.
1043
1044         When accessing a remote collection, blocks will have a remote signature
1045         (+R instead of +A). Collect these signatures and request Keep to copy the
1046         blocks to the local cluster, returning local (+A) signatures.
1047
1048         :remote_blocks:
1049           Shared cache of remote to local block mappings. This is used to avoid
1050           doing extra work when blocks are shared by more than one file in
1051           different subdirectories.
1052
1053         """
1054         for item in self:
1055             if isinstance(self[item], ArvadosFile):
1056                 for s in self[item].segments():
1057                     if '+R' in s.locator:
1058                         try:
1059                             loc = remote_blocks[s.locator]
1060                         except KeyError:
1061                             loc = self._my_keep().refresh_signature(s.locator)
1062                             remote_blocks[s.locator] = loc
1063                         s.locator = loc
1064                         self.set_committed(False)
1065             elif isinstance(self[item], RichCollectionBase):
1066                 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1067         return remote_blocks
1068
1069     @synchronized
1070     def diff(self, end_collection, prefix=".", holding_collection=None):
1071         """Generate list of add/modify/delete actions.
1072
1073         When given to `apply`, will change `self` to match `end_collection`
1074
1075         """
1076         changes = []
1077         if holding_collection is None:
1078             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1079         for k in self:
1080             if k not in end_collection:
1081                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1082         for k in end_collection:
1083             if k in self:
1084                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1085                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1086                 elif end_collection[k] != self[k]:
1087                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1088                 else:
1089                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1090             else:
1091                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1092         return changes
1093
1094     @must_be_writable
1095     @synchronized
1096     def apply(self, changes):
1097         """Apply changes from `diff`.
1098
1099         If a change conflicts with a local change, it will be saved to an
1100         alternate path indicating the conflict.
1101
1102         """
1103         if changes:
1104             self.set_committed(False)
1105         for change in changes:
1106             event_type = change[0]
1107             path = change[1]
1108             initial = change[2]
1109             local = self.find(path)
1110             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1111                                                                     time.gmtime()))
1112             if event_type == ADD:
1113                 if local is None:
1114                     # No local file at path, safe to copy over new file
1115                     self.copy(initial, path)
1116                 elif local is not None and local != initial:
1117                     # There is already local file and it is different:
1118                     # save change to conflict file.
1119                     self.copy(initial, conflictpath)
1120             elif event_type == MOD or event_type == TOK:
1121                 final = change[3]
1122                 if local == initial:
1123                     # Local matches the "initial" item so it has not
1124                     # changed locally and is safe to update.
1125                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1126                         # Replace contents of local file with new contents
1127                         local.replace_contents(final)
1128                     else:
1129                         # Overwrite path with new item; this can happen if
1130                         # path was a file and is now a collection or vice versa
1131                         self.copy(final, path, overwrite=True)
1132                 else:
1133                     # Local is missing (presumably deleted) or local doesn't
1134                     # match the "start" value, so save change to conflict file
1135                     self.copy(final, conflictpath)
1136             elif event_type == DEL:
1137                 if local == initial:
1138                     # Local item matches "initial" value, so it is safe to remove.
1139                     self.remove(path, recursive=True)
1140                 # else, the file is modified or already removed, in either
1141                 # case we don't want to try to remove it.
1142
1143     def portable_data_hash(self):
1144         """Get the portable data hash for this collection's manifest."""
1145         if self._manifest_locator and self.committed():
1146             # If the collection is already saved on the API server, and it's committed
1147             # then return API server's PDH response.
1148             return self._portable_data_hash
1149         else:
1150             stripped = self.portable_manifest_text().encode()
1151             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1152
1153     @synchronized
1154     def subscribe(self, callback):
1155         if self._callback is None:
1156             self._callback = callback
1157         else:
1158             raise errors.ArgumentError("A callback is already set on this collection.")
1159
1160     @synchronized
1161     def unsubscribe(self):
1162         if self._callback is not None:
1163             self._callback = None
1164
1165     @synchronized
1166     def notify(self, event, collection, name, item):
1167         if self._callback:
1168             self._callback(event, collection, name, item)
1169         self.root_collection().notify(event, collection, name, item)
1170
1171     @synchronized
1172     def __eq__(self, other):
1173         if other is self:
1174             return True
1175         if not isinstance(other, RichCollectionBase):
1176             return False
1177         if len(self._items) != len(other):
1178             return False
1179         for k in self._items:
1180             if k not in other:
1181                 return False
1182             if self._items[k] != other[k]:
1183                 return False
1184         return True
1185
1186     def __ne__(self, other):
1187         return not self.__eq__(other)
1188
1189     @synchronized
1190     def flush(self):
1191         """Flush bufferblocks to Keep."""
1192         for e in listvalues(self):
1193             e.flush()
1194
1195
1196 class Collection(RichCollectionBase):
1197     """Represents the root of an Arvados Collection.
1198
1199     This class is threadsafe.  The root collection object, all subcollections
1200     and files are protected by a single lock (i.e. each access locks the entire
1201     collection).
1202
1203     Brief summary of
1204     useful methods:
1205
1206     :To read an existing file:
1207       `c.open("myfile", "r")`
1208
1209     :To write a new file:
1210       `c.open("myfile", "w")`
1211
1212     :To determine if a file exists:
1213       `c.find("myfile") is not None`
1214
1215     :To copy a file:
1216       `c.copy("source", "dest")`
1217
1218     :To delete a file:
1219       `c.remove("myfile")`
1220
1221     :To save to an existing collection record:
1222       `c.save()`
1223
1224     :To save a new collection record:
1225     `c.save_new()`
1226
1227     :To merge remote changes into this object:
1228       `c.update()`
1229
1230     Must be associated with an API server Collection record (during
1231     initialization, or using `save_new`) to use `save` or `update`
1232
1233     """
1234
1235     def __init__(self, manifest_locator_or_text=None,
1236                  api_client=None,
1237                  keep_client=None,
1238                  num_retries=None,
1239                  parent=None,
1240                  apiconfig=None,
1241                  block_manager=None,
1242                  replication_desired=None,
1243                  put_threads=None):
1244         """Collection constructor.
1245
1246         :manifest_locator_or_text:
1247           An Arvados collection UUID, portable data hash, raw manifest
1248           text, or (if creating an empty collection) None.
1249
1250         :parent:
1251           the parent Collection, may be None.
1252
1253         :apiconfig:
1254           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1255           Prefer this over supplying your own api_client and keep_client (except in testing).
1256           Will use default config settings if not specified.
1257
1258         :api_client:
1259           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1260
1261         :keep_client:
1262           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1263
1264         :num_retries:
1265           the number of retries for API and Keep requests.
1266
1267         :block_manager:
1268           the block manager to use.  If not specified, create one.
1269
1270         :replication_desired:
1271           How many copies should Arvados maintain. If None, API server default
1272           configuration applies. If not None, this value will also be used
1273           for determining the number of block copies being written.
1274
1275         """
1276         super(Collection, self).__init__(parent)
1277         self._api_client = api_client
1278         self._keep_client = keep_client
1279         self._block_manager = block_manager
1280         self.replication_desired = replication_desired
1281         self.put_threads = put_threads
1282
1283         if apiconfig:
1284             self._config = apiconfig
1285         else:
1286             self._config = config.settings()
1287
1288         self.num_retries = num_retries if num_retries is not None else 0
1289         self._manifest_locator = None
1290         self._manifest_text = None
1291         self._portable_data_hash = None
1292         self._api_response = None
1293         self._past_versions = set()
1294
1295         self.lock = threading.RLock()
1296         self.events = None
1297
1298         if manifest_locator_or_text:
1299             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1300                 self._manifest_locator = manifest_locator_or_text
1301             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1302                 self._manifest_locator = manifest_locator_or_text
1303                 if not self._has_local_collection_uuid():
1304                     self._has_remote_blocks = True
1305             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1306                 self._manifest_text = manifest_locator_or_text
1307                 if '+R' in self._manifest_text:
1308                     self._has_remote_blocks = True
1309             else:
1310                 raise errors.ArgumentError(
1311                     "Argument to CollectionReader is not a manifest or a collection UUID")
1312
1313             try:
1314                 self._populate()
1315             except (IOError, errors.SyntaxError) as e:
1316                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1317
1318     def root_collection(self):
1319         return self
1320
1321     def get_properties(self):
1322         if self._api_response and self._api_response["properties"]:
1323             return self._api_response["properties"]
1324         else:
1325             return {}
1326
1327     def get_trash_at(self):
1328         if self._api_response and self._api_response["trash_at"]:
1329             return ciso8601.parse_datetime(self._api_response["trash_at"])
1330         else:
1331             return None
1332
1333     def stream_name(self):
1334         return "."
1335
1336     def writable(self):
1337         return True
1338
1339     @synchronized
1340     def known_past_version(self, modified_at_and_portable_data_hash):
1341         return modified_at_and_portable_data_hash in self._past_versions
1342
1343     @synchronized
1344     @retry_method
1345     def update(self, other=None, num_retries=None):
1346         """Merge the latest collection on the API server with the current collection."""
1347
1348         if other is None:
1349             if self._manifest_locator is None:
1350                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1351             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1352             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1353                 response.get("portable_data_hash") != self.portable_data_hash()):
1354                 # The record on the server is different from our current one, but we've seen it before,
1355                 # so ignore it because it's already been merged.
1356                 # However, if it's the same as our current record, proceed with the update, because we want to update
1357                 # our tokens.
1358                 return
1359             else:
1360                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1361             other = CollectionReader(response["manifest_text"])
1362         baseline = CollectionReader(self._manifest_text)
1363         self.apply(baseline.diff(other))
1364         self._manifest_text = self.manifest_text()
1365
1366     @synchronized
1367     def _my_api(self):
1368         if self._api_client is None:
1369             self._api_client = ThreadSafeApiCache(self._config)
1370             if self._keep_client is None:
1371                 self._keep_client = self._api_client.keep
1372         return self._api_client
1373
1374     @synchronized
1375     def _my_keep(self):
1376         if self._keep_client is None:
1377             if self._api_client is None:
1378                 self._my_api()
1379             else:
1380                 self._keep_client = KeepClient(api_client=self._api_client)
1381         return self._keep_client
1382
1383     @synchronized
1384     def _my_block_manager(self):
1385         if self._block_manager is None:
1386             copies = (self.replication_desired or
1387                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1388                                                    2))
1389             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1390         return self._block_manager
1391
1392     def _remember_api_response(self, response):
1393         self._api_response = response
1394         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1395
1396     def _populate_from_api_server(self):
1397         # As in KeepClient itself, we must wait until the last
1398         # possible moment to instantiate an API client, in order to
1399         # avoid tripping up clients that don't have access to an API
1400         # server.  If we do build one, make sure our Keep client uses
1401         # it.  If instantiation fails, we'll fall back to the except
1402         # clause, just like any other Collection lookup
1403         # failure. Return an exception, or None if successful.
1404         self._remember_api_response(self._my_api().collections().get(
1405             uuid=self._manifest_locator).execute(
1406                 num_retries=self.num_retries))
1407         self._manifest_text = self._api_response['manifest_text']
1408         self._portable_data_hash = self._api_response['portable_data_hash']
1409         # If not overriden via kwargs, we should try to load the
1410         # replication_desired from the API server
1411         if self.replication_desired is None:
1412             self.replication_desired = self._api_response.get('replication_desired', None)
1413
1414     def _populate(self):
1415         if self._manifest_text is None:
1416             if self._manifest_locator is None:
1417                 return
1418             else:
1419                 self._populate_from_api_server()
1420         self._baseline_manifest = self._manifest_text
1421         self._import_manifest(self._manifest_text)
1422
1423     def _has_collection_uuid(self):
1424         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1425
1426     def _has_local_collection_uuid(self):
1427         return self._has_collection_uuid and \
1428             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1429
1430     def __enter__(self):
1431         return self
1432
1433     def __exit__(self, exc_type, exc_value, traceback):
1434         """Support scoped auto-commit in a with: block."""
1435         if exc_type is None:
1436             if self.writable() and self._has_collection_uuid():
1437                 self.save()
1438         self.stop_threads()
1439
1440     def stop_threads(self):
1441         if self._block_manager is not None:
1442             self._block_manager.stop_threads()
1443
1444     @synchronized
1445     def manifest_locator(self):
1446         """Get the manifest locator, if any.
1447
1448         The manifest locator will be set when the collection is loaded from an
1449         API server record or the portable data hash of a manifest.
1450
1451         The manifest locator will be None if the collection is newly created or
1452         was created directly from manifest text.  The method `save_new()` will
1453         assign a manifest locator.
1454
1455         """
1456         return self._manifest_locator
1457
1458     @synchronized
1459     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1460         if new_config is None:
1461             new_config = self._config
1462         if readonly:
1463             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1464         else:
1465             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1466
1467         newcollection._clonefrom(self)
1468         return newcollection
1469
1470     @synchronized
1471     def api_response(self):
1472         """Returns information about this Collection fetched from the API server.
1473
1474         If the Collection exists in Keep but not the API server, currently
1475         returns None.  Future versions may provide a synthetic response.
1476
1477         """
1478         return self._api_response
1479
1480     def find_or_create(self, path, create_type):
1481         """See `RichCollectionBase.find_or_create`"""
1482         if path == ".":
1483             return self
1484         else:
1485             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1486
1487     def find(self, path):
1488         """See `RichCollectionBase.find`"""
1489         if path == ".":
1490             return self
1491         else:
1492             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1493
1494     def remove(self, path, recursive=False):
1495         """See `RichCollectionBase.remove`"""
1496         if path == ".":
1497             raise errors.ArgumentError("Cannot remove '.'")
1498         else:
1499             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1500
1501     @must_be_writable
1502     @synchronized
1503     @retry_method
1504     def save(self,
1505              properties=None,
1506              storage_classes=None,
1507              trash_at=None,
1508              merge=True,
1509              num_retries=None):
1510         """Save collection to an existing collection record.
1511
1512         Commit pending buffer blocks to Keep, merge with remote record (if
1513         merge=True, the default), and update the collection record. Returns
1514         the current manifest text.
1515
1516         Will raise AssertionError if not associated with a collection record on
1517         the API server.  If you want to save a manifest to Keep only, see
1518         `save_new()`.
1519
1520         :properties:
1521           Additional properties of collection. This value will replace any existing
1522           properties of collection.
1523
1524         :storage_classes:
1525           Specify desirable storage classes to be used when writing data to Keep.
1526
1527         :trash_at:
1528           A collection is *expiring* when it has a *trash_at* time in the future.
1529           An expiring collection can be accessed as normal,
1530           but is scheduled to be trashed automatically at the *trash_at* time.
1531
1532         :merge:
1533           Update and merge remote changes before saving.  Otherwise, any
1534           remote changes will be ignored and overwritten.
1535
1536         :num_retries:
1537           Retry count on API calls (if None,  use the collection default)
1538
1539         """
1540         if properties and type(properties) is not dict:
1541             raise errors.ArgumentError("properties must be dictionary type.")
1542
1543         if storage_classes and type(storage_classes) is not list:
1544             raise errors.ArgumentError("storage_classes must be list type.")
1545
1546         if trash_at and type(trash_at) is not datetime.datetime:
1547             raise errors.ArgumentError("trash_at must be datetime type.")
1548
1549         body={}
1550         if properties:
1551             body["properties"] = properties
1552         if storage_classes:
1553             body["storage_classes_desired"] = storage_classes
1554         if trash_at:
1555             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1556             body["trash_at"] = t
1557
1558         if not self.committed():
1559             if self._has_remote_blocks:
1560                 # Copy any remote blocks to the local cluster.
1561                 self._copy_remote_blocks(remote_blocks={})
1562                 self._has_remote_blocks = False
1563             if not self._has_collection_uuid():
1564                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1565             elif not self._has_local_collection_uuid():
1566                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1567
1568             self._my_block_manager().commit_all()
1569
1570             if merge:
1571                 self.update()
1572
1573             text = self.manifest_text(strip=False)
1574             body['manifest_text'] = text
1575
1576             self._remember_api_response(self._my_api().collections().update(
1577                 uuid=self._manifest_locator,
1578                 body=body
1579                 ).execute(num_retries=num_retries))
1580             self._manifest_text = self._api_response["manifest_text"]
1581             self._portable_data_hash = self._api_response["portable_data_hash"]
1582             self.set_committed(True)
1583         elif body:
1584             self._remember_api_response(self._my_api().collections().update(
1585                 uuid=self._manifest_locator,
1586                 body=body
1587                 ).execute(num_retries=num_retries))
1588
1589         return self._manifest_text
1590
1591
1592     @must_be_writable
1593     @synchronized
1594     @retry_method
1595     def save_new(self, name=None,
1596                  create_collection_record=True,
1597                  owner_uuid=None,
1598                  properties=None,
1599                  storage_classes=None,
1600                  trash_at=None,
1601                  ensure_unique_name=False,
1602                  num_retries=None):
1603         """Save collection to a new collection record.
1604
1605         Commit pending buffer blocks to Keep and, when create_collection_record
1606         is True (default), create a new collection record.  After creating a
1607         new collection record, this Collection object will be associated with
1608         the new record used by `save()`. Returns the current manifest text.
1609
1610         :name:
1611           The collection name.
1612
1613         :create_collection_record:
1614            If True, create a collection record on the API server.
1615            If False, only commit blocks to Keep and return the manifest text.
1616
1617         :owner_uuid:
1618           the user, or project uuid that will own this collection.
1619           If None, defaults to the current user.
1620
1621         :properties:
1622           Additional properties of collection. This value will replace any existing
1623           properties of collection.
1624
1625         :storage_classes:
1626           Specify desirable storage classes to be used when writing data to Keep.
1627
1628         :trash_at:
1629           A collection is *expiring* when it has a *trash_at* time in the future.
1630           An expiring collection can be accessed as normal,
1631           but is scheduled to be trashed automatically at the *trash_at* time.
1632
1633         :ensure_unique_name:
1634           If True, ask the API server to rename the collection
1635           if it conflicts with a collection with the same name and owner.  If
1636           False, a name conflict will result in an error.
1637
1638         :num_retries:
1639           Retry count on API calls (if None,  use the collection default)
1640
1641         """
1642         if properties and type(properties) is not dict:
1643             raise errors.ArgumentError("properties must be dictionary type.")
1644
1645         if storage_classes and type(storage_classes) is not list:
1646             raise errors.ArgumentError("storage_classes must be list type.")
1647
1648         if trash_at and type(trash_at) is not datetime.datetime:
1649             raise errors.ArgumentError("trash_at must be datetime type.")
1650
1651         if self._has_remote_blocks:
1652             # Copy any remote blocks to the local cluster.
1653             self._copy_remote_blocks(remote_blocks={})
1654             self._has_remote_blocks = False
1655
1656         self._my_block_manager().commit_all()
1657         text = self.manifest_text(strip=False)
1658
1659         if create_collection_record:
1660             if name is None:
1661                 name = "New collection"
1662                 ensure_unique_name = True
1663
1664             body = {"manifest_text": text,
1665                     "name": name,
1666                     "replication_desired": self.replication_desired}
1667             if owner_uuid:
1668                 body["owner_uuid"] = owner_uuid
1669             if properties:
1670                 body["properties"] = properties
1671             if storage_classes:
1672                 body["storage_classes_desired"] = storage_classes
1673             if trash_at:
1674                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1675                 body["trash_at"] = t
1676
1677             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1678             text = self._api_response["manifest_text"]
1679
1680             self._manifest_locator = self._api_response["uuid"]
1681             self._portable_data_hash = self._api_response["portable_data_hash"]
1682
1683             self._manifest_text = text
1684             self.set_committed(True)
1685
1686         return text
1687
1688     _token_re = re.compile(r'(\S+)(\s+|$)')
1689     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1690     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1691
1692     @synchronized
1693     def _import_manifest(self, manifest_text):
1694         """Import a manifest into a `Collection`.
1695
1696         :manifest_text:
1697           The manifest text to import from.
1698
1699         """
1700         if len(self) > 0:
1701             raise ArgumentError("Can only import manifest into an empty collection")
1702
1703         STREAM_NAME = 0
1704         BLOCKS = 1
1705         SEGMENTS = 2
1706
1707         stream_name = None
1708         state = STREAM_NAME
1709
1710         for token_and_separator in self._token_re.finditer(manifest_text):
1711             tok = token_and_separator.group(1)
1712             sep = token_and_separator.group(2)
1713
1714             if state == STREAM_NAME:
1715                 # starting a new stream
1716                 stream_name = tok.replace('\\040', ' ')
1717                 blocks = []
1718                 segments = []
1719                 streamoffset = 0
1720                 state = BLOCKS
1721                 self.find_or_create(stream_name, COLLECTION)
1722                 continue
1723
1724             if state == BLOCKS:
1725                 block_locator = self._block_re.match(tok)
1726                 if block_locator:
1727                     blocksize = int(block_locator.group(1))
1728                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1729                     streamoffset += blocksize
1730                 else:
1731                     state = SEGMENTS
1732
1733             if state == SEGMENTS:
1734                 file_segment = self._segment_re.match(tok)
1735                 if file_segment:
1736                     pos = int(file_segment.group(1))
1737                     size = int(file_segment.group(2))
1738                     name = file_segment.group(3).replace('\\040', ' ')
1739                     filepath = os.path.join(stream_name, name)
1740                     afile = self.find_or_create(filepath, FILE)
1741                     if isinstance(afile, ArvadosFile):
1742                         afile.add_segment(blocks, pos, size)
1743                     else:
1744                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1745                 else:
1746                     # error!
1747                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1748
1749             if sep == "\n":
1750                 stream_name = None
1751                 state = STREAM_NAME
1752
1753         self.set_committed(True)
1754
1755     @synchronized
1756     def notify(self, event, collection, name, item):
1757         if self._callback:
1758             self._callback(event, collection, name, item)
1759
1760
1761 class Subcollection(RichCollectionBase):
1762     """This is a subdirectory within a collection that doesn't have its own API
1763     server record.
1764
1765     Subcollection locking falls under the umbrella lock of its root collection.
1766
1767     """
1768
1769     def __init__(self, parent, name):
1770         super(Subcollection, self).__init__(parent)
1771         self.lock = self.root_collection().lock
1772         self._manifest_text = None
1773         self.name = name
1774         self.num_retries = parent.num_retries
1775
1776     def root_collection(self):
1777         return self.parent.root_collection()
1778
1779     def writable(self):
1780         return self.root_collection().writable()
1781
1782     def _my_api(self):
1783         return self.root_collection()._my_api()
1784
1785     def _my_keep(self):
1786         return self.root_collection()._my_keep()
1787
1788     def _my_block_manager(self):
1789         return self.root_collection()._my_block_manager()
1790
1791     def stream_name(self):
1792         return os.path.join(self.parent.stream_name(), self.name)
1793
1794     @synchronized
1795     def clone(self, new_parent, new_name):
1796         c = Subcollection(new_parent, new_name)
1797         c._clonefrom(self)
1798         return c
1799
1800     @must_be_writable
1801     @synchronized
1802     def _reparent(self, newparent, newname):
1803         self.set_committed(False)
1804         self.flush()
1805         self.parent.remove(self.name, recursive=True)
1806         self.parent = newparent
1807         self.name = newname
1808         self.lock = self.parent.root_collection().lock
1809
1810
1811 class CollectionReader(Collection):
1812     """A read-only collection object.
1813
1814     Initialize from a collection UUID or portable data hash, or raw
1815     manifest text.  See `Collection` constructor for detailed options.
1816
1817     """
1818     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1819         self._in_init = True
1820         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1821         self._in_init = False
1822
1823         # Forego any locking since it should never change once initialized.
1824         self.lock = NoopLock()
1825
1826         # Backwards compatability with old CollectionReader
1827         # all_streams() and all_files()
1828         self._streams = None
1829
1830     def writable(self):
1831         return self._in_init
1832
1833     def _populate_streams(orig_func):
1834         @functools.wraps(orig_func)
1835         def populate_streams_wrapper(self, *args, **kwargs):
1836             # Defer populating self._streams until needed since it creates a copy of the manifest.
1837             if self._streams is None:
1838                 if self._manifest_text:
1839                     self._streams = [sline.split()
1840                                      for sline in self._manifest_text.split("\n")
1841                                      if sline]
1842                 else:
1843                     self._streams = []
1844             return orig_func(self, *args, **kwargs)
1845         return populate_streams_wrapper
1846
1847     @_populate_streams
1848     def normalize(self):
1849         """Normalize the streams returned by `all_streams`.
1850
1851         This method is kept for backwards compatability and only affects the
1852         behavior of `all_streams()` and `all_files()`
1853
1854         """
1855
1856         # Rearrange streams
1857         streams = {}
1858         for s in self.all_streams():
1859             for f in s.all_files():
1860                 streamname, filename = split(s.name() + "/" + f.name())
1861                 if streamname not in streams:
1862                     streams[streamname] = {}
1863                 if filename not in streams[streamname]:
1864                     streams[streamname][filename] = []
1865                 for r in f.segments:
1866                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1867
1868         self._streams = [normalize_stream(s, streams[s])
1869                          for s in sorted(streams)]
1870     @_populate_streams
1871     def all_streams(self):
1872         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1873                 for s in self._streams]
1874
1875     @_populate_streams
1876     def all_files(self):
1877         for s in self.all_streams():
1878             for f in s.all_files():
1879                 yield f