Merge branch '11341-arvput-resume-error'
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self.set_committed(False)
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self.set_committed(False)
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile. Return None if not
569         found.
570         If path is invalid (ex: starts with '/'), an IOError exception will be
571         raised.
572
573         """
574         if not path:
575             raise errors.ArgumentError("Parameter 'path' is empty.")
576
577         pathcomponents = path.split("/", 1)
578         if pathcomponents[0] == '':
579             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
580
581         item = self._items.get(pathcomponents[0])
582         if item is None:
583             return None
584         elif len(pathcomponents) == 1:
585             return item
586         else:
587             if isinstance(item, RichCollectionBase):
588                 if pathcomponents[1]:
589                     return item.find(pathcomponents[1])
590                 else:
591                     return item
592             else:
593                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595     @synchronized
596     def mkdirs(self, path):
597         """Recursive subcollection create.
598
599         Like `os.makedirs()`.  Will create intermediate subcollections needed
600         to contain the leaf subcollection path.
601
602         """
603
604         if self.find(path) != None:
605             raise IOError(errno.EEXIST, "Directory or file exists", path)
606
607         return self.find_or_create(path, COLLECTION)
608
609     def open(self, path, mode="r"):
610         """Open a file-like object for access.
611
612         :path:
613           path to a file in the collection
614         :mode:
615           one of "r", "r+", "w", "w+", "a", "a+"
616           :"r":
617             opens for reading
618           :"r+":
619             opens for reading and writing.  Reads/writes share a file pointer.
620           :"w", "w+":
621             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
622           :"a", "a+":
623             opens for reading and writing.  All writes are appended to
624             the end of the file.  Writing does not affect the file pointer for
625             reading.
626         """
627         mode = mode.replace("b", "")
628         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629             raise errors.ArgumentError("Bad mode '%s'" % mode)
630         create = (mode != "r")
631
632         if create and not self.writable():
633             raise IOError(errno.EROFS, "Collection is read only")
634
635         if create:
636             arvfile = self.find_or_create(path, FILE)
637         else:
638             arvfile = self.find(path)
639
640         if arvfile is None:
641             raise IOError(errno.ENOENT, "File not found", path)
642         if not isinstance(arvfile, ArvadosFile):
643             raise IOError(errno.EISDIR, "Is a directory", path)
644
645         if mode[0] == "w":
646             arvfile.truncate(0)
647
648         name = os.path.basename(path)
649
650         if mode == "r":
651             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
652         else:
653             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
654
655     def modified(self):
656         """Determine if the collection has been modified since last commited."""
657         return not self.committed()
658
659     @synchronized
660     def committed(self):
661         """Determine if the collection has been committed to the API server."""
662         return self._committed
663
664     @synchronized
665     def set_committed(self, value=True):
666         """Recursively set committed flag.
667
668         If value is True, set committed to be True for this and all children.
669
670         If value is False, set committed to be False for this and all parents.
671         """
672         if value == self._committed:
673             return
674         if value:
675             for k,v in self._items.items():
676                 v.set_committed(True)
677             self._committed = True
678         else:
679             self._committed = False
680             if self.parent is not None:
681                 self.parent.set_committed(False)
682
683     @synchronized
684     def __iter__(self):
685         """Iterate over names of files and collections contained in this collection."""
686         return iter(self._items.keys())
687
688     @synchronized
689     def __getitem__(self, k):
690         """Get a file or collection that is directly contained by this collection.
691
692         If you want to search a path, use `find()` instead.
693
694         """
695         return self._items[k]
696
697     @synchronized
698     def __contains__(self, k):
699         """Test if there is a file or collection a directly contained by this collection."""
700         return k in self._items
701
702     @synchronized
703     def __len__(self):
704         """Get the number of items directly contained in this collection."""
705         return len(self._items)
706
707     @must_be_writable
708     @synchronized
709     def __delitem__(self, p):
710         """Delete an item by name which is directly contained by this collection."""
711         del self._items[p]
712         self.set_committed(False)
713         self.notify(DEL, self, p, None)
714
715     @synchronized
716     def keys(self):
717         """Get a list of names of files and collections directly contained in this collection."""
718         return self._items.keys()
719
720     @synchronized
721     def values(self):
722         """Get a list of files and collection objects directly contained in this collection."""
723         return self._items.values()
724
725     @synchronized
726     def items(self):
727         """Get a list of (name, object) tuples directly contained in this collection."""
728         return self._items.items()
729
730     def exists(self, path):
731         """Test if there is a file or collection at `path`."""
732         return self.find(path) is not None
733
734     @must_be_writable
735     @synchronized
736     def remove(self, path, recursive=False):
737         """Remove the file or subcollection (directory) at `path`.
738
739         :recursive:
740           Specify whether to remove non-empty subcollections (True), or raise an error (False).
741         """
742
743         if not path:
744             raise errors.ArgumentError("Parameter 'path' is empty.")
745
746         pathcomponents = path.split("/", 1)
747         item = self._items.get(pathcomponents[0])
748         if item is None:
749             raise IOError(errno.ENOENT, "File not found", path)
750         if len(pathcomponents) == 1:
751             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
752                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
753             deleteditem = self._items[pathcomponents[0]]
754             del self._items[pathcomponents[0]]
755             self.set_committed(False)
756             self.notify(DEL, self, pathcomponents[0], deleteditem)
757         else:
758             item.remove(pathcomponents[1])
759
760     def _clonefrom(self, source):
761         for k,v in source.items():
762             self._items[k] = v.clone(self, k)
763
764     def clone(self):
765         raise NotImplementedError()
766
767     @must_be_writable
768     @synchronized
769     def add(self, source_obj, target_name, overwrite=False, reparent=False):
770         """Copy or move a file or subcollection to this collection.
771
772         :source_obj:
773           An ArvadosFile, or Subcollection object
774
775         :target_name:
776           Destination item name.  If the target name already exists and is a
777           file, this will raise an error unless you specify `overwrite=True`.
778
779         :overwrite:
780           Whether to overwrite target file if it already exists.
781
782         :reparent:
783           If True, source_obj will be moved from its parent collection to this collection.
784           If False, source_obj will be copied and the parent collection will be
785           unmodified.
786
787         """
788
789         if target_name in self and not overwrite:
790             raise IOError(errno.EEXIST, "File already exists", target_name)
791
792         modified_from = None
793         if target_name in self:
794             modified_from = self[target_name]
795
796         # Actually make the move or copy.
797         if reparent:
798             source_obj._reparent(self, target_name)
799             item = source_obj
800         else:
801             item = source_obj.clone(self, target_name)
802
803         self._items[target_name] = item
804         self.set_committed(False)
805
806         if modified_from:
807             self.notify(MOD, self, target_name, (modified_from, item))
808         else:
809             self.notify(ADD, self, target_name, item)
810
811     def _get_src_target(self, source, target_path, source_collection, create_dest):
812         if source_collection is None:
813             source_collection = self
814
815         # Find the object
816         if isinstance(source, basestring):
817             source_obj = source_collection.find(source)
818             if source_obj is None:
819                 raise IOError(errno.ENOENT, "File not found", source)
820             sourcecomponents = source.split("/")
821         else:
822             source_obj = source
823             sourcecomponents = None
824
825         # Find parent collection the target path
826         targetcomponents = target_path.split("/")
827
828         # Determine the name to use.
829         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
830
831         if not target_name:
832             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
833
834         if create_dest:
835             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
836         else:
837             if len(targetcomponents) > 1:
838                 target_dir = self.find("/".join(targetcomponents[0:-1]))
839             else:
840                 target_dir = self
841
842         if target_dir is None:
843             raise IOError(errno.ENOENT, "Target directory not found", target_name)
844
845         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
846             target_dir = target_dir[target_name]
847             target_name = sourcecomponents[-1]
848
849         return (source_obj, target_dir, target_name)
850
851     @must_be_writable
852     @synchronized
853     def copy(self, source, target_path, source_collection=None, overwrite=False):
854         """Copy a file or subcollection to a new path in this collection.
855
856         :source:
857           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
858
859         :target_path:
860           Destination file or path.  If the target path already exists and is a
861           subcollection, the item will be placed inside the subcollection.  If
862           the target path already exists and is a file, this will raise an error
863           unless you specify `overwrite=True`.
864
865         :source_collection:
866           Collection to copy `source_path` from (default `self`)
867
868         :overwrite:
869           Whether to overwrite target file if it already exists.
870         """
871
872         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
873         target_dir.add(source_obj, target_name, overwrite, False)
874
875     @must_be_writable
876     @synchronized
877     def rename(self, source, target_path, source_collection=None, overwrite=False):
878         """Move a file or subcollection from `source_collection` to a new path in this collection.
879
880         :source:
881           A string with a path to source file or subcollection.
882
883         :target_path:
884           Destination file or path.  If the target path already exists and is a
885           subcollection, the item will be placed inside the subcollection.  If
886           the target path already exists and is a file, this will raise an error
887           unless you specify `overwrite=True`.
888
889         :source_collection:
890           Collection to copy `source_path` from (default `self`)
891
892         :overwrite:
893           Whether to overwrite target file if it already exists.
894         """
895
896         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
897         if not source_obj.writable():
898             raise IOError(errno.EROFS, "Source collection is read only", source)
899         target_dir.add(source_obj, target_name, overwrite, True)
900
901     def portable_manifest_text(self, stream_name="."):
902         """Get the manifest text for this collection, sub collections and files.
903
904         This method does not flush outstanding blocks to Keep.  It will return
905         a normalized manifest with access tokens stripped.
906
907         :stream_name:
908           Name to use for this stream (directory)
909
910         """
911         return self._get_manifest_text(stream_name, True, True)
912
913     @synchronized
914     def manifest_text(self, stream_name=".", strip=False, normalize=False,
915                       only_committed=False):
916         """Get the manifest text for this collection, sub collections and files.
917
918         This method will flush outstanding blocks to Keep.  By default, it will
919         not normalize an unmodified manifest or strip access tokens.
920
921         :stream_name:
922           Name to use for this stream (directory)
923
924         :strip:
925           If True, remove signing tokens from block locators if present.
926           If False (default), block locators are left unchanged.
927
928         :normalize:
929           If True, always export the manifest text in normalized form
930           even if the Collection is not modified.  If False (default) and the collection
931           is not modified, return the original manifest text even if it is not
932           in normalized form.
933
934         :only_committed:
935           If True, don't commit pending blocks.
936
937         """
938
939         if not only_committed:
940             self._my_block_manager().commit_all()
941         return self._get_manifest_text(stream_name, strip, normalize,
942                                        only_committed=only_committed)
943
944     @synchronized
945     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
946         """Get the manifest text for this collection, sub collections and files.
947
948         :stream_name:
949           Name to use for this stream (directory)
950
951         :strip:
952           If True, remove signing tokens from block locators if present.
953           If False (default), block locators are left unchanged.
954
955         :normalize:
956           If True, always export the manifest text in normalized form
957           even if the Collection is not modified.  If False (default) and the collection
958           is not modified, return the original manifest text even if it is not
959           in normalized form.
960
961         :only_committed:
962           If True, only include blocks that were already committed to Keep.
963
964         """
965
966         if not self.committed() or self._manifest_text is None or normalize:
967             stream = {}
968             buf = []
969             sorted_keys = sorted(self.keys())
970             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
971                 # Create a stream per file `k`
972                 arvfile = self[filename]
973                 filestream = []
974                 for segment in arvfile.segments():
975                     loc = segment.locator
976                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
977                         if only_committed:
978                             continue
979                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
980                     if strip:
981                         loc = KeepLocator(loc).stripped()
982                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
983                                          segment.segment_offset, segment.range_size))
984                 stream[filename] = filestream
985             if stream:
986                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
987             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
988                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
989             return "".join(buf)
990         else:
991             if strip:
992                 return self.stripped_manifest()
993             else:
994                 return self._manifest_text
995
996     @synchronized
997     def diff(self, end_collection, prefix=".", holding_collection=None):
998         """Generate list of add/modify/delete actions.
999
1000         When given to `apply`, will change `self` to match `end_collection`
1001
1002         """
1003         changes = []
1004         if holding_collection is None:
1005             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1006         for k in self:
1007             if k not in end_collection:
1008                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1009         for k in end_collection:
1010             if k in self:
1011                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1012                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1013                 elif end_collection[k] != self[k]:
1014                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1015                 else:
1016                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1017             else:
1018                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1019         return changes
1020
1021     @must_be_writable
1022     @synchronized
1023     def apply(self, changes):
1024         """Apply changes from `diff`.
1025
1026         If a change conflicts with a local change, it will be saved to an
1027         alternate path indicating the conflict.
1028
1029         """
1030         if changes:
1031             self.set_committed(False)
1032         for change in changes:
1033             event_type = change[0]
1034             path = change[1]
1035             initial = change[2]
1036             local = self.find(path)
1037             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1038                                                                     time.gmtime()))
1039             if event_type == ADD:
1040                 if local is None:
1041                     # No local file at path, safe to copy over new file
1042                     self.copy(initial, path)
1043                 elif local is not None and local != initial:
1044                     # There is already local file and it is different:
1045                     # save change to conflict file.
1046                     self.copy(initial, conflictpath)
1047             elif event_type == MOD or event_type == TOK:
1048                 final = change[3]
1049                 if local == initial:
1050                     # Local matches the "initial" item so it has not
1051                     # changed locally and is safe to update.
1052                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1053                         # Replace contents of local file with new contents
1054                         local.replace_contents(final)
1055                     else:
1056                         # Overwrite path with new item; this can happen if
1057                         # path was a file and is now a collection or vice versa
1058                         self.copy(final, path, overwrite=True)
1059                 else:
1060                     # Local is missing (presumably deleted) or local doesn't
1061                     # match the "start" value, so save change to conflict file
1062                     self.copy(final, conflictpath)
1063             elif event_type == DEL:
1064                 if local == initial:
1065                     # Local item matches "initial" value, so it is safe to remove.
1066                     self.remove(path, recursive=True)
1067                 # else, the file is modified or already removed, in either
1068                 # case we don't want to try to remove it.
1069
1070     def portable_data_hash(self):
1071         """Get the portable data hash for this collection's manifest."""
1072         if self._manifest_locator and self.committed():
1073             # If the collection is already saved on the API server, and it's committed
1074             # then return API server's PDH response.
1075             return self._portable_data_hash
1076         else:
1077             stripped = self.portable_manifest_text()
1078             return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1079
1080     @synchronized
1081     def subscribe(self, callback):
1082         if self._callback is None:
1083             self._callback = callback
1084         else:
1085             raise errors.ArgumentError("A callback is already set on this collection.")
1086
1087     @synchronized
1088     def unsubscribe(self):
1089         if self._callback is not None:
1090             self._callback = None
1091
1092     @synchronized
1093     def notify(self, event, collection, name, item):
1094         if self._callback:
1095             self._callback(event, collection, name, item)
1096         self.root_collection().notify(event, collection, name, item)
1097
1098     @synchronized
1099     def __eq__(self, other):
1100         if other is self:
1101             return True
1102         if not isinstance(other, RichCollectionBase):
1103             return False
1104         if len(self._items) != len(other):
1105             return False
1106         for k in self._items:
1107             if k not in other:
1108                 return False
1109             if self._items[k] != other[k]:
1110                 return False
1111         return True
1112
1113     def __ne__(self, other):
1114         return not self.__eq__(other)
1115
1116     @synchronized
1117     def flush(self):
1118         """Flush bufferblocks to Keep."""
1119         for e in self.values():
1120             e.flush()
1121
1122
1123 class Collection(RichCollectionBase):
1124     """Represents the root of an Arvados Collection.
1125
1126     This class is threadsafe.  The root collection object, all subcollections
1127     and files are protected by a single lock (i.e. each access locks the entire
1128     collection).
1129
1130     Brief summary of
1131     useful methods:
1132
1133     :To read an existing file:
1134       `c.open("myfile", "r")`
1135
1136     :To write a new file:
1137       `c.open("myfile", "w")`
1138
1139     :To determine if a file exists:
1140       `c.find("myfile") is not None`
1141
1142     :To copy a file:
1143       `c.copy("source", "dest")`
1144
1145     :To delete a file:
1146       `c.remove("myfile")`
1147
1148     :To save to an existing collection record:
1149       `c.save()`
1150
1151     :To save a new collection record:
1152     `c.save_new()`
1153
1154     :To merge remote changes into this object:
1155       `c.update()`
1156
1157     Must be associated with an API server Collection record (during
1158     initialization, or using `save_new`) to use `save` or `update`
1159
1160     """
1161
1162     def __init__(self, manifest_locator_or_text=None,
1163                  api_client=None,
1164                  keep_client=None,
1165                  num_retries=None,
1166                  parent=None,
1167                  apiconfig=None,
1168                  block_manager=None,
1169                  replication_desired=None,
1170                  put_threads=None):
1171         """Collection constructor.
1172
1173         :manifest_locator_or_text:
1174           One of Arvados collection UUID, block locator of
1175           a manifest, raw manifest text, or None (to create an empty collection).
1176         :parent:
1177           the parent Collection, may be None.
1178
1179         :apiconfig:
1180           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1181           Prefer this over supplying your own api_client and keep_client (except in testing).
1182           Will use default config settings if not specified.
1183
1184         :api_client:
1185           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1186
1187         :keep_client:
1188           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1189
1190         :num_retries:
1191           the number of retries for API and Keep requests.
1192
1193         :block_manager:
1194           the block manager to use.  If not specified, create one.
1195
1196         :replication_desired:
1197           How many copies should Arvados maintain. If None, API server default
1198           configuration applies. If not None, this value will also be used
1199           for determining the number of block copies being written.
1200
1201         """
1202         super(Collection, self).__init__(parent)
1203         self._api_client = api_client
1204         self._keep_client = keep_client
1205         self._block_manager = block_manager
1206         self.replication_desired = replication_desired
1207         self.put_threads = put_threads
1208
1209         if apiconfig:
1210             self._config = apiconfig
1211         else:
1212             self._config = config.settings()
1213
1214         self.num_retries = num_retries if num_retries is not None else 0
1215         self._manifest_locator = None
1216         self._manifest_text = None
1217         self._portable_data_hash = None
1218         self._api_response = None
1219         self._past_versions = set()
1220
1221         self.lock = threading.RLock()
1222         self.events = None
1223
1224         if manifest_locator_or_text:
1225             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1226                 self._manifest_locator = manifest_locator_or_text
1227             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1228                 self._manifest_locator = manifest_locator_or_text
1229             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1230                 self._manifest_text = manifest_locator_or_text
1231             else:
1232                 raise errors.ArgumentError(
1233                     "Argument to CollectionReader is not a manifest or a collection UUID")
1234
1235             try:
1236                 self._populate()
1237             except (IOError, errors.SyntaxError) as e:
1238                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1239
1240     def root_collection(self):
1241         return self
1242
1243     def stream_name(self):
1244         return "."
1245
1246     def writable(self):
1247         return True
1248
1249     @synchronized
1250     def known_past_version(self, modified_at_and_portable_data_hash):
1251         return modified_at_and_portable_data_hash in self._past_versions
1252
1253     @synchronized
1254     @retry_method
1255     def update(self, other=None, num_retries=None):
1256         """Merge the latest collection on the API server with the current collection."""
1257
1258         if other is None:
1259             if self._manifest_locator is None:
1260                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1261             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1262             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1263                 response.get("portable_data_hash") != self.portable_data_hash()):
1264                 # The record on the server is different from our current one, but we've seen it before,
1265                 # so ignore it because it's already been merged.
1266                 # However, if it's the same as our current record, proceed with the update, because we want to update
1267                 # our tokens.
1268                 return
1269             else:
1270                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1271             other = CollectionReader(response["manifest_text"])
1272         baseline = CollectionReader(self._manifest_text)
1273         self.apply(baseline.diff(other))
1274         self._manifest_text = self.manifest_text()
1275
1276     @synchronized
1277     def _my_api(self):
1278         if self._api_client is None:
1279             self._api_client = ThreadSafeApiCache(self._config)
1280             if self._keep_client is None:
1281                 self._keep_client = self._api_client.keep
1282         return self._api_client
1283
1284     @synchronized
1285     def _my_keep(self):
1286         if self._keep_client is None:
1287             if self._api_client is None:
1288                 self._my_api()
1289             else:
1290                 self._keep_client = KeepClient(api_client=self._api_client)
1291         return self._keep_client
1292
1293     @synchronized
1294     def _my_block_manager(self):
1295         if self._block_manager is None:
1296             copies = (self.replication_desired or
1297                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1298                                                    2))
1299             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1300         return self._block_manager
1301
1302     def _remember_api_response(self, response):
1303         self._api_response = response
1304         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1305
1306     def _populate_from_api_server(self):
1307         # As in KeepClient itself, we must wait until the last
1308         # possible moment to instantiate an API client, in order to
1309         # avoid tripping up clients that don't have access to an API
1310         # server.  If we do build one, make sure our Keep client uses
1311         # it.  If instantiation fails, we'll fall back to the except
1312         # clause, just like any other Collection lookup
1313         # failure. Return an exception, or None if successful.
1314         try:
1315             self._remember_api_response(self._my_api().collections().get(
1316                 uuid=self._manifest_locator).execute(
1317                     num_retries=self.num_retries))
1318             self._manifest_text = self._api_response['manifest_text']
1319             self._portable_data_hash = self._api_response['portable_data_hash']
1320             # If not overriden via kwargs, we should try to load the
1321             # replication_desired from the API server
1322             if self.replication_desired is None:
1323                 self.replication_desired = self._api_response.get('replication_desired', None)
1324             return None
1325         except Exception as e:
1326             return e
1327
1328     def _populate_from_keep(self):
1329         # Retrieve a manifest directly from Keep. This has a chance of
1330         # working if [a] the locator includes a permission signature
1331         # or [b] the Keep services are operating in world-readable
1332         # mode. Return an exception, or None if successful.
1333         try:
1334             self._manifest_text = self._my_keep().get(
1335                 self._manifest_locator, num_retries=self.num_retries)
1336         except Exception as e:
1337             return e
1338
1339     def _populate(self):
1340         if self._manifest_locator is None and self._manifest_text is None:
1341             return
1342         error_via_api = None
1343         error_via_keep = None
1344         should_try_keep = ((self._manifest_text is None) and
1345                            util.keep_locator_pattern.match(
1346                                self._manifest_locator))
1347         if ((self._manifest_text is None) and
1348             util.signed_locator_pattern.match(self._manifest_locator)):
1349             error_via_keep = self._populate_from_keep()
1350         if self._manifest_text is None:
1351             error_via_api = self._populate_from_api_server()
1352             if error_via_api is not None and not should_try_keep:
1353                 raise error_via_api
1354         if ((self._manifest_text is None) and
1355             not error_via_keep and
1356             should_try_keep):
1357             # Looks like a keep locator, and we didn't already try keep above
1358             error_via_keep = self._populate_from_keep()
1359         if self._manifest_text is None:
1360             # Nothing worked!
1361             raise errors.NotFoundError(
1362                 ("Failed to retrieve collection '{}' " +
1363                  "from either API server ({}) or Keep ({})."
1364                  ).format(
1365                     self._manifest_locator,
1366                     error_via_api,
1367                     error_via_keep))
1368         # populate
1369         self._baseline_manifest = self._manifest_text
1370         self._import_manifest(self._manifest_text)
1371
1372
1373     def _has_collection_uuid(self):
1374         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1375
1376     def __enter__(self):
1377         return self
1378
1379     def __exit__(self, exc_type, exc_value, traceback):
1380         """Support scoped auto-commit in a with: block."""
1381         if exc_type is None:
1382             if self.writable() and self._has_collection_uuid():
1383                 self.save()
1384         self.stop_threads()
1385
1386     def stop_threads(self):
1387         if self._block_manager is not None:
1388             self._block_manager.stop_threads()
1389
1390     @synchronized
1391     def manifest_locator(self):
1392         """Get the manifest locator, if any.
1393
1394         The manifest locator will be set when the collection is loaded from an
1395         API server record or the portable data hash of a manifest.
1396
1397         The manifest locator will be None if the collection is newly created or
1398         was created directly from manifest text.  The method `save_new()` will
1399         assign a manifest locator.
1400
1401         """
1402         return self._manifest_locator
1403
1404     @synchronized
1405     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1406         if new_config is None:
1407             new_config = self._config
1408         if readonly:
1409             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1410         else:
1411             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1412
1413         newcollection._clonefrom(self)
1414         return newcollection
1415
1416     @synchronized
1417     def api_response(self):
1418         """Returns information about this Collection fetched from the API server.
1419
1420         If the Collection exists in Keep but not the API server, currently
1421         returns None.  Future versions may provide a synthetic response.
1422
1423         """
1424         return self._api_response
1425
1426     def find_or_create(self, path, create_type):
1427         """See `RichCollectionBase.find_or_create`"""
1428         if path == ".":
1429             return self
1430         else:
1431             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1432
1433     def find(self, path):
1434         """See `RichCollectionBase.find`"""
1435         if path == ".":
1436             return self
1437         else:
1438             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1439
1440     def remove(self, path, recursive=False):
1441         """See `RichCollectionBase.remove`"""
1442         if path == ".":
1443             raise errors.ArgumentError("Cannot remove '.'")
1444         else:
1445             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1446
1447     @must_be_writable
1448     @synchronized
1449     @retry_method
1450     def save(self, merge=True, num_retries=None):
1451         """Save collection to an existing collection record.
1452
1453         Commit pending buffer blocks to Keep, merge with remote record (if
1454         merge=True, the default), and update the collection record.  Returns
1455         the current manifest text.
1456
1457         Will raise AssertionError if not associated with a collection record on
1458         the API server.  If you want to save a manifest to Keep only, see
1459         `save_new()`.
1460
1461         :merge:
1462           Update and merge remote changes before saving.  Otherwise, any
1463           remote changes will be ignored and overwritten.
1464
1465         :num_retries:
1466           Retry count on API calls (if None,  use the collection default)
1467
1468         """
1469         if not self.committed():
1470             if not self._has_collection_uuid():
1471                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1472
1473             self._my_block_manager().commit_all()
1474
1475             if merge:
1476                 self.update()
1477
1478             text = self.manifest_text(strip=False)
1479             self._remember_api_response(self._my_api().collections().update(
1480                 uuid=self._manifest_locator,
1481                 body={'manifest_text': text}
1482                 ).execute(
1483                     num_retries=num_retries))
1484             self._manifest_text = self._api_response["manifest_text"]
1485             self._portable_data_hash = self._api_response["portable_data_hash"]
1486             self.set_committed(True)
1487
1488         return self._manifest_text
1489
1490
1491     @must_be_writable
1492     @synchronized
1493     @retry_method
1494     def save_new(self, name=None,
1495                  create_collection_record=True,
1496                  owner_uuid=None,
1497                  ensure_unique_name=False,
1498                  num_retries=None):
1499         """Save collection to a new collection record.
1500
1501         Commit pending buffer blocks to Keep and, when create_collection_record
1502         is True (default), create a new collection record.  After creating a
1503         new collection record, this Collection object will be associated with
1504         the new record used by `save()`.  Returns the current manifest text.
1505
1506         :name:
1507           The collection name.
1508
1509         :create_collection_record:
1510            If True, create a collection record on the API server.
1511            If False, only commit blocks to Keep and return the manifest text.
1512
1513         :owner_uuid:
1514           the user, or project uuid that will own this collection.
1515           If None, defaults to the current user.
1516
1517         :ensure_unique_name:
1518           If True, ask the API server to rename the collection
1519           if it conflicts with a collection with the same name and owner.  If
1520           False, a name conflict will result in an error.
1521
1522         :num_retries:
1523           Retry count on API calls (if None,  use the collection default)
1524
1525         """
1526         self._my_block_manager().commit_all()
1527         text = self.manifest_text(strip=False)
1528
1529         if create_collection_record:
1530             if name is None:
1531                 name = "New collection"
1532                 ensure_unique_name = True
1533
1534             body = {"manifest_text": text,
1535                     "name": name,
1536                     "replication_desired": self.replication_desired}
1537             if owner_uuid:
1538                 body["owner_uuid"] = owner_uuid
1539
1540             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1541             text = self._api_response["manifest_text"]
1542
1543             self._manifest_locator = self._api_response["uuid"]
1544             self._portable_data_hash = self._api_response["portable_data_hash"]
1545
1546             self._manifest_text = text
1547             self.set_committed(True)
1548
1549         return text
1550
1551     @synchronized
1552     def _import_manifest(self, manifest_text):
1553         """Import a manifest into a `Collection`.
1554
1555         :manifest_text:
1556           The manifest text to import from.
1557
1558         """
1559         if len(self) > 0:
1560             raise ArgumentError("Can only import manifest into an empty collection")
1561
1562         STREAM_NAME = 0
1563         BLOCKS = 1
1564         SEGMENTS = 2
1565
1566         stream_name = None
1567         state = STREAM_NAME
1568
1569         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1570             tok = token_and_separator.group(1)
1571             sep = token_and_separator.group(2)
1572
1573             if state == STREAM_NAME:
1574                 # starting a new stream
1575                 stream_name = tok.replace('\\040', ' ')
1576                 blocks = []
1577                 segments = []
1578                 streamoffset = 0L
1579                 state = BLOCKS
1580                 self.find_or_create(stream_name, COLLECTION)
1581                 continue
1582
1583             if state == BLOCKS:
1584                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1585                 if block_locator:
1586                     blocksize = long(block_locator.group(1))
1587                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1588                     streamoffset += blocksize
1589                 else:
1590                     state = SEGMENTS
1591
1592             if state == SEGMENTS:
1593                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1594                 if file_segment:
1595                     pos = long(file_segment.group(1))
1596                     size = long(file_segment.group(2))
1597                     name = file_segment.group(3).replace('\\040', ' ')
1598                     filepath = os.path.join(stream_name, name)
1599                     afile = self.find_or_create(filepath, FILE)
1600                     if isinstance(afile, ArvadosFile):
1601                         afile.add_segment(blocks, pos, size)
1602                     else:
1603                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1604                 else:
1605                     # error!
1606                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1607
1608             if sep == "\n":
1609                 stream_name = None
1610                 state = STREAM_NAME
1611
1612         self.set_committed(True)
1613
1614     @synchronized
1615     def notify(self, event, collection, name, item):
1616         if self._callback:
1617             self._callback(event, collection, name, item)
1618
1619
1620 class Subcollection(RichCollectionBase):
1621     """This is a subdirectory within a collection that doesn't have its own API
1622     server record.
1623
1624     Subcollection locking falls under the umbrella lock of its root collection.
1625
1626     """
1627
1628     def __init__(self, parent, name):
1629         super(Subcollection, self).__init__(parent)
1630         self.lock = self.root_collection().lock
1631         self._manifest_text = None
1632         self.name = name
1633         self.num_retries = parent.num_retries
1634
1635     def root_collection(self):
1636         return self.parent.root_collection()
1637
1638     def writable(self):
1639         return self.root_collection().writable()
1640
1641     def _my_api(self):
1642         return self.root_collection()._my_api()
1643
1644     def _my_keep(self):
1645         return self.root_collection()._my_keep()
1646
1647     def _my_block_manager(self):
1648         return self.root_collection()._my_block_manager()
1649
1650     def stream_name(self):
1651         return os.path.join(self.parent.stream_name(), self.name)
1652
1653     @synchronized
1654     def clone(self, new_parent, new_name):
1655         c = Subcollection(new_parent, new_name)
1656         c._clonefrom(self)
1657         return c
1658
1659     @must_be_writable
1660     @synchronized
1661     def _reparent(self, newparent, newname):
1662         self.set_committed(False)
1663         self.flush()
1664         self.parent.remove(self.name, recursive=True)
1665         self.parent = newparent
1666         self.name = newname
1667         self.lock = self.parent.root_collection().lock
1668
1669
1670 class CollectionReader(Collection):
1671     """A read-only collection object.
1672
1673     Initialize from an api collection record locator, a portable data hash of a
1674     manifest, or raw manifest text.  See `Collection` constructor for detailed
1675     options.
1676
1677     """
1678     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1679         self._in_init = True
1680         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1681         self._in_init = False
1682
1683         # Forego any locking since it should never change once initialized.
1684         self.lock = NoopLock()
1685
1686         # Backwards compatability with old CollectionReader
1687         # all_streams() and all_files()
1688         self._streams = None
1689
1690     def writable(self):
1691         return self._in_init
1692
1693     def _populate_streams(orig_func):
1694         @functools.wraps(orig_func)
1695         def populate_streams_wrapper(self, *args, **kwargs):
1696             # Defer populating self._streams until needed since it creates a copy of the manifest.
1697             if self._streams is None:
1698                 if self._manifest_text:
1699                     self._streams = [sline.split()
1700                                      for sline in self._manifest_text.split("\n")
1701                                      if sline]
1702                 else:
1703                     self._streams = []
1704             return orig_func(self, *args, **kwargs)
1705         return populate_streams_wrapper
1706
1707     @_populate_streams
1708     def normalize(self):
1709         """Normalize the streams returned by `all_streams`.
1710
1711         This method is kept for backwards compatability and only affects the
1712         behavior of `all_streams()` and `all_files()`
1713
1714         """
1715
1716         # Rearrange streams
1717         streams = {}
1718         for s in self.all_streams():
1719             for f in s.all_files():
1720                 streamname, filename = split(s.name() + "/" + f.name())
1721                 if streamname not in streams:
1722                     streams[streamname] = {}
1723                 if filename not in streams[streamname]:
1724                     streams[streamname][filename] = []
1725                 for r in f.segments:
1726                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1727
1728         self._streams = [normalize_stream(s, streams[s])
1729                          for s in sorted(streams)]
1730     @_populate_streams
1731     def all_streams(self):
1732         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1733                 for s in self._streams]
1734
1735     @_populate_streams
1736     def all_files(self):
1737         for s in self.all_streams():
1738             for f in s.all_files():
1739                 yield f