Merge branch 'wtsi/13093-crunch-dispatch-slurm-add-mem' refs #13093
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import time
17 import threading
18
19 from collections import deque
20 from stat import *
21
22 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
23 from .keep import KeepLocator, KeepClient
24 from .stream import StreamReader
25 from ._normalize_stream import normalize_stream
26 from ._ranges import Range, LocatorAndRange
27 from .safeapi import ThreadSafeApiCache
28 import arvados.config as config
29 import arvados.errors as errors
30 import arvados.util
31 import arvados.events as events
32 from arvados.retry import retry_method
33
34 _logger = logging.getLogger('arvados.collection')
35
36 class CollectionBase(object):
37     def __enter__(self):
38         return self
39
40     def __exit__(self, exc_type, exc_value, traceback):
41         pass
42
43     def _my_keep(self):
44         if self._keep_client is None:
45             self._keep_client = KeepClient(api_client=self._api_client,
46                                            num_retries=self.num_retries)
47         return self._keep_client
48
49     def stripped_manifest(self):
50         """Get the manifest with locator hints stripped.
51
52         Return the manifest for the current collection with all
53         non-portable hints (i.e., permission signatures and other
54         hints other than size hints) removed from the locators.
55         """
56         raw = self.manifest_text()
57         clean = []
58         for line in raw.split("\n"):
59             fields = line.split()
60             if fields:
61                 clean_fields = fields[:1] + [
62                     (re.sub(r'\+[^\d][^\+]*', '', x)
63                      if re.match(arvados.util.keep_locator_pattern, x)
64                      else x)
65                     for x in fields[1:]]
66                 clean += [' '.join(clean_fields), "\n"]
67         return ''.join(clean)
68
69
70 class _WriterFile(_FileLikeObjectBase):
71     def __init__(self, coll_writer, name):
72         super(_WriterFile, self).__init__(name, 'wb')
73         self.dest = coll_writer
74
75     def close(self):
76         super(_WriterFile, self).close()
77         self.dest.finish_current_file()
78
79     @_FileLikeObjectBase._before_close
80     def write(self, data):
81         self.dest.write(data)
82
83     @_FileLikeObjectBase._before_close
84     def writelines(self, seq):
85         for data in seq:
86             self.write(data)
87
88     @_FileLikeObjectBase._before_close
89     def flush(self):
90         self.dest.flush_data()
91
92
93 class CollectionWriter(CollectionBase):
94     def __init__(self, api_client=None, num_retries=0, replication=None):
95         """Instantiate a CollectionWriter.
96
97         CollectionWriter lets you build a new Arvados Collection from scratch.
98         Write files to it.  The CollectionWriter will upload data to Keep as
99         appropriate, and provide you with the Collection manifest text when
100         you're finished.
101
102         Arguments:
103         * api_client: The API client to use to look up Collections.  If not
104           provided, CollectionReader will build one from available Arvados
105           configuration.
106         * num_retries: The default number of times to retry failed
107           service requests.  Default 0.  You may change this value
108           after instantiation, but note those changes may not
109           propagate to related objects like the Keep client.
110         * replication: The number of copies of each block to store.
111           If this argument is None or not supplied, replication is
112           the server-provided default if available, otherwise 2.
113         """
114         self._api_client = api_client
115         self.num_retries = num_retries
116         self.replication = (2 if replication is None else replication)
117         self._keep_client = None
118         self._data_buffer = []
119         self._data_buffer_len = 0
120         self._current_stream_files = []
121         self._current_stream_length = 0
122         self._current_stream_locators = []
123         self._current_stream_name = '.'
124         self._current_file_name = None
125         self._current_file_pos = 0
126         self._finished_streams = []
127         self._close_file = None
128         self._queued_file = None
129         self._queued_dirents = deque()
130         self._queued_trees = deque()
131         self._last_open = None
132
133     def __exit__(self, exc_type, exc_value, traceback):
134         if exc_type is None:
135             self.finish()
136
137     def do_queued_work(self):
138         # The work queue consists of three pieces:
139         # * _queued_file: The file object we're currently writing to the
140         #   Collection.
141         # * _queued_dirents: Entries under the current directory
142         #   (_queued_trees[0]) that we want to write or recurse through.
143         #   This may contain files from subdirectories if
144         #   max_manifest_depth == 0 for this directory.
145         # * _queued_trees: Directories that should be written as separate
146         #   streams to the Collection.
147         # This function handles the smallest piece of work currently queued
148         # (current file, then current directory, then next directory) until
149         # no work remains.  The _work_THING methods each do a unit of work on
150         # THING.  _queue_THING methods add a THING to the work queue.
151         while True:
152             if self._queued_file:
153                 self._work_file()
154             elif self._queued_dirents:
155                 self._work_dirents()
156             elif self._queued_trees:
157                 self._work_trees()
158             else:
159                 break
160
161     def _work_file(self):
162         while True:
163             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
164             if not buf:
165                 break
166             self.write(buf)
167         self.finish_current_file()
168         if self._close_file:
169             self._queued_file.close()
170         self._close_file = None
171         self._queued_file = None
172
173     def _work_dirents(self):
174         path, stream_name, max_manifest_depth = self._queued_trees[0]
175         if stream_name != self.current_stream_name():
176             self.start_new_stream(stream_name)
177         while self._queued_dirents:
178             dirent = self._queued_dirents.popleft()
179             target = os.path.join(path, dirent)
180             if os.path.isdir(target):
181                 self._queue_tree(target,
182                                  os.path.join(stream_name, dirent),
183                                  max_manifest_depth - 1)
184             else:
185                 self._queue_file(target, dirent)
186                 break
187         if not self._queued_dirents:
188             self._queued_trees.popleft()
189
190     def _work_trees(self):
191         path, stream_name, max_manifest_depth = self._queued_trees[0]
192         d = arvados.util.listdir_recursive(
193             path, max_depth = (None if max_manifest_depth == 0 else 0))
194         if d:
195             self._queue_dirents(stream_name, d)
196         else:
197             self._queued_trees.popleft()
198
199     def _queue_file(self, source, filename=None):
200         assert (self._queued_file is None), "tried to queue more than one file"
201         if not hasattr(source, 'read'):
202             source = open(source, 'rb')
203             self._close_file = True
204         else:
205             self._close_file = False
206         if filename is None:
207             filename = os.path.basename(source.name)
208         self.start_new_file(filename)
209         self._queued_file = source
210
211     def _queue_dirents(self, stream_name, dirents):
212         assert (not self._queued_dirents), "tried to queue more than one tree"
213         self._queued_dirents = deque(sorted(dirents))
214
215     def _queue_tree(self, path, stream_name, max_manifest_depth):
216         self._queued_trees.append((path, stream_name, max_manifest_depth))
217
218     def write_file(self, source, filename=None):
219         self._queue_file(source, filename)
220         self.do_queued_work()
221
222     def write_directory_tree(self,
223                              path, stream_name='.', max_manifest_depth=-1):
224         self._queue_tree(path, stream_name, max_manifest_depth)
225         self.do_queued_work()
226
227     def write(self, newdata):
228         if isinstance(newdata, bytes):
229             pass
230         elif isinstance(newdata, str):
231             newdata = newdata.encode()
232         elif hasattr(newdata, '__iter__'):
233             for s in newdata:
234                 self.write(s)
235             return
236         self._data_buffer.append(newdata)
237         self._data_buffer_len += len(newdata)
238         self._current_stream_length += len(newdata)
239         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
240             self.flush_data()
241
242     def open(self, streampath, filename=None):
243         """open(streampath[, filename]) -> file-like object
244
245         Pass in the path of a file to write to the Collection, either as a
246         single string or as two separate stream name and file name arguments.
247         This method returns a file-like object you can write to add it to the
248         Collection.
249
250         You may only have one file object from the Collection open at a time,
251         so be sure to close the object when you're done.  Using the object in
252         a with statement makes that easy::
253
254           with cwriter.open('./doc/page1.txt') as outfile:
255               outfile.write(page1_data)
256           with cwriter.open('./doc/page2.txt') as outfile:
257               outfile.write(page2_data)
258         """
259         if filename is None:
260             streampath, filename = split(streampath)
261         if self._last_open and not self._last_open.closed:
262             raise errors.AssertionError(
263                 "can't open '{}' when '{}' is still open".format(
264                     filename, self._last_open.name))
265         if streampath != self.current_stream_name():
266             self.start_new_stream(streampath)
267         self.set_current_file_name(filename)
268         self._last_open = _WriterFile(self, filename)
269         return self._last_open
270
271     def flush_data(self):
272         data_buffer = b''.join(self._data_buffer)
273         if data_buffer:
274             self._current_stream_locators.append(
275                 self._my_keep().put(
276                     data_buffer[0:config.KEEP_BLOCK_SIZE],
277                     copies=self.replication))
278             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
279             self._data_buffer_len = len(self._data_buffer[0])
280
281     def start_new_file(self, newfilename=None):
282         self.finish_current_file()
283         self.set_current_file_name(newfilename)
284
285     def set_current_file_name(self, newfilename):
286         if re.search(r'[\t\n]', newfilename):
287             raise errors.AssertionError(
288                 "Manifest filenames cannot contain whitespace: %s" %
289                 newfilename)
290         elif re.search(r'\x00', newfilename):
291             raise errors.AssertionError(
292                 "Manifest filenames cannot contain NUL characters: %s" %
293                 newfilename)
294         self._current_file_name = newfilename
295
296     def current_file_name(self):
297         return self._current_file_name
298
299     def finish_current_file(self):
300         if self._current_file_name is None:
301             if self._current_file_pos == self._current_stream_length:
302                 return
303             raise errors.AssertionError(
304                 "Cannot finish an unnamed file " +
305                 "(%d bytes at offset %d in '%s' stream)" %
306                 (self._current_stream_length - self._current_file_pos,
307                  self._current_file_pos,
308                  self._current_stream_name))
309         self._current_stream_files.append([
310                 self._current_file_pos,
311                 self._current_stream_length - self._current_file_pos,
312                 self._current_file_name])
313         self._current_file_pos = self._current_stream_length
314         self._current_file_name = None
315
316     def start_new_stream(self, newstreamname='.'):
317         self.finish_current_stream()
318         self.set_current_stream_name(newstreamname)
319
320     def set_current_stream_name(self, newstreamname):
321         if re.search(r'[\t\n]', newstreamname):
322             raise errors.AssertionError(
323                 "Manifest stream names cannot contain whitespace: '%s'" %
324                 (newstreamname))
325         self._current_stream_name = '.' if newstreamname=='' else newstreamname
326
327     def current_stream_name(self):
328         return self._current_stream_name
329
330     def finish_current_stream(self):
331         self.finish_current_file()
332         self.flush_data()
333         if not self._current_stream_files:
334             pass
335         elif self._current_stream_name is None:
336             raise errors.AssertionError(
337                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
338                 (self._current_stream_length, len(self._current_stream_files)))
339         else:
340             if not self._current_stream_locators:
341                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
342             self._finished_streams.append([self._current_stream_name,
343                                            self._current_stream_locators,
344                                            self._current_stream_files])
345         self._current_stream_files = []
346         self._current_stream_length = 0
347         self._current_stream_locators = []
348         self._current_stream_name = None
349         self._current_file_pos = 0
350         self._current_file_name = None
351
352     def finish(self):
353         """Store the manifest in Keep and return its locator.
354
355         This is useful for storing manifest fragments (task outputs)
356         temporarily in Keep during a Crunch job.
357
358         In other cases you should make a collection instead, by
359         sending manifest_text() to the API server's "create
360         collection" endpoint.
361         """
362         return self._my_keep().put(self.manifest_text().encode(),
363                                    copies=self.replication)
364
365     def portable_data_hash(self):
366         stripped = self.stripped_manifest().encode()
367         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
368
369     def manifest_text(self):
370         self.finish_current_stream()
371         manifest = ''
372
373         for stream in self._finished_streams:
374             if not re.search(r'^\.(/.*)?$', stream[0]):
375                 manifest += './'
376             manifest += stream[0].replace(' ', '\\040')
377             manifest += ' ' + ' '.join(stream[1])
378             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
379             manifest += "\n"
380
381         return manifest
382
383     def data_locators(self):
384         ret = []
385         for name, locators, files in self._finished_streams:
386             ret += locators
387         return ret
388
389     def save_new(self, name=None):
390         return self._api_client.collections().create(
391             ensure_unique_name=True,
392             body={
393                 'name': name,
394                 'manifest_text': self.manifest_text(),
395             }).execute(num_retries=self.num_retries)
396
397
398 class ResumableCollectionWriter(CollectionWriter):
399     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
400                    '_current_stream_locators', '_current_stream_name',
401                    '_current_file_name', '_current_file_pos', '_close_file',
402                    '_data_buffer', '_dependencies', '_finished_streams',
403                    '_queued_dirents', '_queued_trees']
404
405     def __init__(self, api_client=None, **kwargs):
406         self._dependencies = {}
407         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
408
409     @classmethod
410     def from_state(cls, state, *init_args, **init_kwargs):
411         # Try to build a new writer from scratch with the given state.
412         # If the state is not suitable to resume (because files have changed,
413         # been deleted, aren't predictable, etc.), raise a
414         # StaleWriterStateError.  Otherwise, return the initialized writer.
415         # The caller is responsible for calling writer.do_queued_work()
416         # appropriately after it's returned.
417         writer = cls(*init_args, **init_kwargs)
418         for attr_name in cls.STATE_PROPS:
419             attr_value = state[attr_name]
420             attr_class = getattr(writer, attr_name).__class__
421             # Coerce the value into the same type as the initial value, if
422             # needed.
423             if attr_class not in (type(None), attr_value.__class__):
424                 attr_value = attr_class(attr_value)
425             setattr(writer, attr_name, attr_value)
426         # Check dependencies before we try to resume anything.
427         if any(KeepLocator(ls).permission_expired()
428                for ls in writer._current_stream_locators):
429             raise errors.StaleWriterStateError(
430                 "locators include expired permission hint")
431         writer.check_dependencies()
432         if state['_current_file'] is not None:
433             path, pos = state['_current_file']
434             try:
435                 writer._queued_file = open(path, 'rb')
436                 writer._queued_file.seek(pos)
437             except IOError as error:
438                 raise errors.StaleWriterStateError(
439                     "failed to reopen active file {}: {}".format(path, error))
440         return writer
441
442     def check_dependencies(self):
443         for path, orig_stat in listitems(self._dependencies):
444             if not S_ISREG(orig_stat[ST_MODE]):
445                 raise errors.StaleWriterStateError("{} not file".format(path))
446             try:
447                 now_stat = tuple(os.stat(path))
448             except OSError as error:
449                 raise errors.StaleWriterStateError(
450                     "failed to stat {}: {}".format(path, error))
451             if ((not S_ISREG(now_stat[ST_MODE])) or
452                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
453                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
454                 raise errors.StaleWriterStateError("{} changed".format(path))
455
456     def dump_state(self, copy_func=lambda x: x):
457         state = {attr: copy_func(getattr(self, attr))
458                  for attr in self.STATE_PROPS}
459         if self._queued_file is None:
460             state['_current_file'] = None
461         else:
462             state['_current_file'] = (os.path.realpath(self._queued_file.name),
463                                       self._queued_file.tell())
464         return state
465
466     def _queue_file(self, source, filename=None):
467         try:
468             src_path = os.path.realpath(source)
469         except Exception:
470             raise errors.AssertionError("{} not a file path".format(source))
471         try:
472             path_stat = os.stat(src_path)
473         except OSError as stat_error:
474             path_stat = None
475         super(ResumableCollectionWriter, self)._queue_file(source, filename)
476         fd_stat = os.fstat(self._queued_file.fileno())
477         if not S_ISREG(fd_stat.st_mode):
478             # We won't be able to resume from this cache anyway, so don't
479             # worry about further checks.
480             self._dependencies[source] = tuple(fd_stat)
481         elif path_stat is None:
482             raise errors.AssertionError(
483                 "could not stat {}: {}".format(source, stat_error))
484         elif path_stat.st_ino != fd_stat.st_ino:
485             raise errors.AssertionError(
486                 "{} changed between open and stat calls".format(source))
487         else:
488             self._dependencies[src_path] = tuple(fd_stat)
489
490     def write(self, data):
491         if self._queued_file is None:
492             raise errors.AssertionError(
493                 "resumable writer can't accept unsourced data")
494         return super(ResumableCollectionWriter, self).write(data)
495
496
497 ADD = "add"
498 DEL = "del"
499 MOD = "mod"
500 TOK = "tok"
501 FILE = "file"
502 COLLECTION = "collection"
503
504 class RichCollectionBase(CollectionBase):
505     """Base class for Collections and Subcollections.
506
507     Implements the majority of functionality relating to accessing items in the
508     Collection.
509
510     """
511
512     def __init__(self, parent=None):
513         self.parent = parent
514         self._committed = False
515         self._callback = None
516         self._items = {}
517
518     def _my_api(self):
519         raise NotImplementedError()
520
521     def _my_keep(self):
522         raise NotImplementedError()
523
524     def _my_block_manager(self):
525         raise NotImplementedError()
526
527     def writable(self):
528         raise NotImplementedError()
529
530     def root_collection(self):
531         raise NotImplementedError()
532
533     def notify(self, event, collection, name, item):
534         raise NotImplementedError()
535
536     def stream_name(self):
537         raise NotImplementedError()
538
539     @must_be_writable
540     @synchronized
541     def find_or_create(self, path, create_type):
542         """Recursively search the specified file path.
543
544         May return either a `Collection` or `ArvadosFile`.  If not found, will
545         create a new item at the specified path based on `create_type`.  Will
546         create intermediate subcollections needed to contain the final item in
547         the path.
548
549         :create_type:
550           One of `arvados.collection.FILE` or
551           `arvados.collection.COLLECTION`.  If the path is not found, and value
552           of create_type is FILE then create and return a new ArvadosFile for
553           the last path component.  If COLLECTION, then create and return a new
554           Collection for the last path component.
555
556         """
557
558         pathcomponents = path.split("/", 1)
559         if pathcomponents[0]:
560             item = self._items.get(pathcomponents[0])
561             if len(pathcomponents) == 1:
562                 if item is None:
563                     # create new file
564                     if create_type == COLLECTION:
565                         item = Subcollection(self, pathcomponents[0])
566                     else:
567                         item = ArvadosFile(self, pathcomponents[0])
568                     self._items[pathcomponents[0]] = item
569                     self.set_committed(False)
570                     self.notify(ADD, self, pathcomponents[0], item)
571                 return item
572             else:
573                 if item is None:
574                     # create new collection
575                     item = Subcollection(self, pathcomponents[0])
576                     self._items[pathcomponents[0]] = item
577                     self.set_committed(False)
578                     self.notify(ADD, self, pathcomponents[0], item)
579                 if isinstance(item, RichCollectionBase):
580                     return item.find_or_create(pathcomponents[1], create_type)
581                 else:
582                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
583         else:
584             return self
585
586     @synchronized
587     def find(self, path):
588         """Recursively search the specified file path.
589
590         May return either a Collection or ArvadosFile. Return None if not
591         found.
592         If path is invalid (ex: starts with '/'), an IOError exception will be
593         raised.
594
595         """
596         if not path:
597             raise errors.ArgumentError("Parameter 'path' is empty.")
598
599         pathcomponents = path.split("/", 1)
600         if pathcomponents[0] == '':
601             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
602
603         item = self._items.get(pathcomponents[0])
604         if item is None:
605             return None
606         elif len(pathcomponents) == 1:
607             return item
608         else:
609             if isinstance(item, RichCollectionBase):
610                 if pathcomponents[1]:
611                     return item.find(pathcomponents[1])
612                 else:
613                     return item
614             else:
615                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
616
617     @synchronized
618     def mkdirs(self, path):
619         """Recursive subcollection create.
620
621         Like `os.makedirs()`.  Will create intermediate subcollections needed
622         to contain the leaf subcollection path.
623
624         """
625
626         if self.find(path) != None:
627             raise IOError(errno.EEXIST, "Directory or file exists", path)
628
629         return self.find_or_create(path, COLLECTION)
630
631     def open(self, path, mode="r"):
632         """Open a file-like object for access.
633
634         :path:
635           path to a file in the collection
636         :mode:
637           a string consisting of "r", "w", or "a", optionally followed
638           by "b" or "t", optionally followed by "+".
639           :"b":
640             binary mode: write() accepts bytes, read() returns bytes.
641           :"t":
642             text mode (default): write() accepts strings, read() returns strings.
643           :"r":
644             opens for reading
645           :"r+":
646             opens for reading and writing.  Reads/writes share a file pointer.
647           :"w", "w+":
648             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
649           :"a", "a+":
650             opens for reading and writing.  All writes are appended to
651             the end of the file.  Writing does not affect the file pointer for
652             reading.
653         """
654
655         if not re.search(r'^[rwa][bt]?\+?$', mode):
656             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
657
658         if mode[0] == 'r' and '+' not in mode:
659             fclass = ArvadosFileReader
660             arvfile = self.find(path)
661         elif not self.writable():
662             raise IOError(errno.EROFS, "Collection is read only")
663         else:
664             fclass = ArvadosFileWriter
665             arvfile = self.find_or_create(path, FILE)
666
667         if arvfile is None:
668             raise IOError(errno.ENOENT, "File not found", path)
669         if not isinstance(arvfile, ArvadosFile):
670             raise IOError(errno.EISDIR, "Is a directory", path)
671
672         if mode[0] == 'w':
673             arvfile.truncate(0)
674
675         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
676
677     def modified(self):
678         """Determine if the collection has been modified since last commited."""
679         return not self.committed()
680
681     @synchronized
682     def committed(self):
683         """Determine if the collection has been committed to the API server."""
684         return self._committed
685
686     @synchronized
687     def set_committed(self, value=True):
688         """Recursively set committed flag.
689
690         If value is True, set committed to be True for this and all children.
691
692         If value is False, set committed to be False for this and all parents.
693         """
694         if value == self._committed:
695             return
696         if value:
697             for k,v in listitems(self._items):
698                 v.set_committed(True)
699             self._committed = True
700         else:
701             self._committed = False
702             if self.parent is not None:
703                 self.parent.set_committed(False)
704
705     @synchronized
706     def __iter__(self):
707         """Iterate over names of files and collections contained in this collection."""
708         return iter(viewkeys(self._items))
709
710     @synchronized
711     def __getitem__(self, k):
712         """Get a file or collection that is directly contained by this collection.
713
714         If you want to search a path, use `find()` instead.
715
716         """
717         return self._items[k]
718
719     @synchronized
720     def __contains__(self, k):
721         """Test if there is a file or collection a directly contained by this collection."""
722         return k in self._items
723
724     @synchronized
725     def __len__(self):
726         """Get the number of items directly contained in this collection."""
727         return len(self._items)
728
729     @must_be_writable
730     @synchronized
731     def __delitem__(self, p):
732         """Delete an item by name which is directly contained by this collection."""
733         del self._items[p]
734         self.set_committed(False)
735         self.notify(DEL, self, p, None)
736
737     @synchronized
738     def keys(self):
739         """Get a list of names of files and collections directly contained in this collection."""
740         return self._items.keys()
741
742     @synchronized
743     def values(self):
744         """Get a list of files and collection objects directly contained in this collection."""
745         return listvalues(self._items)
746
747     @synchronized
748     def items(self):
749         """Get a list of (name, object) tuples directly contained in this collection."""
750         return listitems(self._items)
751
752     def exists(self, path):
753         """Test if there is a file or collection at `path`."""
754         return self.find(path) is not None
755
756     @must_be_writable
757     @synchronized
758     def remove(self, path, recursive=False):
759         """Remove the file or subcollection (directory) at `path`.
760
761         :recursive:
762           Specify whether to remove non-empty subcollections (True), or raise an error (False).
763         """
764
765         if not path:
766             raise errors.ArgumentError("Parameter 'path' is empty.")
767
768         pathcomponents = path.split("/", 1)
769         item = self._items.get(pathcomponents[0])
770         if item is None:
771             raise IOError(errno.ENOENT, "File not found", path)
772         if len(pathcomponents) == 1:
773             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
774                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
775             deleteditem = self._items[pathcomponents[0]]
776             del self._items[pathcomponents[0]]
777             self.set_committed(False)
778             self.notify(DEL, self, pathcomponents[0], deleteditem)
779         else:
780             item.remove(pathcomponents[1])
781
782     def _clonefrom(self, source):
783         for k,v in listitems(source):
784             self._items[k] = v.clone(self, k)
785
786     def clone(self):
787         raise NotImplementedError()
788
789     @must_be_writable
790     @synchronized
791     def add(self, source_obj, target_name, overwrite=False, reparent=False):
792         """Copy or move a file or subcollection to this collection.
793
794         :source_obj:
795           An ArvadosFile, or Subcollection object
796
797         :target_name:
798           Destination item name.  If the target name already exists and is a
799           file, this will raise an error unless you specify `overwrite=True`.
800
801         :overwrite:
802           Whether to overwrite target file if it already exists.
803
804         :reparent:
805           If True, source_obj will be moved from its parent collection to this collection.
806           If False, source_obj will be copied and the parent collection will be
807           unmodified.
808
809         """
810
811         if target_name in self and not overwrite:
812             raise IOError(errno.EEXIST, "File already exists", target_name)
813
814         modified_from = None
815         if target_name in self:
816             modified_from = self[target_name]
817
818         # Actually make the move or copy.
819         if reparent:
820             source_obj._reparent(self, target_name)
821             item = source_obj
822         else:
823             item = source_obj.clone(self, target_name)
824
825         self._items[target_name] = item
826         self.set_committed(False)
827
828         if modified_from:
829             self.notify(MOD, self, target_name, (modified_from, item))
830         else:
831             self.notify(ADD, self, target_name, item)
832
833     def _get_src_target(self, source, target_path, source_collection, create_dest):
834         if source_collection is None:
835             source_collection = self
836
837         # Find the object
838         if isinstance(source, basestring):
839             source_obj = source_collection.find(source)
840             if source_obj is None:
841                 raise IOError(errno.ENOENT, "File not found", source)
842             sourcecomponents = source.split("/")
843         else:
844             source_obj = source
845             sourcecomponents = None
846
847         # Find parent collection the target path
848         targetcomponents = target_path.split("/")
849
850         # Determine the name to use.
851         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
852
853         if not target_name:
854             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
855
856         if create_dest:
857             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
858         else:
859             if len(targetcomponents) > 1:
860                 target_dir = self.find("/".join(targetcomponents[0:-1]))
861             else:
862                 target_dir = self
863
864         if target_dir is None:
865             raise IOError(errno.ENOENT, "Target directory not found", target_name)
866
867         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
868             target_dir = target_dir[target_name]
869             target_name = sourcecomponents[-1]
870
871         return (source_obj, target_dir, target_name)
872
873     @must_be_writable
874     @synchronized
875     def copy(self, source, target_path, source_collection=None, overwrite=False):
876         """Copy a file or subcollection to a new path in this collection.
877
878         :source:
879           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
880
881         :target_path:
882           Destination file or path.  If the target path already exists and is a
883           subcollection, the item will be placed inside the subcollection.  If
884           the target path already exists and is a file, this will raise an error
885           unless you specify `overwrite=True`.
886
887         :source_collection:
888           Collection to copy `source_path` from (default `self`)
889
890         :overwrite:
891           Whether to overwrite target file if it already exists.
892         """
893
894         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
895         target_dir.add(source_obj, target_name, overwrite, False)
896
897     @must_be_writable
898     @synchronized
899     def rename(self, source, target_path, source_collection=None, overwrite=False):
900         """Move a file or subcollection from `source_collection` to a new path in this collection.
901
902         :source:
903           A string with a path to source file or subcollection.
904
905         :target_path:
906           Destination file or path.  If the target path already exists and is a
907           subcollection, the item will be placed inside the subcollection.  If
908           the target path already exists and is a file, this will raise an error
909           unless you specify `overwrite=True`.
910
911         :source_collection:
912           Collection to copy `source_path` from (default `self`)
913
914         :overwrite:
915           Whether to overwrite target file if it already exists.
916         """
917
918         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
919         if not source_obj.writable():
920             raise IOError(errno.EROFS, "Source collection is read only", source)
921         target_dir.add(source_obj, target_name, overwrite, True)
922
923     def portable_manifest_text(self, stream_name="."):
924         """Get the manifest text for this collection, sub collections and files.
925
926         This method does not flush outstanding blocks to Keep.  It will return
927         a normalized manifest with access tokens stripped.
928
929         :stream_name:
930           Name to use for this stream (directory)
931
932         """
933         return self._get_manifest_text(stream_name, True, True)
934
935     @synchronized
936     def manifest_text(self, stream_name=".", strip=False, normalize=False,
937                       only_committed=False):
938         """Get the manifest text for this collection, sub collections and files.
939
940         This method will flush outstanding blocks to Keep.  By default, it will
941         not normalize an unmodified manifest or strip access tokens.
942
943         :stream_name:
944           Name to use for this stream (directory)
945
946         :strip:
947           If True, remove signing tokens from block locators if present.
948           If False (default), block locators are left unchanged.
949
950         :normalize:
951           If True, always export the manifest text in normalized form
952           even if the Collection is not modified.  If False (default) and the collection
953           is not modified, return the original manifest text even if it is not
954           in normalized form.
955
956         :only_committed:
957           If True, don't commit pending blocks.
958
959         """
960
961         if not only_committed:
962             self._my_block_manager().commit_all()
963         return self._get_manifest_text(stream_name, strip, normalize,
964                                        only_committed=only_committed)
965
966     @synchronized
967     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
968         """Get the manifest text for this collection, sub collections and files.
969
970         :stream_name:
971           Name to use for this stream (directory)
972
973         :strip:
974           If True, remove signing tokens from block locators if present.
975           If False (default), block locators are left unchanged.
976
977         :normalize:
978           If True, always export the manifest text in normalized form
979           even if the Collection is not modified.  If False (default) and the collection
980           is not modified, return the original manifest text even if it is not
981           in normalized form.
982
983         :only_committed:
984           If True, only include blocks that were already committed to Keep.
985
986         """
987
988         if not self.committed() or self._manifest_text is None or normalize:
989             stream = {}
990             buf = []
991             sorted_keys = sorted(self.keys())
992             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
993                 # Create a stream per file `k`
994                 arvfile = self[filename]
995                 filestream = []
996                 for segment in arvfile.segments():
997                     loc = segment.locator
998                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
999                         if only_committed:
1000                             continue
1001                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1002                     if strip:
1003                         loc = KeepLocator(loc).stripped()
1004                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1005                                          segment.segment_offset, segment.range_size))
1006                 stream[filename] = filestream
1007             if stream:
1008                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1009             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1010                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1011             return "".join(buf)
1012         else:
1013             if strip:
1014                 return self.stripped_manifest()
1015             else:
1016                 return self._manifest_text
1017
1018     @synchronized
1019     def diff(self, end_collection, prefix=".", holding_collection=None):
1020         """Generate list of add/modify/delete actions.
1021
1022         When given to `apply`, will change `self` to match `end_collection`
1023
1024         """
1025         changes = []
1026         if holding_collection is None:
1027             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1028         for k in self:
1029             if k not in end_collection:
1030                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1031         for k in end_collection:
1032             if k in self:
1033                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1034                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1035                 elif end_collection[k] != self[k]:
1036                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1037                 else:
1038                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1039             else:
1040                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1041         return changes
1042
1043     @must_be_writable
1044     @synchronized
1045     def apply(self, changes):
1046         """Apply changes from `diff`.
1047
1048         If a change conflicts with a local change, it will be saved to an
1049         alternate path indicating the conflict.
1050
1051         """
1052         if changes:
1053             self.set_committed(False)
1054         for change in changes:
1055             event_type = change[0]
1056             path = change[1]
1057             initial = change[2]
1058             local = self.find(path)
1059             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1060                                                                     time.gmtime()))
1061             if event_type == ADD:
1062                 if local is None:
1063                     # No local file at path, safe to copy over new file
1064                     self.copy(initial, path)
1065                 elif local is not None and local != initial:
1066                     # There is already local file and it is different:
1067                     # save change to conflict file.
1068                     self.copy(initial, conflictpath)
1069             elif event_type == MOD or event_type == TOK:
1070                 final = change[3]
1071                 if local == initial:
1072                     # Local matches the "initial" item so it has not
1073                     # changed locally and is safe to update.
1074                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1075                         # Replace contents of local file with new contents
1076                         local.replace_contents(final)
1077                     else:
1078                         # Overwrite path with new item; this can happen if
1079                         # path was a file and is now a collection or vice versa
1080                         self.copy(final, path, overwrite=True)
1081                 else:
1082                     # Local is missing (presumably deleted) or local doesn't
1083                     # match the "start" value, so save change to conflict file
1084                     self.copy(final, conflictpath)
1085             elif event_type == DEL:
1086                 if local == initial:
1087                     # Local item matches "initial" value, so it is safe to remove.
1088                     self.remove(path, recursive=True)
1089                 # else, the file is modified or already removed, in either
1090                 # case we don't want to try to remove it.
1091
1092     def portable_data_hash(self):
1093         """Get the portable data hash for this collection's manifest."""
1094         if self._manifest_locator and self.committed():
1095             # If the collection is already saved on the API server, and it's committed
1096             # then return API server's PDH response.
1097             return self._portable_data_hash
1098         else:
1099             stripped = self.portable_manifest_text().encode()
1100             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1101
1102     @synchronized
1103     def subscribe(self, callback):
1104         if self._callback is None:
1105             self._callback = callback
1106         else:
1107             raise errors.ArgumentError("A callback is already set on this collection.")
1108
1109     @synchronized
1110     def unsubscribe(self):
1111         if self._callback is not None:
1112             self._callback = None
1113
1114     @synchronized
1115     def notify(self, event, collection, name, item):
1116         if self._callback:
1117             self._callback(event, collection, name, item)
1118         self.root_collection().notify(event, collection, name, item)
1119
1120     @synchronized
1121     def __eq__(self, other):
1122         if other is self:
1123             return True
1124         if not isinstance(other, RichCollectionBase):
1125             return False
1126         if len(self._items) != len(other):
1127             return False
1128         for k in self._items:
1129             if k not in other:
1130                 return False
1131             if self._items[k] != other[k]:
1132                 return False
1133         return True
1134
1135     def __ne__(self, other):
1136         return not self.__eq__(other)
1137
1138     @synchronized
1139     def flush(self):
1140         """Flush bufferblocks to Keep."""
1141         for e in listvalues(self):
1142             e.flush()
1143
1144
1145 class Collection(RichCollectionBase):
1146     """Represents the root of an Arvados Collection.
1147
1148     This class is threadsafe.  The root collection object, all subcollections
1149     and files are protected by a single lock (i.e. each access locks the entire
1150     collection).
1151
1152     Brief summary of
1153     useful methods:
1154
1155     :To read an existing file:
1156       `c.open("myfile", "r")`
1157
1158     :To write a new file:
1159       `c.open("myfile", "w")`
1160
1161     :To determine if a file exists:
1162       `c.find("myfile") is not None`
1163
1164     :To copy a file:
1165       `c.copy("source", "dest")`
1166
1167     :To delete a file:
1168       `c.remove("myfile")`
1169
1170     :To save to an existing collection record:
1171       `c.save()`
1172
1173     :To save a new collection record:
1174     `c.save_new()`
1175
1176     :To merge remote changes into this object:
1177       `c.update()`
1178
1179     Must be associated with an API server Collection record (during
1180     initialization, or using `save_new`) to use `save` or `update`
1181
1182     """
1183
1184     def __init__(self, manifest_locator_or_text=None,
1185                  api_client=None,
1186                  keep_client=None,
1187                  num_retries=None,
1188                  parent=None,
1189                  apiconfig=None,
1190                  block_manager=None,
1191                  replication_desired=None,
1192                  put_threads=None):
1193         """Collection constructor.
1194
1195         :manifest_locator_or_text:
1196           An Arvados collection UUID, portable data hash, raw manifest
1197           text, or (if creating an empty collection) None.
1198
1199         :parent:
1200           the parent Collection, may be None.
1201
1202         :apiconfig:
1203           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1204           Prefer this over supplying your own api_client and keep_client (except in testing).
1205           Will use default config settings if not specified.
1206
1207         :api_client:
1208           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1209
1210         :keep_client:
1211           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1212
1213         :num_retries:
1214           the number of retries for API and Keep requests.
1215
1216         :block_manager:
1217           the block manager to use.  If not specified, create one.
1218
1219         :replication_desired:
1220           How many copies should Arvados maintain. If None, API server default
1221           configuration applies. If not None, this value will also be used
1222           for determining the number of block copies being written.
1223
1224         """
1225         super(Collection, self).__init__(parent)
1226         self._api_client = api_client
1227         self._keep_client = keep_client
1228         self._block_manager = block_manager
1229         self.replication_desired = replication_desired
1230         self.put_threads = put_threads
1231
1232         if apiconfig:
1233             self._config = apiconfig
1234         else:
1235             self._config = config.settings()
1236
1237         self.num_retries = num_retries if num_retries is not None else 0
1238         self._manifest_locator = None
1239         self._manifest_text = None
1240         self._portable_data_hash = None
1241         self._api_response = None
1242         self._past_versions = set()
1243
1244         self.lock = threading.RLock()
1245         self.events = None
1246
1247         if manifest_locator_or_text:
1248             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1249                 self._manifest_locator = manifest_locator_or_text
1250             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1251                 self._manifest_locator = manifest_locator_or_text
1252             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1253                 self._manifest_text = manifest_locator_or_text
1254             else:
1255                 raise errors.ArgumentError(
1256                     "Argument to CollectionReader is not a manifest or a collection UUID")
1257
1258             try:
1259                 self._populate()
1260             except (IOError, errors.SyntaxError) as e:
1261                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1262
1263     def root_collection(self):
1264         return self
1265
1266     def stream_name(self):
1267         return "."
1268
1269     def writable(self):
1270         return True
1271
1272     @synchronized
1273     def known_past_version(self, modified_at_and_portable_data_hash):
1274         return modified_at_and_portable_data_hash in self._past_versions
1275
1276     @synchronized
1277     @retry_method
1278     def update(self, other=None, num_retries=None):
1279         """Merge the latest collection on the API server with the current collection."""
1280
1281         if other is None:
1282             if self._manifest_locator is None:
1283                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1284             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1285             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1286                 response.get("portable_data_hash") != self.portable_data_hash()):
1287                 # The record on the server is different from our current one, but we've seen it before,
1288                 # so ignore it because it's already been merged.
1289                 # However, if it's the same as our current record, proceed with the update, because we want to update
1290                 # our tokens.
1291                 return
1292             else:
1293                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1294             other = CollectionReader(response["manifest_text"])
1295         baseline = CollectionReader(self._manifest_text)
1296         self.apply(baseline.diff(other))
1297         self._manifest_text = self.manifest_text()
1298
1299     @synchronized
1300     def _my_api(self):
1301         if self._api_client is None:
1302             self._api_client = ThreadSafeApiCache(self._config)
1303             if self._keep_client is None:
1304                 self._keep_client = self._api_client.keep
1305         return self._api_client
1306
1307     @synchronized
1308     def _my_keep(self):
1309         if self._keep_client is None:
1310             if self._api_client is None:
1311                 self._my_api()
1312             else:
1313                 self._keep_client = KeepClient(api_client=self._api_client)
1314         return self._keep_client
1315
1316     @synchronized
1317     def _my_block_manager(self):
1318         if self._block_manager is None:
1319             copies = (self.replication_desired or
1320                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1321                                                    2))
1322             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1323         return self._block_manager
1324
1325     def _remember_api_response(self, response):
1326         self._api_response = response
1327         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1328
1329     def _populate_from_api_server(self):
1330         # As in KeepClient itself, we must wait until the last
1331         # possible moment to instantiate an API client, in order to
1332         # avoid tripping up clients that don't have access to an API
1333         # server.  If we do build one, make sure our Keep client uses
1334         # it.  If instantiation fails, we'll fall back to the except
1335         # clause, just like any other Collection lookup
1336         # failure. Return an exception, or None if successful.
1337         self._remember_api_response(self._my_api().collections().get(
1338             uuid=self._manifest_locator).execute(
1339                 num_retries=self.num_retries))
1340         self._manifest_text = self._api_response['manifest_text']
1341         self._portable_data_hash = self._api_response['portable_data_hash']
1342         # If not overriden via kwargs, we should try to load the
1343         # replication_desired from the API server
1344         if self.replication_desired is None:
1345             self.replication_desired = self._api_response.get('replication_desired', None)
1346
1347     def _populate(self):
1348         if self._manifest_text is None:
1349             if self._manifest_locator is None:
1350                 return
1351             else:
1352                 self._populate_from_api_server()
1353         self._baseline_manifest = self._manifest_text
1354         self._import_manifest(self._manifest_text)
1355
1356     def _has_collection_uuid(self):
1357         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1358
1359     def __enter__(self):
1360         return self
1361
1362     def __exit__(self, exc_type, exc_value, traceback):
1363         """Support scoped auto-commit in a with: block."""
1364         if exc_type is None:
1365             if self.writable() and self._has_collection_uuid():
1366                 self.save()
1367         self.stop_threads()
1368
1369     def stop_threads(self):
1370         if self._block_manager is not None:
1371             self._block_manager.stop_threads()
1372
1373     @synchronized
1374     def manifest_locator(self):
1375         """Get the manifest locator, if any.
1376
1377         The manifest locator will be set when the collection is loaded from an
1378         API server record or the portable data hash of a manifest.
1379
1380         The manifest locator will be None if the collection is newly created or
1381         was created directly from manifest text.  The method `save_new()` will
1382         assign a manifest locator.
1383
1384         """
1385         return self._manifest_locator
1386
1387     @synchronized
1388     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1389         if new_config is None:
1390             new_config = self._config
1391         if readonly:
1392             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1393         else:
1394             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1395
1396         newcollection._clonefrom(self)
1397         return newcollection
1398
1399     @synchronized
1400     def api_response(self):
1401         """Returns information about this Collection fetched from the API server.
1402
1403         If the Collection exists in Keep but not the API server, currently
1404         returns None.  Future versions may provide a synthetic response.
1405
1406         """
1407         return self._api_response
1408
1409     def find_or_create(self, path, create_type):
1410         """See `RichCollectionBase.find_or_create`"""
1411         if path == ".":
1412             return self
1413         else:
1414             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1415
1416     def find(self, path):
1417         """See `RichCollectionBase.find`"""
1418         if path == ".":
1419             return self
1420         else:
1421             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1422
1423     def remove(self, path, recursive=False):
1424         """See `RichCollectionBase.remove`"""
1425         if path == ".":
1426             raise errors.ArgumentError("Cannot remove '.'")
1427         else:
1428             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1429
1430     @must_be_writable
1431     @synchronized
1432     @retry_method
1433     def save(self, merge=True, num_retries=None):
1434         """Save collection to an existing collection record.
1435
1436         Commit pending buffer blocks to Keep, merge with remote record (if
1437         merge=True, the default), and update the collection record.  Returns
1438         the current manifest text.
1439
1440         Will raise AssertionError if not associated with a collection record on
1441         the API server.  If you want to save a manifest to Keep only, see
1442         `save_new()`.
1443
1444         :merge:
1445           Update and merge remote changes before saving.  Otherwise, any
1446           remote changes will be ignored and overwritten.
1447
1448         :num_retries:
1449           Retry count on API calls (if None,  use the collection default)
1450
1451         """
1452         if not self.committed():
1453             if not self._has_collection_uuid():
1454                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1455
1456             self._my_block_manager().commit_all()
1457
1458             if merge:
1459                 self.update()
1460
1461             text = self.manifest_text(strip=False)
1462             self._remember_api_response(self._my_api().collections().update(
1463                 uuid=self._manifest_locator,
1464                 body={'manifest_text': text}
1465                 ).execute(
1466                     num_retries=num_retries))
1467             self._manifest_text = self._api_response["manifest_text"]
1468             self._portable_data_hash = self._api_response["portable_data_hash"]
1469             self.set_committed(True)
1470
1471         return self._manifest_text
1472
1473
1474     @must_be_writable
1475     @synchronized
1476     @retry_method
1477     def save_new(self, name=None,
1478                  create_collection_record=True,
1479                  owner_uuid=None,
1480                  ensure_unique_name=False,
1481                  num_retries=None):
1482         """Save collection to a new collection record.
1483
1484         Commit pending buffer blocks to Keep and, when create_collection_record
1485         is True (default), create a new collection record.  After creating a
1486         new collection record, this Collection object will be associated with
1487         the new record used by `save()`.  Returns the current manifest text.
1488
1489         :name:
1490           The collection name.
1491
1492         :create_collection_record:
1493            If True, create a collection record on the API server.
1494            If False, only commit blocks to Keep and return the manifest text.
1495
1496         :owner_uuid:
1497           the user, or project uuid that will own this collection.
1498           If None, defaults to the current user.
1499
1500         :ensure_unique_name:
1501           If True, ask the API server to rename the collection
1502           if it conflicts with a collection with the same name and owner.  If
1503           False, a name conflict will result in an error.
1504
1505         :num_retries:
1506           Retry count on API calls (if None,  use the collection default)
1507
1508         """
1509         self._my_block_manager().commit_all()
1510         text = self.manifest_text(strip=False)
1511
1512         if create_collection_record:
1513             if name is None:
1514                 name = "New collection"
1515                 ensure_unique_name = True
1516
1517             body = {"manifest_text": text,
1518                     "name": name,
1519                     "replication_desired": self.replication_desired}
1520             if owner_uuid:
1521                 body["owner_uuid"] = owner_uuid
1522
1523             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1524             text = self._api_response["manifest_text"]
1525
1526             self._manifest_locator = self._api_response["uuid"]
1527             self._portable_data_hash = self._api_response["portable_data_hash"]
1528
1529             self._manifest_text = text
1530             self.set_committed(True)
1531
1532         return text
1533
1534     _token_re = re.compile(r'(\S+)(\s+|$)')
1535     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1536     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1537
1538     @synchronized
1539     def _import_manifest(self, manifest_text):
1540         """Import a manifest into a `Collection`.
1541
1542         :manifest_text:
1543           The manifest text to import from.
1544
1545         """
1546         if len(self) > 0:
1547             raise ArgumentError("Can only import manifest into an empty collection")
1548
1549         STREAM_NAME = 0
1550         BLOCKS = 1
1551         SEGMENTS = 2
1552
1553         stream_name = None
1554         state = STREAM_NAME
1555
1556         for token_and_separator in self._token_re.finditer(manifest_text):
1557             tok = token_and_separator.group(1)
1558             sep = token_and_separator.group(2)
1559
1560             if state == STREAM_NAME:
1561                 # starting a new stream
1562                 stream_name = tok.replace('\\040', ' ')
1563                 blocks = []
1564                 segments = []
1565                 streamoffset = 0
1566                 state = BLOCKS
1567                 self.find_or_create(stream_name, COLLECTION)
1568                 continue
1569
1570             if state == BLOCKS:
1571                 block_locator = self._block_re.match(tok)
1572                 if block_locator:
1573                     blocksize = int(block_locator.group(1))
1574                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1575                     streamoffset += blocksize
1576                 else:
1577                     state = SEGMENTS
1578
1579             if state == SEGMENTS:
1580                 file_segment = self._segment_re.match(tok)
1581                 if file_segment:
1582                     pos = int(file_segment.group(1))
1583                     size = int(file_segment.group(2))
1584                     name = file_segment.group(3).replace('\\040', ' ')
1585                     filepath = os.path.join(stream_name, name)
1586                     afile = self.find_or_create(filepath, FILE)
1587                     if isinstance(afile, ArvadosFile):
1588                         afile.add_segment(blocks, pos, size)
1589                     else:
1590                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1591                 else:
1592                     # error!
1593                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1594
1595             if sep == "\n":
1596                 stream_name = None
1597                 state = STREAM_NAME
1598
1599         self.set_committed(True)
1600
1601     @synchronized
1602     def notify(self, event, collection, name, item):
1603         if self._callback:
1604             self._callback(event, collection, name, item)
1605
1606
1607 class Subcollection(RichCollectionBase):
1608     """This is a subdirectory within a collection that doesn't have its own API
1609     server record.
1610
1611     Subcollection locking falls under the umbrella lock of its root collection.
1612
1613     """
1614
1615     def __init__(self, parent, name):
1616         super(Subcollection, self).__init__(parent)
1617         self.lock = self.root_collection().lock
1618         self._manifest_text = None
1619         self.name = name
1620         self.num_retries = parent.num_retries
1621
1622     def root_collection(self):
1623         return self.parent.root_collection()
1624
1625     def writable(self):
1626         return self.root_collection().writable()
1627
1628     def _my_api(self):
1629         return self.root_collection()._my_api()
1630
1631     def _my_keep(self):
1632         return self.root_collection()._my_keep()
1633
1634     def _my_block_manager(self):
1635         return self.root_collection()._my_block_manager()
1636
1637     def stream_name(self):
1638         return os.path.join(self.parent.stream_name(), self.name)
1639
1640     @synchronized
1641     def clone(self, new_parent, new_name):
1642         c = Subcollection(new_parent, new_name)
1643         c._clonefrom(self)
1644         return c
1645
1646     @must_be_writable
1647     @synchronized
1648     def _reparent(self, newparent, newname):
1649         self.set_committed(False)
1650         self.flush()
1651         self.parent.remove(self.name, recursive=True)
1652         self.parent = newparent
1653         self.name = newname
1654         self.lock = self.parent.root_collection().lock
1655
1656
1657 class CollectionReader(Collection):
1658     """A read-only collection object.
1659
1660     Initialize from a collection UUID or portable data hash, or raw
1661     manifest text.  See `Collection` constructor for detailed options.
1662
1663     """
1664     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1665         self._in_init = True
1666         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1667         self._in_init = False
1668
1669         # Forego any locking since it should never change once initialized.
1670         self.lock = NoopLock()
1671
1672         # Backwards compatability with old CollectionReader
1673         # all_streams() and all_files()
1674         self._streams = None
1675
1676     def writable(self):
1677         return self._in_init
1678
1679     def _populate_streams(orig_func):
1680         @functools.wraps(orig_func)
1681         def populate_streams_wrapper(self, *args, **kwargs):
1682             # Defer populating self._streams until needed since it creates a copy of the manifest.
1683             if self._streams is None:
1684                 if self._manifest_text:
1685                     self._streams = [sline.split()
1686                                      for sline in self._manifest_text.split("\n")
1687                                      if sline]
1688                 else:
1689                     self._streams = []
1690             return orig_func(self, *args, **kwargs)
1691         return populate_streams_wrapper
1692
1693     @_populate_streams
1694     def normalize(self):
1695         """Normalize the streams returned by `all_streams`.
1696
1697         This method is kept for backwards compatability and only affects the
1698         behavior of `all_streams()` and `all_files()`
1699
1700         """
1701
1702         # Rearrange streams
1703         streams = {}
1704         for s in self.all_streams():
1705             for f in s.all_files():
1706                 streamname, filename = split(s.name() + "/" + f.name())
1707                 if streamname not in streams:
1708                     streams[streamname] = {}
1709                 if filename not in streams[streamname]:
1710                     streams[streamname][filename] = []
1711                 for r in f.segments:
1712                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1713
1714         self._streams = [normalize_stream(s, streams[s])
1715                          for s in sorted(streams)]
1716     @_populate_streams
1717     def all_streams(self):
1718         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1719                 for s in self._streams]
1720
1721     @_populate_streams
1722     def all_files(self):
1723         for s in self.all_streams():
1724             for f in s.all_files():
1725                 yield f