Merge branch '14345-webdav-lock-and-empty-dir'
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import datetime
17 import ciso8601
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
32 import arvados.util
33 import arvados.events as events
34 from arvados.retry import retry_method
35
36 _logger = logging.getLogger('arvados.collection')
37
38 class CollectionBase(object):
39     """Abstract base class for Collection classes."""
40
41     def __enter__(self):
42         return self
43
44     def __exit__(self, exc_type, exc_value, traceback):
45         pass
46
47     def _my_keep(self):
48         if self._keep_client is None:
49             self._keep_client = KeepClient(api_client=self._api_client,
50                                            num_retries=self.num_retries)
51         return self._keep_client
52
53     def stripped_manifest(self):
54         """Get the manifest with locator hints stripped.
55
56         Return the manifest for the current collection with all
57         non-portable hints (i.e., permission signatures and other
58         hints other than size hints) removed from the locators.
59         """
60         raw = self.manifest_text()
61         clean = []
62         for line in raw.split("\n"):
63             fields = line.split()
64             if fields:
65                 clean_fields = fields[:1] + [
66                     (re.sub(r'\+[^\d][^\+]*', '', x)
67                      if re.match(arvados.util.keep_locator_pattern, x)
68                      else x)
69                     for x in fields[1:]]
70                 clean += [' '.join(clean_fields), "\n"]
71         return ''.join(clean)
72
73
74 class _WriterFile(_FileLikeObjectBase):
75     def __init__(self, coll_writer, name):
76         super(_WriterFile, self).__init__(name, 'wb')
77         self.dest = coll_writer
78
79     def close(self):
80         super(_WriterFile, self).close()
81         self.dest.finish_current_file()
82
83     @_FileLikeObjectBase._before_close
84     def write(self, data):
85         self.dest.write(data)
86
87     @_FileLikeObjectBase._before_close
88     def writelines(self, seq):
89         for data in seq:
90             self.write(data)
91
92     @_FileLikeObjectBase._before_close
93     def flush(self):
94         self.dest.flush_data()
95
96
97 class CollectionWriter(CollectionBase):
98     """Deprecated, use Collection instead."""
99
100     def __init__(self, api_client=None, num_retries=0, replication=None):
101         """Instantiate a CollectionWriter.
102
103         CollectionWriter lets you build a new Arvados Collection from scratch.
104         Write files to it.  The CollectionWriter will upload data to Keep as
105         appropriate, and provide you with the Collection manifest text when
106         you're finished.
107
108         Arguments:
109         * api_client: The API client to use to look up Collections.  If not
110           provided, CollectionReader will build one from available Arvados
111           configuration.
112         * num_retries: The default number of times to retry failed
113           service requests.  Default 0.  You may change this value
114           after instantiation, but note those changes may not
115           propagate to related objects like the Keep client.
116         * replication: The number of copies of each block to store.
117           If this argument is None or not supplied, replication is
118           the server-provided default if available, otherwise 2.
119         """
120         self._api_client = api_client
121         self.num_retries = num_retries
122         self.replication = (2 if replication is None else replication)
123         self._keep_client = None
124         self._data_buffer = []
125         self._data_buffer_len = 0
126         self._current_stream_files = []
127         self._current_stream_length = 0
128         self._current_stream_locators = []
129         self._current_stream_name = '.'
130         self._current_file_name = None
131         self._current_file_pos = 0
132         self._finished_streams = []
133         self._close_file = None
134         self._queued_file = None
135         self._queued_dirents = deque()
136         self._queued_trees = deque()
137         self._last_open = None
138
139     def __exit__(self, exc_type, exc_value, traceback):
140         if exc_type is None:
141             self.finish()
142
143     def do_queued_work(self):
144         # The work queue consists of three pieces:
145         # * _queued_file: The file object we're currently writing to the
146         #   Collection.
147         # * _queued_dirents: Entries under the current directory
148         #   (_queued_trees[0]) that we want to write or recurse through.
149         #   This may contain files from subdirectories if
150         #   max_manifest_depth == 0 for this directory.
151         # * _queued_trees: Directories that should be written as separate
152         #   streams to the Collection.
153         # This function handles the smallest piece of work currently queued
154         # (current file, then current directory, then next directory) until
155         # no work remains.  The _work_THING methods each do a unit of work on
156         # THING.  _queue_THING methods add a THING to the work queue.
157         while True:
158             if self._queued_file:
159                 self._work_file()
160             elif self._queued_dirents:
161                 self._work_dirents()
162             elif self._queued_trees:
163                 self._work_trees()
164             else:
165                 break
166
167     def _work_file(self):
168         while True:
169             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
170             if not buf:
171                 break
172             self.write(buf)
173         self.finish_current_file()
174         if self._close_file:
175             self._queued_file.close()
176         self._close_file = None
177         self._queued_file = None
178
179     def _work_dirents(self):
180         path, stream_name, max_manifest_depth = self._queued_trees[0]
181         if stream_name != self.current_stream_name():
182             self.start_new_stream(stream_name)
183         while self._queued_dirents:
184             dirent = self._queued_dirents.popleft()
185             target = os.path.join(path, dirent)
186             if os.path.isdir(target):
187                 self._queue_tree(target,
188                                  os.path.join(stream_name, dirent),
189                                  max_manifest_depth - 1)
190             else:
191                 self._queue_file(target, dirent)
192                 break
193         if not self._queued_dirents:
194             self._queued_trees.popleft()
195
196     def _work_trees(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         d = arvados.util.listdir_recursive(
199             path, max_depth = (None if max_manifest_depth == 0 else 0))
200         if d:
201             self._queue_dirents(stream_name, d)
202         else:
203             self._queued_trees.popleft()
204
205     def _queue_file(self, source, filename=None):
206         assert (self._queued_file is None), "tried to queue more than one file"
207         if not hasattr(source, 'read'):
208             source = open(source, 'rb')
209             self._close_file = True
210         else:
211             self._close_file = False
212         if filename is None:
213             filename = os.path.basename(source.name)
214         self.start_new_file(filename)
215         self._queued_file = source
216
217     def _queue_dirents(self, stream_name, dirents):
218         assert (not self._queued_dirents), "tried to queue more than one tree"
219         self._queued_dirents = deque(sorted(dirents))
220
221     def _queue_tree(self, path, stream_name, max_manifest_depth):
222         self._queued_trees.append((path, stream_name, max_manifest_depth))
223
224     def write_file(self, source, filename=None):
225         self._queue_file(source, filename)
226         self.do_queued_work()
227
228     def write_directory_tree(self,
229                              path, stream_name='.', max_manifest_depth=-1):
230         self._queue_tree(path, stream_name, max_manifest_depth)
231         self.do_queued_work()
232
233     def write(self, newdata):
234         if isinstance(newdata, bytes):
235             pass
236         elif isinstance(newdata, str):
237             newdata = newdata.encode()
238         elif hasattr(newdata, '__iter__'):
239             for s in newdata:
240                 self.write(s)
241             return
242         self._data_buffer.append(newdata)
243         self._data_buffer_len += len(newdata)
244         self._current_stream_length += len(newdata)
245         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
246             self.flush_data()
247
248     def open(self, streampath, filename=None):
249         """open(streampath[, filename]) -> file-like object
250
251         Pass in the path of a file to write to the Collection, either as a
252         single string or as two separate stream name and file name arguments.
253         This method returns a file-like object you can write to add it to the
254         Collection.
255
256         You may only have one file object from the Collection open at a time,
257         so be sure to close the object when you're done.  Using the object in
258         a with statement makes that easy::
259
260           with cwriter.open('./doc/page1.txt') as outfile:
261               outfile.write(page1_data)
262           with cwriter.open('./doc/page2.txt') as outfile:
263               outfile.write(page2_data)
264         """
265         if filename is None:
266             streampath, filename = split(streampath)
267         if self._last_open and not self._last_open.closed:
268             raise errors.AssertionError(
269                 "can't open '{}' when '{}' is still open".format(
270                     filename, self._last_open.name))
271         if streampath != self.current_stream_name():
272             self.start_new_stream(streampath)
273         self.set_current_file_name(filename)
274         self._last_open = _WriterFile(self, filename)
275         return self._last_open
276
277     def flush_data(self):
278         data_buffer = b''.join(self._data_buffer)
279         if data_buffer:
280             self._current_stream_locators.append(
281                 self._my_keep().put(
282                     data_buffer[0:config.KEEP_BLOCK_SIZE],
283                     copies=self.replication))
284             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285             self._data_buffer_len = len(self._data_buffer[0])
286
287     def start_new_file(self, newfilename=None):
288         self.finish_current_file()
289         self.set_current_file_name(newfilename)
290
291     def set_current_file_name(self, newfilename):
292         if re.search(r'[\t\n]', newfilename):
293             raise errors.AssertionError(
294                 "Manifest filenames cannot contain whitespace: %s" %
295                 newfilename)
296         elif re.search(r'\x00', newfilename):
297             raise errors.AssertionError(
298                 "Manifest filenames cannot contain NUL characters: %s" %
299                 newfilename)
300         self._current_file_name = newfilename
301
302     def current_file_name(self):
303         return self._current_file_name
304
305     def finish_current_file(self):
306         if self._current_file_name is None:
307             if self._current_file_pos == self._current_stream_length:
308                 return
309             raise errors.AssertionError(
310                 "Cannot finish an unnamed file " +
311                 "(%d bytes at offset %d in '%s' stream)" %
312                 (self._current_stream_length - self._current_file_pos,
313                  self._current_file_pos,
314                  self._current_stream_name))
315         self._current_stream_files.append([
316                 self._current_file_pos,
317                 self._current_stream_length - self._current_file_pos,
318                 self._current_file_name])
319         self._current_file_pos = self._current_stream_length
320         self._current_file_name = None
321
322     def start_new_stream(self, newstreamname='.'):
323         self.finish_current_stream()
324         self.set_current_stream_name(newstreamname)
325
326     def set_current_stream_name(self, newstreamname):
327         if re.search(r'[\t\n]', newstreamname):
328             raise errors.AssertionError(
329                 "Manifest stream names cannot contain whitespace: '%s'" %
330                 (newstreamname))
331         self._current_stream_name = '.' if newstreamname=='' else newstreamname
332
333     def current_stream_name(self):
334         return self._current_stream_name
335
336     def finish_current_stream(self):
337         self.finish_current_file()
338         self.flush_data()
339         if not self._current_stream_files:
340             pass
341         elif self._current_stream_name is None:
342             raise errors.AssertionError(
343                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344                 (self._current_stream_length, len(self._current_stream_files)))
345         else:
346             if not self._current_stream_locators:
347                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348             self._finished_streams.append([self._current_stream_name,
349                                            self._current_stream_locators,
350                                            self._current_stream_files])
351         self._current_stream_files = []
352         self._current_stream_length = 0
353         self._current_stream_locators = []
354         self._current_stream_name = None
355         self._current_file_pos = 0
356         self._current_file_name = None
357
358     def finish(self):
359         """Store the manifest in Keep and return its locator.
360
361         This is useful for storing manifest fragments (task outputs)
362         temporarily in Keep during a Crunch job.
363
364         In other cases you should make a collection instead, by
365         sending manifest_text() to the API server's "create
366         collection" endpoint.
367         """
368         return self._my_keep().put(self.manifest_text().encode(),
369                                    copies=self.replication)
370
371     def portable_data_hash(self):
372         stripped = self.stripped_manifest().encode()
373         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
374
375     def manifest_text(self):
376         self.finish_current_stream()
377         manifest = ''
378
379         for stream in self._finished_streams:
380             if not re.search(r'^\.(/.*)?$', stream[0]):
381                 manifest += './'
382             manifest += stream[0].replace(' ', '\\040')
383             manifest += ' ' + ' '.join(stream[1])
384             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
385             manifest += "\n"
386
387         return manifest
388
389     def data_locators(self):
390         ret = []
391         for name, locators, files in self._finished_streams:
392             ret += locators
393         return ret
394
395     def save_new(self, name=None):
396         return self._api_client.collections().create(
397             ensure_unique_name=True,
398             body={
399                 'name': name,
400                 'manifest_text': self.manifest_text(),
401             }).execute(num_retries=self.num_retries)
402
403
404 class ResumableCollectionWriter(CollectionWriter):
405     """Deprecated, use Collection instead."""
406
407     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408                    '_current_stream_locators', '_current_stream_name',
409                    '_current_file_name', '_current_file_pos', '_close_file',
410                    '_data_buffer', '_dependencies', '_finished_streams',
411                    '_queued_dirents', '_queued_trees']
412
413     def __init__(self, api_client=None, **kwargs):
414         self._dependencies = {}
415         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
416
417     @classmethod
418     def from_state(cls, state, *init_args, **init_kwargs):
419         # Try to build a new writer from scratch with the given state.
420         # If the state is not suitable to resume (because files have changed,
421         # been deleted, aren't predictable, etc.), raise a
422         # StaleWriterStateError.  Otherwise, return the initialized writer.
423         # The caller is responsible for calling writer.do_queued_work()
424         # appropriately after it's returned.
425         writer = cls(*init_args, **init_kwargs)
426         for attr_name in cls.STATE_PROPS:
427             attr_value = state[attr_name]
428             attr_class = getattr(writer, attr_name).__class__
429             # Coerce the value into the same type as the initial value, if
430             # needed.
431             if attr_class not in (type(None), attr_value.__class__):
432                 attr_value = attr_class(attr_value)
433             setattr(writer, attr_name, attr_value)
434         # Check dependencies before we try to resume anything.
435         if any(KeepLocator(ls).permission_expired()
436                for ls in writer._current_stream_locators):
437             raise errors.StaleWriterStateError(
438                 "locators include expired permission hint")
439         writer.check_dependencies()
440         if state['_current_file'] is not None:
441             path, pos = state['_current_file']
442             try:
443                 writer._queued_file = open(path, 'rb')
444                 writer._queued_file.seek(pos)
445             except IOError as error:
446                 raise errors.StaleWriterStateError(
447                     "failed to reopen active file {}: {}".format(path, error))
448         return writer
449
450     def check_dependencies(self):
451         for path, orig_stat in listitems(self._dependencies):
452             if not S_ISREG(orig_stat[ST_MODE]):
453                 raise errors.StaleWriterStateError("{} not file".format(path))
454             try:
455                 now_stat = tuple(os.stat(path))
456             except OSError as error:
457                 raise errors.StaleWriterStateError(
458                     "failed to stat {}: {}".format(path, error))
459             if ((not S_ISREG(now_stat[ST_MODE])) or
460                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462                 raise errors.StaleWriterStateError("{} changed".format(path))
463
464     def dump_state(self, copy_func=lambda x: x):
465         state = {attr: copy_func(getattr(self, attr))
466                  for attr in self.STATE_PROPS}
467         if self._queued_file is None:
468             state['_current_file'] = None
469         else:
470             state['_current_file'] = (os.path.realpath(self._queued_file.name),
471                                       self._queued_file.tell())
472         return state
473
474     def _queue_file(self, source, filename=None):
475         try:
476             src_path = os.path.realpath(source)
477         except Exception:
478             raise errors.AssertionError("{} not a file path".format(source))
479         try:
480             path_stat = os.stat(src_path)
481         except OSError as stat_error:
482             path_stat = None
483         super(ResumableCollectionWriter, self)._queue_file(source, filename)
484         fd_stat = os.fstat(self._queued_file.fileno())
485         if not S_ISREG(fd_stat.st_mode):
486             # We won't be able to resume from this cache anyway, so don't
487             # worry about further checks.
488             self._dependencies[source] = tuple(fd_stat)
489         elif path_stat is None:
490             raise errors.AssertionError(
491                 "could not stat {}: {}".format(source, stat_error))
492         elif path_stat.st_ino != fd_stat.st_ino:
493             raise errors.AssertionError(
494                 "{} changed between open and stat calls".format(source))
495         else:
496             self._dependencies[src_path] = tuple(fd_stat)
497
498     def write(self, data):
499         if self._queued_file is None:
500             raise errors.AssertionError(
501                 "resumable writer can't accept unsourced data")
502         return super(ResumableCollectionWriter, self).write(data)
503
504
505 ADD = "add"
506 DEL = "del"
507 MOD = "mod"
508 TOK = "tok"
509 FILE = "file"
510 COLLECTION = "collection"
511
512 class RichCollectionBase(CollectionBase):
513     """Base class for Collections and Subcollections.
514
515     Implements the majority of functionality relating to accessing items in the
516     Collection.
517
518     """
519
520     def __init__(self, parent=None):
521         self.parent = parent
522         self._committed = False
523         self._callback = None
524         self._items = {}
525
526     def _my_api(self):
527         raise NotImplementedError()
528
529     def _my_keep(self):
530         raise NotImplementedError()
531
532     def _my_block_manager(self):
533         raise NotImplementedError()
534
535     def writable(self):
536         raise NotImplementedError()
537
538     def root_collection(self):
539         raise NotImplementedError()
540
541     def notify(self, event, collection, name, item):
542         raise NotImplementedError()
543
544     def stream_name(self):
545         raise NotImplementedError()
546
547     @must_be_writable
548     @synchronized
549     def find_or_create(self, path, create_type):
550         """Recursively search the specified file path.
551
552         May return either a `Collection` or `ArvadosFile`.  If not found, will
553         create a new item at the specified path based on `create_type`.  Will
554         create intermediate subcollections needed to contain the final item in
555         the path.
556
557         :create_type:
558           One of `arvados.collection.FILE` or
559           `arvados.collection.COLLECTION`.  If the path is not found, and value
560           of create_type is FILE then create and return a new ArvadosFile for
561           the last path component.  If COLLECTION, then create and return a new
562           Collection for the last path component.
563
564         """
565
566         pathcomponents = path.split("/", 1)
567         if pathcomponents[0]:
568             item = self._items.get(pathcomponents[0])
569             if len(pathcomponents) == 1:
570                 if item is None:
571                     # create new file
572                     if create_type == COLLECTION:
573                         item = Subcollection(self, pathcomponents[0])
574                     else:
575                         item = ArvadosFile(self, pathcomponents[0])
576                     self._items[pathcomponents[0]] = item
577                     self.set_committed(False)
578                     self.notify(ADD, self, pathcomponents[0], item)
579                 return item
580             else:
581                 if item is None:
582                     # create new collection
583                     item = Subcollection(self, pathcomponents[0])
584                     self._items[pathcomponents[0]] = item
585                     self.set_committed(False)
586                     self.notify(ADD, self, pathcomponents[0], item)
587                 if isinstance(item, RichCollectionBase):
588                     return item.find_or_create(pathcomponents[1], create_type)
589                 else:
590                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
591         else:
592             return self
593
594     @synchronized
595     def find(self, path):
596         """Recursively search the specified file path.
597
598         May return either a Collection or ArvadosFile. Return None if not
599         found.
600         If path is invalid (ex: starts with '/'), an IOError exception will be
601         raised.
602
603         """
604         if not path:
605             raise errors.ArgumentError("Parameter 'path' is empty.")
606
607         pathcomponents = path.split("/", 1)
608         if pathcomponents[0] == '':
609             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
610
611         item = self._items.get(pathcomponents[0])
612         if item is None:
613             return None
614         elif len(pathcomponents) == 1:
615             return item
616         else:
617             if isinstance(item, RichCollectionBase):
618                 if pathcomponents[1]:
619                     return item.find(pathcomponents[1])
620                 else:
621                     return item
622             else:
623                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
624
625     @synchronized
626     def mkdirs(self, path):
627         """Recursive subcollection create.
628
629         Like `os.makedirs()`.  Will create intermediate subcollections needed
630         to contain the leaf subcollection path.
631
632         """
633
634         if self.find(path) != None:
635             raise IOError(errno.EEXIST, "Directory or file exists", path)
636
637         return self.find_or_create(path, COLLECTION)
638
639     def open(self, path, mode="r"):
640         """Open a file-like object for access.
641
642         :path:
643           path to a file in the collection
644         :mode:
645           a string consisting of "r", "w", or "a", optionally followed
646           by "b" or "t", optionally followed by "+".
647           :"b":
648             binary mode: write() accepts bytes, read() returns bytes.
649           :"t":
650             text mode (default): write() accepts strings, read() returns strings.
651           :"r":
652             opens for reading
653           :"r+":
654             opens for reading and writing.  Reads/writes share a file pointer.
655           :"w", "w+":
656             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
657           :"a", "a+":
658             opens for reading and writing.  All writes are appended to
659             the end of the file.  Writing does not affect the file pointer for
660             reading.
661         """
662
663         if not re.search(r'^[rwa][bt]?\+?$', mode):
664             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
665
666         if mode[0] == 'r' and '+' not in mode:
667             fclass = ArvadosFileReader
668             arvfile = self.find(path)
669         elif not self.writable():
670             raise IOError(errno.EROFS, "Collection is read only")
671         else:
672             fclass = ArvadosFileWriter
673             arvfile = self.find_or_create(path, FILE)
674
675         if arvfile is None:
676             raise IOError(errno.ENOENT, "File not found", path)
677         if not isinstance(arvfile, ArvadosFile):
678             raise IOError(errno.EISDIR, "Is a directory", path)
679
680         if mode[0] == 'w':
681             arvfile.truncate(0)
682
683         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
684
685     def modified(self):
686         """Determine if the collection has been modified since last commited."""
687         return not self.committed()
688
689     @synchronized
690     def committed(self):
691         """Determine if the collection has been committed to the API server."""
692         return self._committed
693
694     @synchronized
695     def set_committed(self, value=True):
696         """Recursively set committed flag.
697
698         If value is True, set committed to be True for this and all children.
699
700         If value is False, set committed to be False for this and all parents.
701         """
702         if value == self._committed:
703             return
704         if value:
705             for k,v in listitems(self._items):
706                 v.set_committed(True)
707             self._committed = True
708         else:
709             self._committed = False
710             if self.parent is not None:
711                 self.parent.set_committed(False)
712
713     @synchronized
714     def __iter__(self):
715         """Iterate over names of files and collections contained in this collection."""
716         return iter(viewkeys(self._items))
717
718     @synchronized
719     def __getitem__(self, k):
720         """Get a file or collection that is directly contained by this collection.
721
722         If you want to search a path, use `find()` instead.
723
724         """
725         return self._items[k]
726
727     @synchronized
728     def __contains__(self, k):
729         """Test if there is a file or collection a directly contained by this collection."""
730         return k in self._items
731
732     @synchronized
733     def __len__(self):
734         """Get the number of items directly contained in this collection."""
735         return len(self._items)
736
737     @must_be_writable
738     @synchronized
739     def __delitem__(self, p):
740         """Delete an item by name which is directly contained by this collection."""
741         del self._items[p]
742         self.set_committed(False)
743         self.notify(DEL, self, p, None)
744
745     @synchronized
746     def keys(self):
747         """Get a list of names of files and collections directly contained in this collection."""
748         return self._items.keys()
749
750     @synchronized
751     def values(self):
752         """Get a list of files and collection objects directly contained in this collection."""
753         return listvalues(self._items)
754
755     @synchronized
756     def items(self):
757         """Get a list of (name, object) tuples directly contained in this collection."""
758         return listitems(self._items)
759
760     def exists(self, path):
761         """Test if there is a file or collection at `path`."""
762         return self.find(path) is not None
763
764     @must_be_writable
765     @synchronized
766     def remove(self, path, recursive=False):
767         """Remove the file or subcollection (directory) at `path`.
768
769         :recursive:
770           Specify whether to remove non-empty subcollections (True), or raise an error (False).
771         """
772
773         if not path:
774             raise errors.ArgumentError("Parameter 'path' is empty.")
775
776         pathcomponents = path.split("/", 1)
777         item = self._items.get(pathcomponents[0])
778         if item is None:
779             raise IOError(errno.ENOENT, "File not found", path)
780         if len(pathcomponents) == 1:
781             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
782                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
783             deleteditem = self._items[pathcomponents[0]]
784             del self._items[pathcomponents[0]]
785             self.set_committed(False)
786             self.notify(DEL, self, pathcomponents[0], deleteditem)
787         else:
788             item.remove(pathcomponents[1])
789
790     def _clonefrom(self, source):
791         for k,v in listitems(source):
792             self._items[k] = v.clone(self, k)
793
794     def clone(self):
795         raise NotImplementedError()
796
797     @must_be_writable
798     @synchronized
799     def add(self, source_obj, target_name, overwrite=False, reparent=False):
800         """Copy or move a file or subcollection to this collection.
801
802         :source_obj:
803           An ArvadosFile, or Subcollection object
804
805         :target_name:
806           Destination item name.  If the target name already exists and is a
807           file, this will raise an error unless you specify `overwrite=True`.
808
809         :overwrite:
810           Whether to overwrite target file if it already exists.
811
812         :reparent:
813           If True, source_obj will be moved from its parent collection to this collection.
814           If False, source_obj will be copied and the parent collection will be
815           unmodified.
816
817         """
818
819         if target_name in self and not overwrite:
820             raise IOError(errno.EEXIST, "File already exists", target_name)
821
822         modified_from = None
823         if target_name in self:
824             modified_from = self[target_name]
825
826         # Actually make the move or copy.
827         if reparent:
828             source_obj._reparent(self, target_name)
829             item = source_obj
830         else:
831             item = source_obj.clone(self, target_name)
832
833         self._items[target_name] = item
834         self.set_committed(False)
835
836         if modified_from:
837             self.notify(MOD, self, target_name, (modified_from, item))
838         else:
839             self.notify(ADD, self, target_name, item)
840
841     def _get_src_target(self, source, target_path, source_collection, create_dest):
842         if source_collection is None:
843             source_collection = self
844
845         # Find the object
846         if isinstance(source, basestring):
847             source_obj = source_collection.find(source)
848             if source_obj is None:
849                 raise IOError(errno.ENOENT, "File not found", source)
850             sourcecomponents = source.split("/")
851         else:
852             source_obj = source
853             sourcecomponents = None
854
855         # Find parent collection the target path
856         targetcomponents = target_path.split("/")
857
858         # Determine the name to use.
859         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
860
861         if not target_name:
862             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
863
864         if create_dest:
865             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
866         else:
867             if len(targetcomponents) > 1:
868                 target_dir = self.find("/".join(targetcomponents[0:-1]))
869             else:
870                 target_dir = self
871
872         if target_dir is None:
873             raise IOError(errno.ENOENT, "Target directory not found", target_name)
874
875         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
876             target_dir = target_dir[target_name]
877             target_name = sourcecomponents[-1]
878
879         return (source_obj, target_dir, target_name)
880
881     @must_be_writable
882     @synchronized
883     def copy(self, source, target_path, source_collection=None, overwrite=False):
884         """Copy a file or subcollection to a new path in this collection.
885
886         :source:
887           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
888
889         :target_path:
890           Destination file or path.  If the target path already exists and is a
891           subcollection, the item will be placed inside the subcollection.  If
892           the target path already exists and is a file, this will raise an error
893           unless you specify `overwrite=True`.
894
895         :source_collection:
896           Collection to copy `source_path` from (default `self`)
897
898         :overwrite:
899           Whether to overwrite target file if it already exists.
900         """
901
902         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
903         target_dir.add(source_obj, target_name, overwrite, False)
904
905     @must_be_writable
906     @synchronized
907     def rename(self, source, target_path, source_collection=None, overwrite=False):
908         """Move a file or subcollection from `source_collection` to a new path in this collection.
909
910         :source:
911           A string with a path to source file or subcollection.
912
913         :target_path:
914           Destination file or path.  If the target path already exists and is a
915           subcollection, the item will be placed inside the subcollection.  If
916           the target path already exists and is a file, this will raise an error
917           unless you specify `overwrite=True`.
918
919         :source_collection:
920           Collection to copy `source_path` from (default `self`)
921
922         :overwrite:
923           Whether to overwrite target file if it already exists.
924         """
925
926         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
927         if not source_obj.writable():
928             raise IOError(errno.EROFS, "Source collection is read only", source)
929         target_dir.add(source_obj, target_name, overwrite, True)
930
931     def portable_manifest_text(self, stream_name="."):
932         """Get the manifest text for this collection, sub collections and files.
933
934         This method does not flush outstanding blocks to Keep.  It will return
935         a normalized manifest with access tokens stripped.
936
937         :stream_name:
938           Name to use for this stream (directory)
939
940         """
941         return self._get_manifest_text(stream_name, True, True)
942
943     @synchronized
944     def manifest_text(self, stream_name=".", strip=False, normalize=False,
945                       only_committed=False):
946         """Get the manifest text for this collection, sub collections and files.
947
948         This method will flush outstanding blocks to Keep.  By default, it will
949         not normalize an unmodified manifest or strip access tokens.
950
951         :stream_name:
952           Name to use for this stream (directory)
953
954         :strip:
955           If True, remove signing tokens from block locators if present.
956           If False (default), block locators are left unchanged.
957
958         :normalize:
959           If True, always export the manifest text in normalized form
960           even if the Collection is not modified.  If False (default) and the collection
961           is not modified, return the original manifest text even if it is not
962           in normalized form.
963
964         :only_committed:
965           If True, don't commit pending blocks.
966
967         """
968
969         if not only_committed:
970             self._my_block_manager().commit_all()
971         return self._get_manifest_text(stream_name, strip, normalize,
972                                        only_committed=only_committed)
973
974     @synchronized
975     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
976         """Get the manifest text for this collection, sub collections and files.
977
978         :stream_name:
979           Name to use for this stream (directory)
980
981         :strip:
982           If True, remove signing tokens from block locators if present.
983           If False (default), block locators are left unchanged.
984
985         :normalize:
986           If True, always export the manifest text in normalized form
987           even if the Collection is not modified.  If False (default) and the collection
988           is not modified, return the original manifest text even if it is not
989           in normalized form.
990
991         :only_committed:
992           If True, only include blocks that were already committed to Keep.
993
994         """
995
996         if not self.committed() or self._manifest_text is None or normalize:
997             stream = {}
998             buf = []
999             sorted_keys = sorted(self.keys())
1000             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1001                 # Create a stream per file `k`
1002                 arvfile = self[filename]
1003                 filestream = []
1004                 for segment in arvfile.segments():
1005                     loc = segment.locator
1006                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1007                         if only_committed:
1008                             continue
1009                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1010                     if strip:
1011                         loc = KeepLocator(loc).stripped()
1012                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1013                                          segment.segment_offset, segment.range_size))
1014                 stream[filename] = filestream
1015             if stream:
1016                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1017             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1018                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1019             return "".join(buf)
1020         else:
1021             if strip:
1022                 return self.stripped_manifest()
1023             else:
1024                 return self._manifest_text
1025
1026     @synchronized
1027     def _copy_remote_blocks(self, remote_blocks={}):
1028         """Scan through the entire collection and ask Keep to copy remote blocks.
1029
1030         When accessing a remote collection, blocks will have a remote signature
1031         (+R instead of +A). Collect these signatures and request Keep to copy the
1032         blocks to the local cluster, returning local (+A) signatures.
1033
1034         :remote_blocks:
1035           Shared cache of remote to local block mappings. This is used to avoid
1036           doing extra work when blocks are shared by more than one file in
1037           different subdirectories.
1038
1039         """
1040         for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
1041             for s in self[filename].segments():
1042                 if '+R' in s.locator:
1043                     try:
1044                         loc = remote_blocks[s.locator]
1045                     except KeyError:
1046                         loc = self._my_keep().refresh_signature(s.locator)
1047                         remote_blocks[s.locator] = loc
1048                     s.locator = loc
1049                     self.set_committed(False)
1050         for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
1051             remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
1052         return remote_blocks
1053
1054     @synchronized
1055     def diff(self, end_collection, prefix=".", holding_collection=None):
1056         """Generate list of add/modify/delete actions.
1057
1058         When given to `apply`, will change `self` to match `end_collection`
1059
1060         """
1061         changes = []
1062         if holding_collection is None:
1063             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1064         for k in self:
1065             if k not in end_collection:
1066                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1067         for k in end_collection:
1068             if k in self:
1069                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1070                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1071                 elif end_collection[k] != self[k]:
1072                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1073                 else:
1074                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1075             else:
1076                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1077         return changes
1078
1079     @must_be_writable
1080     @synchronized
1081     def apply(self, changes):
1082         """Apply changes from `diff`.
1083
1084         If a change conflicts with a local change, it will be saved to an
1085         alternate path indicating the conflict.
1086
1087         """
1088         if changes:
1089             self.set_committed(False)
1090         for change in changes:
1091             event_type = change[0]
1092             path = change[1]
1093             initial = change[2]
1094             local = self.find(path)
1095             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1096                                                                     time.gmtime()))
1097             if event_type == ADD:
1098                 if local is None:
1099                     # No local file at path, safe to copy over new file
1100                     self.copy(initial, path)
1101                 elif local is not None and local != initial:
1102                     # There is already local file and it is different:
1103                     # save change to conflict file.
1104                     self.copy(initial, conflictpath)
1105             elif event_type == MOD or event_type == TOK:
1106                 final = change[3]
1107                 if local == initial:
1108                     # Local matches the "initial" item so it has not
1109                     # changed locally and is safe to update.
1110                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1111                         # Replace contents of local file with new contents
1112                         local.replace_contents(final)
1113                     else:
1114                         # Overwrite path with new item; this can happen if
1115                         # path was a file and is now a collection or vice versa
1116                         self.copy(final, path, overwrite=True)
1117                 else:
1118                     # Local is missing (presumably deleted) or local doesn't
1119                     # match the "start" value, so save change to conflict file
1120                     self.copy(final, conflictpath)
1121             elif event_type == DEL:
1122                 if local == initial:
1123                     # Local item matches "initial" value, so it is safe to remove.
1124                     self.remove(path, recursive=True)
1125                 # else, the file is modified or already removed, in either
1126                 # case we don't want to try to remove it.
1127
1128     def portable_data_hash(self):
1129         """Get the portable data hash for this collection's manifest."""
1130         if self._manifest_locator and self.committed():
1131             # If the collection is already saved on the API server, and it's committed
1132             # then return API server's PDH response.
1133             return self._portable_data_hash
1134         else:
1135             stripped = self.portable_manifest_text().encode()
1136             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1137
1138     @synchronized
1139     def subscribe(self, callback):
1140         if self._callback is None:
1141             self._callback = callback
1142         else:
1143             raise errors.ArgumentError("A callback is already set on this collection.")
1144
1145     @synchronized
1146     def unsubscribe(self):
1147         if self._callback is not None:
1148             self._callback = None
1149
1150     @synchronized
1151     def notify(self, event, collection, name, item):
1152         if self._callback:
1153             self._callback(event, collection, name, item)
1154         self.root_collection().notify(event, collection, name, item)
1155
1156     @synchronized
1157     def __eq__(self, other):
1158         if other is self:
1159             return True
1160         if not isinstance(other, RichCollectionBase):
1161             return False
1162         if len(self._items) != len(other):
1163             return False
1164         for k in self._items:
1165             if k not in other:
1166                 return False
1167             if self._items[k] != other[k]:
1168                 return False
1169         return True
1170
1171     def __ne__(self, other):
1172         return not self.__eq__(other)
1173
1174     @synchronized
1175     def flush(self):
1176         """Flush bufferblocks to Keep."""
1177         for e in listvalues(self):
1178             e.flush()
1179
1180
1181 class Collection(RichCollectionBase):
1182     """Represents the root of an Arvados Collection.
1183
1184     This class is threadsafe.  The root collection object, all subcollections
1185     and files are protected by a single lock (i.e. each access locks the entire
1186     collection).
1187
1188     Brief summary of
1189     useful methods:
1190
1191     :To read an existing file:
1192       `c.open("myfile", "r")`
1193
1194     :To write a new file:
1195       `c.open("myfile", "w")`
1196
1197     :To determine if a file exists:
1198       `c.find("myfile") is not None`
1199
1200     :To copy a file:
1201       `c.copy("source", "dest")`
1202
1203     :To delete a file:
1204       `c.remove("myfile")`
1205
1206     :To save to an existing collection record:
1207       `c.save()`
1208
1209     :To save a new collection record:
1210     `c.save_new()`
1211
1212     :To merge remote changes into this object:
1213       `c.update()`
1214
1215     Must be associated with an API server Collection record (during
1216     initialization, or using `save_new`) to use `save` or `update`
1217
1218     """
1219
1220     def __init__(self, manifest_locator_or_text=None,
1221                  api_client=None,
1222                  keep_client=None,
1223                  num_retries=None,
1224                  parent=None,
1225                  apiconfig=None,
1226                  block_manager=None,
1227                  replication_desired=None,
1228                  put_threads=None):
1229         """Collection constructor.
1230
1231         :manifest_locator_or_text:
1232           An Arvados collection UUID, portable data hash, raw manifest
1233           text, or (if creating an empty collection) None.
1234
1235         :parent:
1236           the parent Collection, may be None.
1237
1238         :apiconfig:
1239           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1240           Prefer this over supplying your own api_client and keep_client (except in testing).
1241           Will use default config settings if not specified.
1242
1243         :api_client:
1244           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1245
1246         :keep_client:
1247           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1248
1249         :num_retries:
1250           the number of retries for API and Keep requests.
1251
1252         :block_manager:
1253           the block manager to use.  If not specified, create one.
1254
1255         :replication_desired:
1256           How many copies should Arvados maintain. If None, API server default
1257           configuration applies. If not None, this value will also be used
1258           for determining the number of block copies being written.
1259
1260         """
1261         super(Collection, self).__init__(parent)
1262         self._api_client = api_client
1263         self._keep_client = keep_client
1264         self._block_manager = block_manager
1265         self.replication_desired = replication_desired
1266         self.put_threads = put_threads
1267
1268         if apiconfig:
1269             self._config = apiconfig
1270         else:
1271             self._config = config.settings()
1272
1273         self.num_retries = num_retries if num_retries is not None else 0
1274         self._manifest_locator = None
1275         self._manifest_text = None
1276         self._portable_data_hash = None
1277         self._api_response = None
1278         self._past_versions = set()
1279
1280         self.lock = threading.RLock()
1281         self.events = None
1282
1283         if manifest_locator_or_text:
1284             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1285                 self._manifest_locator = manifest_locator_or_text
1286             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1287                 self._manifest_locator = manifest_locator_or_text
1288             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1289                 self._manifest_text = manifest_locator_or_text
1290             else:
1291                 raise errors.ArgumentError(
1292                     "Argument to CollectionReader is not a manifest or a collection UUID")
1293
1294             try:
1295                 self._populate()
1296             except (IOError, errors.SyntaxError) as e:
1297                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1298
1299     def root_collection(self):
1300         return self
1301
1302     def get_properties(self):
1303         if self._api_response and self._api_response["properties"]:
1304             return self._api_response["properties"]
1305         else:
1306             return {}
1307
1308     def get_trash_at(self):
1309         if self._api_response and self._api_response["trash_at"]:
1310             return ciso8601.parse_datetime(self._api_response["trash_at"])
1311         else:
1312             return None
1313
1314     def stream_name(self):
1315         return "."
1316
1317     def writable(self):
1318         return True
1319
1320     @synchronized
1321     def known_past_version(self, modified_at_and_portable_data_hash):
1322         return modified_at_and_portable_data_hash in self._past_versions
1323
1324     @synchronized
1325     @retry_method
1326     def update(self, other=None, num_retries=None):
1327         """Merge the latest collection on the API server with the current collection."""
1328
1329         if other is None:
1330             if self._manifest_locator is None:
1331                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1332             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1333             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1334                 response.get("portable_data_hash") != self.portable_data_hash()):
1335                 # The record on the server is different from our current one, but we've seen it before,
1336                 # so ignore it because it's already been merged.
1337                 # However, if it's the same as our current record, proceed with the update, because we want to update
1338                 # our tokens.
1339                 return
1340             else:
1341                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1342             other = CollectionReader(response["manifest_text"])
1343         baseline = CollectionReader(self._manifest_text)
1344         self.apply(baseline.diff(other))
1345         self._manifest_text = self.manifest_text()
1346
1347     @synchronized
1348     def _my_api(self):
1349         if self._api_client is None:
1350             self._api_client = ThreadSafeApiCache(self._config)
1351             if self._keep_client is None:
1352                 self._keep_client = self._api_client.keep
1353         return self._api_client
1354
1355     @synchronized
1356     def _my_keep(self):
1357         if self._keep_client is None:
1358             if self._api_client is None:
1359                 self._my_api()
1360             else:
1361                 self._keep_client = KeepClient(api_client=self._api_client)
1362         return self._keep_client
1363
1364     @synchronized
1365     def _my_block_manager(self):
1366         if self._block_manager is None:
1367             copies = (self.replication_desired or
1368                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1369                                                    2))
1370             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1371         return self._block_manager
1372
1373     def _remember_api_response(self, response):
1374         self._api_response = response
1375         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1376
1377     def _populate_from_api_server(self):
1378         # As in KeepClient itself, we must wait until the last
1379         # possible moment to instantiate an API client, in order to
1380         # avoid tripping up clients that don't have access to an API
1381         # server.  If we do build one, make sure our Keep client uses
1382         # it.  If instantiation fails, we'll fall back to the except
1383         # clause, just like any other Collection lookup
1384         # failure. Return an exception, or None if successful.
1385         self._remember_api_response(self._my_api().collections().get(
1386             uuid=self._manifest_locator).execute(
1387                 num_retries=self.num_retries))
1388         self._manifest_text = self._api_response['manifest_text']
1389         self._portable_data_hash = self._api_response['portable_data_hash']
1390         # If not overriden via kwargs, we should try to load the
1391         # replication_desired from the API server
1392         if self.replication_desired is None:
1393             self.replication_desired = self._api_response.get('replication_desired', None)
1394
1395     def _populate(self):
1396         if self._manifest_text is None:
1397             if self._manifest_locator is None:
1398                 return
1399             else:
1400                 self._populate_from_api_server()
1401         self._baseline_manifest = self._manifest_text
1402         self._import_manifest(self._manifest_text)
1403
1404     def _has_collection_uuid(self):
1405         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1406
1407     def _has_local_collection_uuid(self):
1408         return self._has_collection_uuid and \
1409             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1410
1411     def __enter__(self):
1412         return self
1413
1414     def __exit__(self, exc_type, exc_value, traceback):
1415         """Support scoped auto-commit in a with: block."""
1416         if exc_type is None:
1417             if self.writable() and self._has_collection_uuid():
1418                 self.save()
1419         self.stop_threads()
1420
1421     def stop_threads(self):
1422         if self._block_manager is not None:
1423             self._block_manager.stop_threads()
1424
1425     @synchronized
1426     def manifest_locator(self):
1427         """Get the manifest locator, if any.
1428
1429         The manifest locator will be set when the collection is loaded from an
1430         API server record or the portable data hash of a manifest.
1431
1432         The manifest locator will be None if the collection is newly created or
1433         was created directly from manifest text.  The method `save_new()` will
1434         assign a manifest locator.
1435
1436         """
1437         return self._manifest_locator
1438
1439     @synchronized
1440     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1441         if new_config is None:
1442             new_config = self._config
1443         if readonly:
1444             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1445         else:
1446             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1447
1448         newcollection._clonefrom(self)
1449         return newcollection
1450
1451     @synchronized
1452     def api_response(self):
1453         """Returns information about this Collection fetched from the API server.
1454
1455         If the Collection exists in Keep but not the API server, currently
1456         returns None.  Future versions may provide a synthetic response.
1457
1458         """
1459         return self._api_response
1460
1461     def find_or_create(self, path, create_type):
1462         """See `RichCollectionBase.find_or_create`"""
1463         if path == ".":
1464             return self
1465         else:
1466             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1467
1468     def find(self, path):
1469         """See `RichCollectionBase.find`"""
1470         if path == ".":
1471             return self
1472         else:
1473             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1474
1475     def remove(self, path, recursive=False):
1476         """See `RichCollectionBase.remove`"""
1477         if path == ".":
1478             raise errors.ArgumentError("Cannot remove '.'")
1479         else:
1480             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1481
1482     @must_be_writable
1483     @synchronized
1484     @retry_method
1485     def save(self,
1486              properties=None,
1487              storage_classes=None,
1488              trash_at=None,
1489              merge=True,
1490              num_retries=None):
1491         """Save collection to an existing collection record.
1492
1493         Commit pending buffer blocks to Keep, merge with remote record (if
1494         merge=True, the default), and update the collection record. Returns
1495         the current manifest text.
1496
1497         Will raise AssertionError if not associated with a collection record on
1498         the API server.  If you want to save a manifest to Keep only, see
1499         `save_new()`.
1500
1501         :properties:
1502           Additional properties of collection. This value will replace any existing
1503           properties of collection.
1504
1505         :storage_classes:
1506           Specify desirable storage classes to be used when writing data to Keep.
1507
1508         :trash_at:
1509           A collection is *expiring* when it has a *trash_at* time in the future.
1510           An expiring collection can be accessed as normal,
1511           but is scheduled to be trashed automatically at the *trash_at* time.
1512
1513         :merge:
1514           Update and merge remote changes before saving.  Otherwise, any
1515           remote changes will be ignored and overwritten.
1516
1517         :num_retries:
1518           Retry count on API calls (if None,  use the collection default)
1519
1520         """
1521         if properties and type(properties) is not dict:
1522             raise errors.ArgumentError("properties must be dictionary type.")
1523
1524         if storage_classes and type(storage_classes) is not list:
1525             raise errors.ArgumentError("storage_classes must be list type.")
1526
1527         if trash_at and type(trash_at) is not datetime.datetime:
1528             raise errors.ArgumentError("trash_at must be datetime type.")
1529
1530         body={}
1531         if properties:
1532             body["properties"] = properties
1533         if storage_classes:
1534             body["storage_classes_desired"] = storage_classes
1535         if trash_at:
1536             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1537             body["trash_at"] = t
1538
1539         # Copy any remote blocks to the local cluster.
1540         self._copy_remote_blocks(remote_blocks={})
1541
1542         if not self.committed():
1543             if not self._has_collection_uuid():
1544                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1545             elif not self._has_local_collection_uuid():
1546                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1547
1548             self._my_block_manager().commit_all()
1549
1550             if merge:
1551                 self.update()
1552
1553             text = self.manifest_text(strip=False)
1554             body['manifest_text'] = text
1555
1556             self._remember_api_response(self._my_api().collections().update(
1557                 uuid=self._manifest_locator,
1558                 body=body
1559                 ).execute(num_retries=num_retries))
1560             self._manifest_text = self._api_response["manifest_text"]
1561             self._portable_data_hash = self._api_response["portable_data_hash"]
1562             self.set_committed(True)
1563         elif body:
1564             self._remember_api_response(self._my_api().collections().update(
1565                 uuid=self._manifest_locator,
1566                 body=body
1567                 ).execute(num_retries=num_retries))
1568
1569         return self._manifest_text
1570
1571
1572     @must_be_writable
1573     @synchronized
1574     @retry_method
1575     def save_new(self, name=None,
1576                  create_collection_record=True,
1577                  owner_uuid=None,
1578                  properties=None,
1579                  storage_classes=None,
1580                  trash_at=None,
1581                  ensure_unique_name=False,
1582                  num_retries=None):
1583         """Save collection to a new collection record.
1584
1585         Commit pending buffer blocks to Keep and, when create_collection_record
1586         is True (default), create a new collection record.  After creating a
1587         new collection record, this Collection object will be associated with
1588         the new record used by `save()`. Returns the current manifest text.
1589
1590         :name:
1591           The collection name.
1592
1593         :create_collection_record:
1594            If True, create a collection record on the API server.
1595            If False, only commit blocks to Keep and return the manifest text.
1596
1597         :owner_uuid:
1598           the user, or project uuid that will own this collection.
1599           If None, defaults to the current user.
1600
1601         :properties:
1602           Additional properties of collection. This value will replace any existing
1603           properties of collection.
1604
1605         :storage_classes:
1606           Specify desirable storage classes to be used when writing data to Keep.
1607
1608         :trash_at:
1609           A collection is *expiring* when it has a *trash_at* time in the future.
1610           An expiring collection can be accessed as normal,
1611           but is scheduled to be trashed automatically at the *trash_at* time.
1612
1613         :ensure_unique_name:
1614           If True, ask the API server to rename the collection
1615           if it conflicts with a collection with the same name and owner.  If
1616           False, a name conflict will result in an error.
1617
1618         :num_retries:
1619           Retry count on API calls (if None,  use the collection default)
1620
1621         """
1622         if properties and type(properties) is not dict:
1623             raise errors.ArgumentError("properties must be dictionary type.")
1624
1625         if storage_classes and type(storage_classes) is not list:
1626             raise errors.ArgumentError("storage_classes must be list type.")
1627
1628         if trash_at and type(trash_at) is not datetime.datetime:
1629             raise errors.ArgumentError("trash_at must be datetime type.")
1630
1631         # Copy any remote blocks to the local cluster.
1632         self._copy_remote_blocks(remote_blocks={})
1633
1634         self._my_block_manager().commit_all()
1635         text = self.manifest_text(strip=False)
1636
1637         if create_collection_record:
1638             if name is None:
1639                 name = "New collection"
1640                 ensure_unique_name = True
1641
1642             body = {"manifest_text": text,
1643                     "name": name,
1644                     "replication_desired": self.replication_desired}
1645             if owner_uuid:
1646                 body["owner_uuid"] = owner_uuid
1647             if properties:
1648                 body["properties"] = properties
1649             if storage_classes:
1650                 body["storage_classes_desired"] = storage_classes
1651             if trash_at:
1652                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1653                 body["trash_at"] = t
1654
1655             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1656             text = self._api_response["manifest_text"]
1657
1658             self._manifest_locator = self._api_response["uuid"]
1659             self._portable_data_hash = self._api_response["portable_data_hash"]
1660
1661             self._manifest_text = text
1662             self.set_committed(True)
1663
1664         return text
1665
1666     _token_re = re.compile(r'(\S+)(\s+|$)')
1667     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1668     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1669
1670     def _unescape_manifest_path(self, path):
1671         return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1672
1673     @synchronized
1674     def _import_manifest(self, manifest_text):
1675         """Import a manifest into a `Collection`.
1676
1677         :manifest_text:
1678           The manifest text to import from.
1679
1680         """
1681         if len(self) > 0:
1682             raise ArgumentError("Can only import manifest into an empty collection")
1683
1684         STREAM_NAME = 0
1685         BLOCKS = 1
1686         SEGMENTS = 2
1687
1688         stream_name = None
1689         state = STREAM_NAME
1690
1691         for token_and_separator in self._token_re.finditer(manifest_text):
1692             tok = token_and_separator.group(1)
1693             sep = token_and_separator.group(2)
1694
1695             if state == STREAM_NAME:
1696                 # starting a new stream
1697                 stream_name = self._unescape_manifest_path(tok)
1698                 blocks = []
1699                 segments = []
1700                 streamoffset = 0
1701                 state = BLOCKS
1702                 self.find_or_create(stream_name, COLLECTION)
1703                 continue
1704
1705             if state == BLOCKS:
1706                 block_locator = self._block_re.match(tok)
1707                 if block_locator:
1708                     blocksize = int(block_locator.group(1))
1709                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1710                     streamoffset += blocksize
1711                 else:
1712                     state = SEGMENTS
1713
1714             if state == SEGMENTS:
1715                 file_segment = self._segment_re.match(tok)
1716                 if file_segment:
1717                     pos = int(file_segment.group(1))
1718                     size = int(file_segment.group(2))
1719                     name = self._unescape_manifest_path(file_segment.group(3))
1720                     if name.split('/')[-1] == '.':
1721                         # placeholder for persisting an empty directory, not a real file
1722                         if len(name) > 2:
1723                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1724                     else:
1725                         filepath = os.path.join(stream_name, name)
1726                         afile = self.find_or_create(filepath, FILE)
1727                         if isinstance(afile, ArvadosFile):
1728                             afile.add_segment(blocks, pos, size)
1729                         else:
1730                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1731                 else:
1732                     # error!
1733                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1734
1735             if sep == "\n":
1736                 stream_name = None
1737                 state = STREAM_NAME
1738
1739         self.set_committed(True)
1740
1741     @synchronized
1742     def notify(self, event, collection, name, item):
1743         if self._callback:
1744             self._callback(event, collection, name, item)
1745
1746
1747 class Subcollection(RichCollectionBase):
1748     """This is a subdirectory within a collection that doesn't have its own API
1749     server record.
1750
1751     Subcollection locking falls under the umbrella lock of its root collection.
1752
1753     """
1754
1755     def __init__(self, parent, name):
1756         super(Subcollection, self).__init__(parent)
1757         self.lock = self.root_collection().lock
1758         self._manifest_text = None
1759         self.name = name
1760         self.num_retries = parent.num_retries
1761
1762     def root_collection(self):
1763         return self.parent.root_collection()
1764
1765     def writable(self):
1766         return self.root_collection().writable()
1767
1768     def _my_api(self):
1769         return self.root_collection()._my_api()
1770
1771     def _my_keep(self):
1772         return self.root_collection()._my_keep()
1773
1774     def _my_block_manager(self):
1775         return self.root_collection()._my_block_manager()
1776
1777     def stream_name(self):
1778         return os.path.join(self.parent.stream_name(), self.name)
1779
1780     @synchronized
1781     def clone(self, new_parent, new_name):
1782         c = Subcollection(new_parent, new_name)
1783         c._clonefrom(self)
1784         return c
1785
1786     @must_be_writable
1787     @synchronized
1788     def _reparent(self, newparent, newname):
1789         self.set_committed(False)
1790         self.flush()
1791         self.parent.remove(self.name, recursive=True)
1792         self.parent = newparent
1793         self.name = newname
1794         self.lock = self.parent.root_collection().lock
1795
1796
1797 class CollectionReader(Collection):
1798     """A read-only collection object.
1799
1800     Initialize from a collection UUID or portable data hash, or raw
1801     manifest text.  See `Collection` constructor for detailed options.
1802
1803     """
1804     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1805         self._in_init = True
1806         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1807         self._in_init = False
1808
1809         # Forego any locking since it should never change once initialized.
1810         self.lock = NoopLock()
1811
1812         # Backwards compatability with old CollectionReader
1813         # all_streams() and all_files()
1814         self._streams = None
1815
1816     def writable(self):
1817         return self._in_init
1818
1819     def _populate_streams(orig_func):
1820         @functools.wraps(orig_func)
1821         def populate_streams_wrapper(self, *args, **kwargs):
1822             # Defer populating self._streams until needed since it creates a copy of the manifest.
1823             if self._streams is None:
1824                 if self._manifest_text:
1825                     self._streams = [sline.split()
1826                                      for sline in self._manifest_text.split("\n")
1827                                      if sline]
1828                 else:
1829                     self._streams = []
1830             return orig_func(self, *args, **kwargs)
1831         return populate_streams_wrapper
1832
1833     @_populate_streams
1834     def normalize(self):
1835         """Normalize the streams returned by `all_streams`.
1836
1837         This method is kept for backwards compatability and only affects the
1838         behavior of `all_streams()` and `all_files()`
1839
1840         """
1841
1842         # Rearrange streams
1843         streams = {}
1844         for s in self.all_streams():
1845             for f in s.all_files():
1846                 streamname, filename = split(s.name() + "/" + f.name())
1847                 if streamname not in streams:
1848                     streams[streamname] = {}
1849                 if filename not in streams[streamname]:
1850                     streams[streamname][filename] = []
1851                 for r in f.segments:
1852                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1853
1854         self._streams = [normalize_stream(s, streams[s])
1855                          for s in sorted(streams)]
1856     @_populate_streams
1857     def all_streams(self):
1858         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1859                 for s in self._streams]
1860
1861     @_populate_streams
1862     def all_files(self):
1863         for s in self.all_streams():
1864             for f in s.all_files():
1865                 yield f