20839: Replace arvados.util.list_all calls throughout
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import ciso8601
11 import datetime
12 import errno
13 import functools
14 import hashlib
15 import io
16 import logging
17 import os
18 import re
19 import sys
20 import threading
21 import time
22
23 from collections import deque
24 from stat import *
25
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
34 import arvados.util
35 import arvados.events as events
36 from arvados.retry import retry_method
37
# Module-level logger, registered under the name 'arvados.collection'.
_logger = logging.getLogger('arvados.collection')
39
class CollectionBase(object):
    """Shared foundation for the collection classes in this module.

    Provides context-manager support, lazy creation of the Keep client and
    a helper that renders the manifest without non-portable locator hints.
    Subclasses supply `manifest_text()` and the `_api_client`,
    `_keep_client` and `num_retries` attributes used here.
    """

    def __enter__(self):
        """Enter a `with` block; the collection itself is the bound value."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave a `with` block.  The base class has nothing to clean up."""

    def _my_keep(self):
        """Return this collection's Keep client, building it on first use."""
        client = self._keep_client
        if client is None:
            client = KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
            self._keep_client = client
        return client

    def stripped_manifest(self):
        """Return the manifest text with non-portable locator hints removed.

        Every token after the first on each manifest line is checked against
        `arvados.util.keep_locator_pattern`; in matching tokens each "+hint"
        that does not start with a digit (i.e. everything except the size
        hint) is dropped.  Blank lines are discarded, runs of whitespace
        collapse to single spaces, and every emitted line ends with a
        newline.
        """
        pieces = []
        for manifest_line in self.manifest_text().split("\n"):
            tokens = manifest_line.split()
            if not tokens:
                continue
            # The first token is the stream name and is never rewritten.
            portable = tokens[:1]
            for token in tokens[1:]:
                if re.match(arvados.util.keep_locator_pattern, token):
                    token = re.sub(r'\+[^\d][^\+]*', '', token)
                portable.append(token)
            pieces.append(' '.join(portable))
            pieces.append("\n")
        return ''.join(pieces)
74
75
class _WriterFile(_FileLikeObjectBase):
    """File-like handle that forwards everything to a collection writer.

    The handle stores no data itself: writes and flushes are delegated to
    the writer given to the constructor, and closing the handle tells that
    writer to finish its current file.  The write-side methods are wrapped
    with `_FileLikeObjectBase._before_close` (presumably rejecting use
    after close -- confirm in arvfile).
    """

    def __init__(self, coll_writer, name):
        """Create a handle called `name`, opened in 'wb' mode, feeding `coll_writer`."""
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        """Close the handle, then finish the destination writer's current file."""
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        """Hand `data` to the destination writer."""
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        """Write every item of `seq`, in order, through write()."""
        for chunk in seq:
            self.write(chunk)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Ask the destination writer to flush its buffered data."""
        self.dest.flush_data()
97
98
99 class CollectionWriter(CollectionBase):
100     """Deprecated, use Collection instead."""
101
102     def __init__(self, api_client=None, num_retries=0, replication=None):
103         """Instantiate a CollectionWriter.
104
105         CollectionWriter lets you build a new Arvados Collection from scratch.
106         Write files to it.  The CollectionWriter will upload data to Keep as
107         appropriate, and provide you with the Collection manifest text when
108         you're finished.
109
110         Arguments:
111         * api_client: The API client to use to look up Collections.  If not
112           provided, CollectionReader will build one from available Arvados
113           configuration.
114         * num_retries: The default number of times to retry failed
115           service requests.  Default 0.  You may change this value
116           after instantiation, but note those changes may not
117           propagate to related objects like the Keep client.
118         * replication: The number of copies of each block to store.
119           If this argument is None or not supplied, replication is
120           the server-provided default if available, otherwise 2.
121         """
122         self._api_client = api_client
123         self.num_retries = num_retries
124         self.replication = (2 if replication is None else replication)
125         self._keep_client = None
126         self._data_buffer = []
127         self._data_buffer_len = 0
128         self._current_stream_files = []
129         self._current_stream_length = 0
130         self._current_stream_locators = []
131         self._current_stream_name = '.'
132         self._current_file_name = None
133         self._current_file_pos = 0
134         self._finished_streams = []
135         self._close_file = None
136         self._queued_file = None
137         self._queued_dirents = deque()
138         self._queued_trees = deque()
139         self._last_open = None
140
141     def __exit__(self, exc_type, exc_value, traceback):
142         if exc_type is None:
143             self.finish()
144
145     def do_queued_work(self):
146         # The work queue consists of three pieces:
147         # * _queued_file: The file object we're currently writing to the
148         #   Collection.
149         # * _queued_dirents: Entries under the current directory
150         #   (_queued_trees[0]) that we want to write or recurse through.
151         #   This may contain files from subdirectories if
152         #   max_manifest_depth == 0 for this directory.
153         # * _queued_trees: Directories that should be written as separate
154         #   streams to the Collection.
155         # This function handles the smallest piece of work currently queued
156         # (current file, then current directory, then next directory) until
157         # no work remains.  The _work_THING methods each do a unit of work on
158         # THING.  _queue_THING methods add a THING to the work queue.
159         while True:
160             if self._queued_file:
161                 self._work_file()
162             elif self._queued_dirents:
163                 self._work_dirents()
164             elif self._queued_trees:
165                 self._work_trees()
166             else:
167                 break
168
169     def _work_file(self):
170         while True:
171             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
172             if not buf:
173                 break
174             self.write(buf)
175         self.finish_current_file()
176         if self._close_file:
177             self._queued_file.close()
178         self._close_file = None
179         self._queued_file = None
180
181     def _work_dirents(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         if stream_name != self.current_stream_name():
184             self.start_new_stream(stream_name)
185         while self._queued_dirents:
186             dirent = self._queued_dirents.popleft()
187             target = os.path.join(path, dirent)
188             if os.path.isdir(target):
189                 self._queue_tree(target,
190                                  os.path.join(stream_name, dirent),
191                                  max_manifest_depth - 1)
192             else:
193                 self._queue_file(target, dirent)
194                 break
195         if not self._queued_dirents:
196             self._queued_trees.popleft()
197
198     def _work_trees(self):
199         path, stream_name, max_manifest_depth = self._queued_trees[0]
200         d = arvados.util.listdir_recursive(
201             path, max_depth = (None if max_manifest_depth == 0 else 0))
202         if d:
203             self._queue_dirents(stream_name, d)
204         else:
205             self._queued_trees.popleft()
206
207     def _queue_file(self, source, filename=None):
208         assert (self._queued_file is None), "tried to queue more than one file"
209         if not hasattr(source, 'read'):
210             source = open(source, 'rb')
211             self._close_file = True
212         else:
213             self._close_file = False
214         if filename is None:
215             filename = os.path.basename(source.name)
216         self.start_new_file(filename)
217         self._queued_file = source
218
219     def _queue_dirents(self, stream_name, dirents):
220         assert (not self._queued_dirents), "tried to queue more than one tree"
221         self._queued_dirents = deque(sorted(dirents))
222
223     def _queue_tree(self, path, stream_name, max_manifest_depth):
224         self._queued_trees.append((path, stream_name, max_manifest_depth))
225
226     def write_file(self, source, filename=None):
227         self._queue_file(source, filename)
228         self.do_queued_work()
229
230     def write_directory_tree(self,
231                              path, stream_name='.', max_manifest_depth=-1):
232         self._queue_tree(path, stream_name, max_manifest_depth)
233         self.do_queued_work()
234
235     def write(self, newdata):
236         if isinstance(newdata, bytes):
237             pass
238         elif isinstance(newdata, str):
239             newdata = newdata.encode()
240         elif hasattr(newdata, '__iter__'):
241             for s in newdata:
242                 self.write(s)
243             return
244         self._data_buffer.append(newdata)
245         self._data_buffer_len += len(newdata)
246         self._current_stream_length += len(newdata)
247         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
248             self.flush_data()
249
250     def open(self, streampath, filename=None):
251         """open(streampath[, filename]) -> file-like object
252
253         Pass in the path of a file to write to the Collection, either as a
254         single string or as two separate stream name and file name arguments.
255         This method returns a file-like object you can write to add it to the
256         Collection.
257
258         You may only have one file object from the Collection open at a time,
259         so be sure to close the object when you're done.  Using the object in
260         a with statement makes that easy::
261
262           with cwriter.open('./doc/page1.txt') as outfile:
263               outfile.write(page1_data)
264           with cwriter.open('./doc/page2.txt') as outfile:
265               outfile.write(page2_data)
266         """
267         if filename is None:
268             streampath, filename = split(streampath)
269         if self._last_open and not self._last_open.closed:
270             raise errors.AssertionError(
271                 u"can't open '{}' when '{}' is still open".format(
272                     filename, self._last_open.name))
273         if streampath != self.current_stream_name():
274             self.start_new_stream(streampath)
275         self.set_current_file_name(filename)
276         self._last_open = _WriterFile(self, filename)
277         return self._last_open
278
279     def flush_data(self):
280         data_buffer = b''.join(self._data_buffer)
281         if data_buffer:
282             self._current_stream_locators.append(
283                 self._my_keep().put(
284                     data_buffer[0:config.KEEP_BLOCK_SIZE],
285                     copies=self.replication))
286             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
287             self._data_buffer_len = len(self._data_buffer[0])
288
289     def start_new_file(self, newfilename=None):
290         self.finish_current_file()
291         self.set_current_file_name(newfilename)
292
293     def set_current_file_name(self, newfilename):
294         if re.search(r'[\t\n]', newfilename):
295             raise errors.AssertionError(
296                 "Manifest filenames cannot contain whitespace: %s" %
297                 newfilename)
298         elif re.search(r'\x00', newfilename):
299             raise errors.AssertionError(
300                 "Manifest filenames cannot contain NUL characters: %s" %
301                 newfilename)
302         self._current_file_name = newfilename
303
304     def current_file_name(self):
305         return self._current_file_name
306
307     def finish_current_file(self):
308         if self._current_file_name is None:
309             if self._current_file_pos == self._current_stream_length:
310                 return
311             raise errors.AssertionError(
312                 "Cannot finish an unnamed file " +
313                 "(%d bytes at offset %d in '%s' stream)" %
314                 (self._current_stream_length - self._current_file_pos,
315                  self._current_file_pos,
316                  self._current_stream_name))
317         self._current_stream_files.append([
318                 self._current_file_pos,
319                 self._current_stream_length - self._current_file_pos,
320                 self._current_file_name])
321         self._current_file_pos = self._current_stream_length
322         self._current_file_name = None
323
324     def start_new_stream(self, newstreamname='.'):
325         self.finish_current_stream()
326         self.set_current_stream_name(newstreamname)
327
328     def set_current_stream_name(self, newstreamname):
329         if re.search(r'[\t\n]', newstreamname):
330             raise errors.AssertionError(
331                 "Manifest stream names cannot contain whitespace: '%s'" %
332                 (newstreamname))
333         self._current_stream_name = '.' if newstreamname=='' else newstreamname
334
335     def current_stream_name(self):
336         return self._current_stream_name
337
338     def finish_current_stream(self):
339         self.finish_current_file()
340         self.flush_data()
341         if not self._current_stream_files:
342             pass
343         elif self._current_stream_name is None:
344             raise errors.AssertionError(
345                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
346                 (self._current_stream_length, len(self._current_stream_files)))
347         else:
348             if not self._current_stream_locators:
349                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
350             self._finished_streams.append([self._current_stream_name,
351                                            self._current_stream_locators,
352                                            self._current_stream_files])
353         self._current_stream_files = []
354         self._current_stream_length = 0
355         self._current_stream_locators = []
356         self._current_stream_name = None
357         self._current_file_pos = 0
358         self._current_file_name = None
359
360     def finish(self):
361         """Store the manifest in Keep and return its locator.
362
363         This is useful for storing manifest fragments (task outputs)
364         temporarily in Keep during a Crunch job.
365
366         In other cases you should make a collection instead, by
367         sending manifest_text() to the API server's "create
368         collection" endpoint.
369         """
370         return self._my_keep().put(self.manifest_text().encode(),
371                                    copies=self.replication)
372
373     def portable_data_hash(self):
374         stripped = self.stripped_manifest().encode()
375         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
376
377     def manifest_text(self):
378         self.finish_current_stream()
379         manifest = ''
380
381         for stream in self._finished_streams:
382             if not re.search(r'^\.(/.*)?$', stream[0]):
383                 manifest += './'
384             manifest += stream[0].replace(' ', '\\040')
385             manifest += ' ' + ' '.join(stream[1])
386             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
387             manifest += "\n"
388
389         return manifest
390
391     def data_locators(self):
392         ret = []
393         for name, locators, files in self._finished_streams:
394             ret += locators
395         return ret
396
397     def save_new(self, name=None):
398         return self._api_client.collections().create(
399             ensure_unique_name=True,
400             body={
401                 'name': name,
402                 'manifest_text': self.manifest_text(),
403             }).execute(num_retries=self.num_retries)
404
405
406 class ResumableCollectionWriter(CollectionWriter):
407     """Deprecated, use Collection instead."""
408
    # Names of the instance attributes that dump_state() copies into the
    # saved state and that from_state() restores onto a fresh writer.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']
414
415     def __init__(self, api_client=None, **kwargs):
416         self._dependencies = {}
417         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
418
419     @classmethod
420     def from_state(cls, state, *init_args, **init_kwargs):
421         # Try to build a new writer from scratch with the given state.
422         # If the state is not suitable to resume (because files have changed,
423         # been deleted, aren't predictable, etc.), raise a
424         # StaleWriterStateError.  Otherwise, return the initialized writer.
425         # The caller is responsible for calling writer.do_queued_work()
426         # appropriately after it's returned.
427         writer = cls(*init_args, **init_kwargs)
428         for attr_name in cls.STATE_PROPS:
429             attr_value = state[attr_name]
430             attr_class = getattr(writer, attr_name).__class__
431             # Coerce the value into the same type as the initial value, if
432             # needed.
433             if attr_class not in (type(None), attr_value.__class__):
434                 attr_value = attr_class(attr_value)
435             setattr(writer, attr_name, attr_value)
436         # Check dependencies before we try to resume anything.
437         if any(KeepLocator(ls).permission_expired()
438                for ls in writer._current_stream_locators):
439             raise errors.StaleWriterStateError(
440                 "locators include expired permission hint")
441         writer.check_dependencies()
442         if state['_current_file'] is not None:
443             path, pos = state['_current_file']
444             try:
445                 writer._queued_file = open(path, 'rb')
446                 writer._queued_file.seek(pos)
447             except IOError as error:
448                 raise errors.StaleWriterStateError(
449                     u"failed to reopen active file {}: {}".format(path, error))
450         return writer
451
452     def check_dependencies(self):
453         for path, orig_stat in listitems(self._dependencies):
454             if not S_ISREG(orig_stat[ST_MODE]):
455                 raise errors.StaleWriterStateError(u"{} not file".format(path))
456             try:
457                 now_stat = tuple(os.stat(path))
458             except OSError as error:
459                 raise errors.StaleWriterStateError(
460                     u"failed to stat {}: {}".format(path, error))
461             if ((not S_ISREG(now_stat[ST_MODE])) or
462                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
463                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
464                 raise errors.StaleWriterStateError(u"{} changed".format(path))
465
466     def dump_state(self, copy_func=lambda x: x):
467         state = {attr: copy_func(getattr(self, attr))
468                  for attr in self.STATE_PROPS}
469         if self._queued_file is None:
470             state['_current_file'] = None
471         else:
472             state['_current_file'] = (os.path.realpath(self._queued_file.name),
473                                       self._queued_file.tell())
474         return state
475
476     def _queue_file(self, source, filename=None):
477         try:
478             src_path = os.path.realpath(source)
479         except Exception:
480             raise errors.AssertionError(u"{} not a file path".format(source))
481         try:
482             path_stat = os.stat(src_path)
483         except OSError as stat_error:
484             path_stat = None
485         super(ResumableCollectionWriter, self)._queue_file(source, filename)
486         fd_stat = os.fstat(self._queued_file.fileno())
487         if not S_ISREG(fd_stat.st_mode):
488             # We won't be able to resume from this cache anyway, so don't
489             # worry about further checks.
490             self._dependencies[source] = tuple(fd_stat)
491         elif path_stat is None:
492             raise errors.AssertionError(
493                 u"could not stat {}: {}".format(source, stat_error))
494         elif path_stat.st_ino != fd_stat.st_ino:
495             raise errors.AssertionError(
496                 u"{} changed between open and stat calls".format(source))
497         else:
498             self._dependencies[src_path] = tuple(fd_stat)
499
500     def write(self, data):
501         if self._queued_file is None:
502             raise errors.AssertionError(
503                 "resumable writer can't accept unsourced data")
504         return super(ResumableCollectionWriter, self).write(data)
505
506
# Event names; ADD and DEL are passed as the first argument of notify()
# when an item is added to or removed from a collection.
# NOTE(review): MOD and TOK are not used in this part of the file --
# confirm their meaning where they are used.
ADD = "add"
DEL = "del"
MOD = "mod"
TOK = "tok"
# Values accepted as `create_type` by RichCollectionBase.find_or_create().
FILE = "file"
COLLECTION = "collection"
513
514 class RichCollectionBase(CollectionBase):
515     """Base class for Collections and Subcollections.
516
517     Implements the majority of functionality relating to accessing items in the
518     Collection.
519
520     """
521
522     def __init__(self, parent=None):
523         self.parent = parent
524         self._committed = False
525         self._has_remote_blocks = False
526         self._callback = None
527         self._items = {}
528
529     def _my_api(self):
530         raise NotImplementedError()
531
532     def _my_keep(self):
533         raise NotImplementedError()
534
535     def _my_block_manager(self):
536         raise NotImplementedError()
537
538     def writable(self):
539         raise NotImplementedError()
540
541     def root_collection(self):
542         raise NotImplementedError()
543
544     def notify(self, event, collection, name, item):
545         raise NotImplementedError()
546
547     def stream_name(self):
548         raise NotImplementedError()
549
550
551     @synchronized
552     def has_remote_blocks(self):
553         """Recursively check for a +R segment locator signature."""
554
555         if self._has_remote_blocks:
556             return True
557         for item in self:
558             if self[item].has_remote_blocks():
559                 return True
560         return False
561
562     @synchronized
563     def set_has_remote_blocks(self, val):
564         self._has_remote_blocks = val
565         if self.parent:
566             self.parent.set_has_remote_blocks(val)
567
568     @must_be_writable
569     @synchronized
570     def find_or_create(self, path, create_type):
571         """Recursively search the specified file path.
572
573         May return either a `Collection` or `ArvadosFile`.  If not found, will
574         create a new item at the specified path based on `create_type`.  Will
575         create intermediate subcollections needed to contain the final item in
576         the path.
577
578         :create_type:
579           One of `arvados.collection.FILE` or
580           `arvados.collection.COLLECTION`.  If the path is not found, and value
581           of create_type is FILE then create and return a new ArvadosFile for
582           the last path component.  If COLLECTION, then create and return a new
583           Collection for the last path component.
584
585         """
586
587         pathcomponents = path.split("/", 1)
588         if pathcomponents[0]:
589             item = self._items.get(pathcomponents[0])
590             if len(pathcomponents) == 1:
591                 if item is None:
592                     # create new file
593                     if create_type == COLLECTION:
594                         item = Subcollection(self, pathcomponents[0])
595                     else:
596                         item = ArvadosFile(self, pathcomponents[0])
597                     self._items[pathcomponents[0]] = item
598                     self.set_committed(False)
599                     self.notify(ADD, self, pathcomponents[0], item)
600                 return item
601             else:
602                 if item is None:
603                     # create new collection
604                     item = Subcollection(self, pathcomponents[0])
605                     self._items[pathcomponents[0]] = item
606                     self.set_committed(False)
607                     self.notify(ADD, self, pathcomponents[0], item)
608                 if isinstance(item, RichCollectionBase):
609                     return item.find_or_create(pathcomponents[1], create_type)
610                 else:
611                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
612         else:
613             return self
614
615     @synchronized
616     def find(self, path):
617         """Recursively search the specified file path.
618
619         May return either a Collection or ArvadosFile. Return None if not
620         found.
621         If path is invalid (ex: starts with '/'), an IOError exception will be
622         raised.
623
624         """
625         if not path:
626             raise errors.ArgumentError("Parameter 'path' is empty.")
627
628         pathcomponents = path.split("/", 1)
629         if pathcomponents[0] == '':
630             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
631
632         item = self._items.get(pathcomponents[0])
633         if item is None:
634             return None
635         elif len(pathcomponents) == 1:
636             return item
637         else:
638             if isinstance(item, RichCollectionBase):
639                 if pathcomponents[1]:
640                     return item.find(pathcomponents[1])
641                 else:
642                     return item
643             else:
644                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
645
646     @synchronized
647     def mkdirs(self, path):
648         """Recursive subcollection create.
649
650         Like `os.makedirs()`.  Will create intermediate subcollections needed
651         to contain the leaf subcollection path.
652
653         """
654
655         if self.find(path) != None:
656             raise IOError(errno.EEXIST, "Directory or file exists", path)
657
658         return self.find_or_create(path, COLLECTION)
659
660     def open(self, path, mode="r", encoding=None):
661         """Open a file-like object for access.
662
663         :path:
664           path to a file in the collection
665         :mode:
666           a string consisting of "r", "w", or "a", optionally followed
667           by "b" or "t", optionally followed by "+".
668           :"b":
669             binary mode: write() accepts bytes, read() returns bytes.
670           :"t":
671             text mode (default): write() accepts strings, read() returns strings.
672           :"r":
673             opens for reading
674           :"r+":
675             opens for reading and writing.  Reads/writes share a file pointer.
676           :"w", "w+":
677             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
678           :"a", "a+":
679             opens for reading and writing.  All writes are appended to
680             the end of the file.  Writing does not affect the file pointer for
681             reading.
682
683         """
684
685         if not re.search(r'^[rwa][bt]?\+?$', mode):
686             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
687
688         if mode[0] == 'r' and '+' not in mode:
689             fclass = ArvadosFileReader
690             arvfile = self.find(path)
691         elif not self.writable():
692             raise IOError(errno.EROFS, "Collection is read only")
693         else:
694             fclass = ArvadosFileWriter
695             arvfile = self.find_or_create(path, FILE)
696
697         if arvfile is None:
698             raise IOError(errno.ENOENT, "File not found", path)
699         if not isinstance(arvfile, ArvadosFile):
700             raise IOError(errno.EISDIR, "Is a directory", path)
701
702         if mode[0] == 'w':
703             arvfile.truncate(0)
704
705         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
706         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
707         if 'b' not in mode:
708             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
709             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
710         return f
711
712     def modified(self):
713         """Determine if the collection has been modified since last commited."""
714         return not self.committed()
715
716     @synchronized
717     def committed(self):
718         """Determine if the collection has been committed to the API server."""
719         return self._committed
720
721     @synchronized
722     def set_committed(self, value=True):
723         """Recursively set committed flag.
724
725         If value is True, set committed to be True for this and all children.
726
727         If value is False, set committed to be False for this and all parents.
728         """
729         if value == self._committed:
730             return
731         if value:
732             for k,v in listitems(self._items):
733                 v.set_committed(True)
734             self._committed = True
735         else:
736             self._committed = False
737             if self.parent is not None:
738                 self.parent.set_committed(False)
739
740     @synchronized
741     def __iter__(self):
742         """Iterate over names of files and collections contained in this collection."""
743         return iter(viewkeys(self._items))
744
745     @synchronized
746     def __getitem__(self, k):
747         """Get a file or collection that is directly contained by this collection.
748
749         If you want to search a path, use `find()` instead.
750
751         """
752         return self._items[k]
753
754     @synchronized
755     def __contains__(self, k):
756         """Test if there is a file or collection a directly contained by this collection."""
757         return k in self._items
758
759     @synchronized
760     def __len__(self):
761         """Get the number of items directly contained in this collection."""
762         return len(self._items)
763
764     @must_be_writable
765     @synchronized
766     def __delitem__(self, p):
767         """Delete an item by name which is directly contained by this collection."""
768         del self._items[p]
769         self.set_committed(False)
770         self.notify(DEL, self, p, None)
771
772     @synchronized
773     def keys(self):
774         """Get a list of names of files and collections directly contained in this collection."""
775         return self._items.keys()
776
777     @synchronized
778     def values(self):
779         """Get a list of files and collection objects directly contained in this collection."""
780         return listvalues(self._items)
781
782     @synchronized
783     def items(self):
784         """Get a list of (name, object) tuples directly contained in this collection."""
785         return listitems(self._items)
786
787     def exists(self, path):
788         """Test if there is a file or collection at `path`."""
789         return self.find(path) is not None
790
791     @must_be_writable
792     @synchronized
793     def remove(self, path, recursive=False):
794         """Remove the file or subcollection (directory) at `path`.
795
796         :recursive:
797           Specify whether to remove non-empty subcollections (True), or raise an error (False).
798         """
799
800         if not path:
801             raise errors.ArgumentError("Parameter 'path' is empty.")
802
803         pathcomponents = path.split("/", 1)
804         item = self._items.get(pathcomponents[0])
805         if item is None:
806             raise IOError(errno.ENOENT, "File not found", path)
807         if len(pathcomponents) == 1:
808             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
809                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
810             deleteditem = self._items[pathcomponents[0]]
811             del self._items[pathcomponents[0]]
812             self.set_committed(False)
813             self.notify(DEL, self, pathcomponents[0], deleteditem)
814         else:
815             item.remove(pathcomponents[1], recursive=recursive)
816
817     def _clonefrom(self, source):
818         for k,v in listitems(source):
819             self._items[k] = v.clone(self, k)
820
821     def clone(self):
822         raise NotImplementedError()
823
824     @must_be_writable
825     @synchronized
826     def add(self, source_obj, target_name, overwrite=False, reparent=False):
827         """Copy or move a file or subcollection to this collection.
828
829         :source_obj:
830           An ArvadosFile, or Subcollection object
831
832         :target_name:
833           Destination item name.  If the target name already exists and is a
834           file, this will raise an error unless you specify `overwrite=True`.
835
836         :overwrite:
837           Whether to overwrite target file if it already exists.
838
839         :reparent:
840           If True, source_obj will be moved from its parent collection to this collection.
841           If False, source_obj will be copied and the parent collection will be
842           unmodified.
843
844         """
845
846         if target_name in self and not overwrite:
847             raise IOError(errno.EEXIST, "File already exists", target_name)
848
849         modified_from = None
850         if target_name in self:
851             modified_from = self[target_name]
852
853         # Actually make the move or copy.
854         if reparent:
855             source_obj._reparent(self, target_name)
856             item = source_obj
857         else:
858             item = source_obj.clone(self, target_name)
859
860         self._items[target_name] = item
861         self.set_committed(False)
862         if not self._has_remote_blocks and source_obj.has_remote_blocks():
863             self.set_has_remote_blocks(True)
864
865         if modified_from:
866             self.notify(MOD, self, target_name, (modified_from, item))
867         else:
868             self.notify(ADD, self, target_name, item)
869
870     def _get_src_target(self, source, target_path, source_collection, create_dest):
871         if source_collection is None:
872             source_collection = self
873
874         # Find the object
875         if isinstance(source, basestring):
876             source_obj = source_collection.find(source)
877             if source_obj is None:
878                 raise IOError(errno.ENOENT, "File not found", source)
879             sourcecomponents = source.split("/")
880         else:
881             source_obj = source
882             sourcecomponents = None
883
884         # Find parent collection the target path
885         targetcomponents = target_path.split("/")
886
887         # Determine the name to use.
888         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
889
890         if not target_name:
891             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
892
893         if create_dest:
894             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
895         else:
896             if len(targetcomponents) > 1:
897                 target_dir = self.find("/".join(targetcomponents[0:-1]))
898             else:
899                 target_dir = self
900
901         if target_dir is None:
902             raise IOError(errno.ENOENT, "Target directory not found", target_name)
903
904         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
905             target_dir = target_dir[target_name]
906             target_name = sourcecomponents[-1]
907
908         return (source_obj, target_dir, target_name)
909
910     @must_be_writable
911     @synchronized
912     def copy(self, source, target_path, source_collection=None, overwrite=False):
913         """Copy a file or subcollection to a new path in this collection.
914
915         :source:
916           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
917
918         :target_path:
919           Destination file or path.  If the target path already exists and is a
920           subcollection, the item will be placed inside the subcollection.  If
921           the target path already exists and is a file, this will raise an error
922           unless you specify `overwrite=True`.
923
924         :source_collection:
925           Collection to copy `source_path` from (default `self`)
926
927         :overwrite:
928           Whether to overwrite target file if it already exists.
929         """
930
931         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
932         target_dir.add(source_obj, target_name, overwrite, False)
933
934     @must_be_writable
935     @synchronized
936     def rename(self, source, target_path, source_collection=None, overwrite=False):
937         """Move a file or subcollection from `source_collection` to a new path in this collection.
938
939         :source:
940           A string with a path to source file or subcollection.
941
942         :target_path:
943           Destination file or path.  If the target path already exists and is a
944           subcollection, the item will be placed inside the subcollection.  If
945           the target path already exists and is a file, this will raise an error
946           unless you specify `overwrite=True`.
947
948         :source_collection:
949           Collection to copy `source_path` from (default `self`)
950
951         :overwrite:
952           Whether to overwrite target file if it already exists.
953         """
954
955         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
956         if not source_obj.writable():
957             raise IOError(errno.EROFS, "Source collection is read only", source)
958         target_dir.add(source_obj, target_name, overwrite, True)
959
960     def portable_manifest_text(self, stream_name="."):
961         """Get the manifest text for this collection, sub collections and files.
962
963         This method does not flush outstanding blocks to Keep.  It will return
964         a normalized manifest with access tokens stripped.
965
966         :stream_name:
967           Name to use for this stream (directory)
968
969         """
970         return self._get_manifest_text(stream_name, True, True)
971
972     @synchronized
973     def manifest_text(self, stream_name=".", strip=False, normalize=False,
974                       only_committed=False):
975         """Get the manifest text for this collection, sub collections and files.
976
977         This method will flush outstanding blocks to Keep.  By default, it will
978         not normalize an unmodified manifest or strip access tokens.
979
980         :stream_name:
981           Name to use for this stream (directory)
982
983         :strip:
984           If True, remove signing tokens from block locators if present.
985           If False (default), block locators are left unchanged.
986
987         :normalize:
988           If True, always export the manifest text in normalized form
989           even if the Collection is not modified.  If False (default) and the collection
990           is not modified, return the original manifest text even if it is not
991           in normalized form.
992
993         :only_committed:
994           If True, don't commit pending blocks.
995
996         """
997
998         if not only_committed:
999             self._my_block_manager().commit_all()
1000         return self._get_manifest_text(stream_name, strip, normalize,
1001                                        only_committed=only_committed)
1002
1003     @synchronized
1004     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1005         """Get the manifest text for this collection, sub collections and files.
1006
1007         :stream_name:
1008           Name to use for this stream (directory)
1009
1010         :strip:
1011           If True, remove signing tokens from block locators if present.
1012           If False (default), block locators are left unchanged.
1013
1014         :normalize:
1015           If True, always export the manifest text in normalized form
1016           even if the Collection is not modified.  If False (default) and the collection
1017           is not modified, return the original manifest text even if it is not
1018           in normalized form.
1019
1020         :only_committed:
1021           If True, only include blocks that were already committed to Keep.
1022
1023         """
1024
1025         if not self.committed() or self._manifest_text is None or normalize:
1026             stream = {}
1027             buf = []
1028             sorted_keys = sorted(self.keys())
1029             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1030                 # Create a stream per file `k`
1031                 arvfile = self[filename]
1032                 filestream = []
1033                 for segment in arvfile.segments():
1034                     loc = segment.locator
1035                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1036                         if only_committed:
1037                             continue
1038                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1039                     if strip:
1040                         loc = KeepLocator(loc).stripped()
1041                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1042                                          segment.segment_offset, segment.range_size))
1043                 stream[filename] = filestream
1044             if stream:
1045                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1046             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1047                 buf.append(self[dirname].manifest_text(
1048                     stream_name=os.path.join(stream_name, dirname),
1049                     strip=strip, normalize=True, only_committed=only_committed))
1050             return "".join(buf)
1051         else:
1052             if strip:
1053                 return self.stripped_manifest()
1054             else:
1055                 return self._manifest_text
1056
1057     @synchronized
1058     def _copy_remote_blocks(self, remote_blocks={}):
1059         """Scan through the entire collection and ask Keep to copy remote blocks.
1060
1061         When accessing a remote collection, blocks will have a remote signature
1062         (+R instead of +A). Collect these signatures and request Keep to copy the
1063         blocks to the local cluster, returning local (+A) signatures.
1064
1065         :remote_blocks:
1066           Shared cache of remote to local block mappings. This is used to avoid
1067           doing extra work when blocks are shared by more than one file in
1068           different subdirectories.
1069
1070         """
1071         for item in self:
1072             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1073         return remote_blocks
1074
1075     @synchronized
1076     def diff(self, end_collection, prefix=".", holding_collection=None):
1077         """Generate list of add/modify/delete actions.
1078
1079         When given to `apply`, will change `self` to match `end_collection`
1080
1081         """
1082         changes = []
1083         if holding_collection is None:
1084             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1085         for k in self:
1086             if k not in end_collection:
1087                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1088         for k in end_collection:
1089             if k in self:
1090                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1091                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1092                 elif end_collection[k] != self[k]:
1093                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1094                 else:
1095                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1096             else:
1097                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1098         return changes
1099
1100     @must_be_writable
1101     @synchronized
1102     def apply(self, changes):
1103         """Apply changes from `diff`.
1104
1105         If a change conflicts with a local change, it will be saved to an
1106         alternate path indicating the conflict.
1107
1108         """
1109         if changes:
1110             self.set_committed(False)
1111         for change in changes:
1112             event_type = change[0]
1113             path = change[1]
1114             initial = change[2]
1115             local = self.find(path)
1116             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1117                                                                     time.gmtime()))
1118             if event_type == ADD:
1119                 if local is None:
1120                     # No local file at path, safe to copy over new file
1121                     self.copy(initial, path)
1122                 elif local is not None and local != initial:
1123                     # There is already local file and it is different:
1124                     # save change to conflict file.
1125                     self.copy(initial, conflictpath)
1126             elif event_type == MOD or event_type == TOK:
1127                 final = change[3]
1128                 if local == initial:
1129                     # Local matches the "initial" item so it has not
1130                     # changed locally and is safe to update.
1131                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1132                         # Replace contents of local file with new contents
1133                         local.replace_contents(final)
1134                     else:
1135                         # Overwrite path with new item; this can happen if
1136                         # path was a file and is now a collection or vice versa
1137                         self.copy(final, path, overwrite=True)
1138                 else:
1139                     # Local is missing (presumably deleted) or local doesn't
1140                     # match the "start" value, so save change to conflict file
1141                     self.copy(final, conflictpath)
1142             elif event_type == DEL:
1143                 if local == initial:
1144                     # Local item matches "initial" value, so it is safe to remove.
1145                     self.remove(path, recursive=True)
1146                 # else, the file is modified or already removed, in either
1147                 # case we don't want to try to remove it.
1148
1149     def portable_data_hash(self):
1150         """Get the portable data hash for this collection's manifest."""
1151         if self._manifest_locator and self.committed():
1152             # If the collection is already saved on the API server, and it's committed
1153             # then return API server's PDH response.
1154             return self._portable_data_hash
1155         else:
1156             stripped = self.portable_manifest_text().encode()
1157             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1158
1159     @synchronized
1160     def subscribe(self, callback):
1161         if self._callback is None:
1162             self._callback = callback
1163         else:
1164             raise errors.ArgumentError("A callback is already set on this collection.")
1165
1166     @synchronized
1167     def unsubscribe(self):
1168         if self._callback is not None:
1169             self._callback = None
1170
1171     @synchronized
1172     def notify(self, event, collection, name, item):
1173         if self._callback:
1174             self._callback(event, collection, name, item)
1175         self.root_collection().notify(event, collection, name, item)
1176
1177     @synchronized
1178     def __eq__(self, other):
1179         if other is self:
1180             return True
1181         if not isinstance(other, RichCollectionBase):
1182             return False
1183         if len(self._items) != len(other):
1184             return False
1185         for k in self._items:
1186             if k not in other:
1187                 return False
1188             if self._items[k] != other[k]:
1189                 return False
1190         return True
1191
1192     def __ne__(self, other):
1193         return not self.__eq__(other)
1194
1195     @synchronized
1196     def flush(self):
1197         """Flush bufferblocks to Keep."""
1198         for e in listvalues(self):
1199             e.flush()
1200
1201
1202 class Collection(RichCollectionBase):
1203     """Represents the root of an Arvados Collection.
1204
1205     This class is threadsafe.  The root collection object, all subcollections
1206     and files are protected by a single lock (i.e. each access locks the entire
1207     collection).
1208
1209     Brief summary of
1210     useful methods:
1211
1212     :To read an existing file:
1213       `c.open("myfile", "r")`
1214
1215     :To write a new file:
1216       `c.open("myfile", "w")`
1217
1218     :To determine if a file exists:
1219       `c.find("myfile") is not None`
1220
1221     :To copy a file:
1222       `c.copy("source", "dest")`
1223
1224     :To delete a file:
1225       `c.remove("myfile")`
1226
1227     :To save to an existing collection record:
1228       `c.save()`
1229
1230     :To save a new collection record:
1231     `c.save_new()`
1232
1233     :To merge remote changes into this object:
1234       `c.update()`
1235
1236     Must be associated with an API server Collection record (during
1237     initialization, or using `save_new`) to use `save` or `update`
1238
1239     """
1240
1241     def __init__(self, manifest_locator_or_text=None,
1242                  api_client=None,
1243                  keep_client=None,
1244                  num_retries=10,
1245                  parent=None,
1246                  apiconfig=None,
1247                  block_manager=None,
1248                  replication_desired=None,
1249                  storage_classes_desired=None,
1250                  put_threads=None):
1251         """Collection constructor.
1252
1253         :manifest_locator_or_text:
1254           An Arvados collection UUID, portable data hash, raw manifest
1255           text, or (if creating an empty collection) None.
1256
1257         :parent:
1258           the parent Collection, may be None.
1259
1260         :apiconfig:
1261           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1262           Prefer this over supplying your own api_client and keep_client (except in testing).
1263           Will use default config settings if not specified.
1264
1265         :api_client:
1266           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1267
1268         :keep_client:
1269           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1270
1271         :num_retries:
1272           the number of retries for API and Keep requests.
1273
1274         :block_manager:
1275           the block manager to use.  If not specified, create one.
1276
1277         :replication_desired:
1278           How many copies should Arvados maintain. If None, API server default
1279           configuration applies. If not None, this value will also be used
1280           for determining the number of block copies being written.
1281
1282         :storage_classes_desired:
1283           A list of storage class names where to upload the data. If None,
1284           the keep client is expected to store the data into the cluster's
1285           default storage class(es).
1286
1287         """
1288
1289         if storage_classes_desired and type(storage_classes_desired) is not list:
1290             raise errors.ArgumentError("storage_classes_desired must be list type.")
1291
1292         super(Collection, self).__init__(parent)
1293         self._api_client = api_client
1294         self._keep_client = keep_client
1295
1296         # Use the keep client from ThreadSafeApiCache
1297         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1298             self._keep_client = self._api_client.keep
1299
1300         self._block_manager = block_manager
1301         self.replication_desired = replication_desired
1302         self._storage_classes_desired = storage_classes_desired
1303         self.put_threads = put_threads
1304
1305         if apiconfig:
1306             self._config = apiconfig
1307         else:
1308             self._config = config.settings()
1309
1310         self.num_retries = num_retries
1311         self._manifest_locator = None
1312         self._manifest_text = None
1313         self._portable_data_hash = None
1314         self._api_response = None
1315         self._past_versions = set()
1316
1317         self.lock = threading.RLock()
1318         self.events = None
1319
1320         if manifest_locator_or_text:
1321             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1322                 self._manifest_locator = manifest_locator_or_text
1323             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1324                 self._manifest_locator = manifest_locator_or_text
1325                 if not self._has_local_collection_uuid():
1326                     self._has_remote_blocks = True
1327             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1328                 self._manifest_text = manifest_locator_or_text
1329                 if '+R' in self._manifest_text:
1330                     self._has_remote_blocks = True
1331             else:
1332                 raise errors.ArgumentError(
1333                     "Argument to CollectionReader is not a manifest or a collection UUID")
1334
1335             try:
1336                 self._populate()
1337             except errors.SyntaxError as e:
1338                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1339
1340     def storage_classes_desired(self):
1341         return self._storage_classes_desired or []
1342
1343     def root_collection(self):
1344         return self
1345
1346     def get_properties(self):
1347         if self._api_response and self._api_response["properties"]:
1348             return self._api_response["properties"]
1349         else:
1350             return {}
1351
1352     def get_trash_at(self):
1353         if self._api_response and self._api_response["trash_at"]:
1354             try:
1355                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1356             except ValueError:
1357                 return None
1358         else:
1359             return None
1360
1361     def stream_name(self):
1362         return "."
1363
1364     def writable(self):
1365         return True
1366
1367     @synchronized
1368     def known_past_version(self, modified_at_and_portable_data_hash):
1369         return modified_at_and_portable_data_hash in self._past_versions
1370
1371     @synchronized
1372     @retry_method
1373     def update(self, other=None, num_retries=None):
1374         """Merge the latest collection on the API server with the current collection."""
1375
1376         if other is None:
1377             if self._manifest_locator is None:
1378                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1379             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1380             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1381                 response.get("portable_data_hash") != self.portable_data_hash()):
1382                 # The record on the server is different from our current one, but we've seen it before,
1383                 # so ignore it because it's already been merged.
1384                 # However, if it's the same as our current record, proceed with the update, because we want to update
1385                 # our tokens.
1386                 return
1387             else:
1388                 self._remember_api_response(response)
1389             other = CollectionReader(response["manifest_text"])
1390         baseline = CollectionReader(self._manifest_text)
1391         self.apply(baseline.diff(other))
1392         self._manifest_text = self.manifest_text()
1393
1394     @synchronized
1395     def _my_api(self):
1396         if self._api_client is None:
1397             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1398             if self._keep_client is None:
1399                 self._keep_client = self._api_client.keep
1400         return self._api_client
1401
1402     @synchronized
1403     def _my_keep(self):
1404         if self._keep_client is None:
1405             if self._api_client is None:
1406                 self._my_api()
1407             else:
1408                 self._keep_client = KeepClient(api_client=self._api_client)
1409         return self._keep_client
1410
1411     @synchronized
1412     def _my_block_manager(self):
1413         if self._block_manager is None:
1414             copies = (self.replication_desired or
1415                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1416                                                    2))
1417             self._block_manager = _BlockManager(self._my_keep(),
1418                                                 copies=copies,
1419                                                 put_threads=self.put_threads,
1420                                                 num_retries=self.num_retries,
1421                                                 storage_classes_func=self.storage_classes_desired)
1422         return self._block_manager
1423
1424     def _remember_api_response(self, response):
1425         self._api_response = response
1426         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1427
1428     def _populate_from_api_server(self):
1429         # As in KeepClient itself, we must wait until the last
1430         # possible moment to instantiate an API client, in order to
1431         # avoid tripping up clients that don't have access to an API
1432         # server.  If we do build one, make sure our Keep client uses
1433         # it.  If instantiation fails, we'll fall back to the except
1434         # clause, just like any other Collection lookup
1435         # failure. Return an exception, or None if successful.
1436         self._remember_api_response(self._my_api().collections().get(
1437             uuid=self._manifest_locator).execute(
1438                 num_retries=self.num_retries))
1439         self._manifest_text = self._api_response['manifest_text']
1440         self._portable_data_hash = self._api_response['portable_data_hash']
1441         # If not overriden via kwargs, we should try to load the
1442         # replication_desired and storage_classes_desired from the API server
1443         if self.replication_desired is None:
1444             self.replication_desired = self._api_response.get('replication_desired', None)
1445         if self._storage_classes_desired is None:
1446             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1447
1448     def _populate(self):
1449         if self._manifest_text is None:
1450             if self._manifest_locator is None:
1451                 return
1452             else:
1453                 self._populate_from_api_server()
1454         self._baseline_manifest = self._manifest_text
1455         self._import_manifest(self._manifest_text)
1456
1457     def _has_collection_uuid(self):
1458         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1459
1460     def _has_local_collection_uuid(self):
1461         return self._has_collection_uuid and \
1462             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1463
1464     def __enter__(self):
1465         return self
1466
1467     def __exit__(self, exc_type, exc_value, traceback):
1468         """Support scoped auto-commit in a with: block."""
1469         if exc_type is None:
1470             if self.writable() and self._has_collection_uuid():
1471                 self.save()
1472         self.stop_threads()
1473
1474     def stop_threads(self):
1475         if self._block_manager is not None:
1476             self._block_manager.stop_threads()
1477
1478     @synchronized
1479     def manifest_locator(self):
1480         """Get the manifest locator, if any.
1481
1482         The manifest locator will be set when the collection is loaded from an
1483         API server record or the portable data hash of a manifest.
1484
1485         The manifest locator will be None if the collection is newly created or
1486         was created directly from manifest text.  The method `save_new()` will
1487         assign a manifest locator.
1488
1489         """
1490         return self._manifest_locator
1491
1492     @synchronized
1493     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1494         if new_config is None:
1495             new_config = self._config
1496         if readonly:
1497             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1498         else:
1499             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1500
1501         newcollection._clonefrom(self)
1502         return newcollection
1503
1504     @synchronized
1505     def api_response(self):
1506         """Returns information about this Collection fetched from the API server.
1507
1508         If the Collection exists in Keep but not the API server, currently
1509         returns None.  Future versions may provide a synthetic response.
1510
1511         """
1512         return self._api_response
1513
1514     def find_or_create(self, path, create_type):
1515         """See `RichCollectionBase.find_or_create`"""
1516         if path == ".":
1517             return self
1518         else:
1519             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1520
1521     def find(self, path):
1522         """See `RichCollectionBase.find`"""
1523         if path == ".":
1524             return self
1525         else:
1526             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1527
1528     def remove(self, path, recursive=False):
1529         """See `RichCollectionBase.remove`"""
1530         if path == ".":
1531             raise errors.ArgumentError("Cannot remove '.'")
1532         else:
1533             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1534
1535     @must_be_writable
1536     @synchronized
1537     @retry_method
1538     def save(self,
1539              properties=None,
1540              storage_classes=None,
1541              trash_at=None,
1542              merge=True,
1543              num_retries=None,
1544              preserve_version=False):
1545         """Save collection to an existing collection record.
1546
1547         Commit pending buffer blocks to Keep, merge with remote record (if
1548         merge=True, the default), and update the collection record. Returns
1549         the current manifest text.
1550
1551         Will raise AssertionError if not associated with a collection record on
1552         the API server.  If you want to save a manifest to Keep only, see
1553         `save_new()`.
1554
1555         :properties:
1556           Additional properties of collection. This value will replace any existing
1557           properties of collection.
1558
1559         :storage_classes:
1560           Specify desirable storage classes to be used when writing data to Keep.
1561
1562         :trash_at:
1563           A collection is *expiring* when it has a *trash_at* time in the future.
1564           An expiring collection can be accessed as normal,
1565           but is scheduled to be trashed automatically at the *trash_at* time.
1566
1567         :merge:
1568           Update and merge remote changes before saving.  Otherwise, any
1569           remote changes will be ignored and overwritten.
1570
1571         :num_retries:
1572           Retry count on API calls (if None,  use the collection default)
1573
1574         :preserve_version:
1575           If True, indicate that the collection content being saved right now
1576           should be preserved in a version snapshot if the collection record is
1577           updated in the future. Requires that the API server has
1578           Collections.CollectionVersioning enabled, if not, setting this will
1579           raise an exception.
1580
1581         """
1582         if properties and type(properties) is not dict:
1583             raise errors.ArgumentError("properties must be dictionary type.")
1584
1585         if storage_classes and type(storage_classes) is not list:
1586             raise errors.ArgumentError("storage_classes must be list type.")
1587         if storage_classes:
1588             self._storage_classes_desired = storage_classes
1589
1590         if trash_at and type(trash_at) is not datetime.datetime:
1591             raise errors.ArgumentError("trash_at must be datetime type.")
1592
1593         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1594             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1595
1596         body={}
1597         if properties:
1598             body["properties"] = properties
1599         if self.storage_classes_desired():
1600             body["storage_classes_desired"] = self.storage_classes_desired()
1601         if trash_at:
1602             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1603             body["trash_at"] = t
1604         if preserve_version:
1605             body["preserve_version"] = preserve_version
1606
1607         if not self.committed():
1608             if self._has_remote_blocks:
1609                 # Copy any remote blocks to the local cluster.
1610                 self._copy_remote_blocks(remote_blocks={})
1611                 self._has_remote_blocks = False
1612             if not self._has_collection_uuid():
1613                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1614             elif not self._has_local_collection_uuid():
1615                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1616
1617             self._my_block_manager().commit_all()
1618
1619             if merge:
1620                 self.update()
1621
1622             text = self.manifest_text(strip=False)
1623             body['manifest_text'] = text
1624
1625             self._remember_api_response(self._my_api().collections().update(
1626                 uuid=self._manifest_locator,
1627                 body=body
1628                 ).execute(num_retries=num_retries))
1629             self._manifest_text = self._api_response["manifest_text"]
1630             self._portable_data_hash = self._api_response["portable_data_hash"]
1631             self.set_committed(True)
1632         elif body:
1633             self._remember_api_response(self._my_api().collections().update(
1634                 uuid=self._manifest_locator,
1635                 body=body
1636                 ).execute(num_retries=num_retries))
1637
1638         return self._manifest_text
1639
1640
1641     @must_be_writable
1642     @synchronized
1643     @retry_method
1644     def save_new(self, name=None,
1645                  create_collection_record=True,
1646                  owner_uuid=None,
1647                  properties=None,
1648                  storage_classes=None,
1649                  trash_at=None,
1650                  ensure_unique_name=False,
1651                  num_retries=None,
1652                  preserve_version=False):
1653         """Save collection to a new collection record.
1654
1655         Commit pending buffer blocks to Keep and, when create_collection_record
1656         is True (default), create a new collection record.  After creating a
1657         new collection record, this Collection object will be associated with
1658         the new record used by `save()`. Returns the current manifest text.
1659
1660         :name:
1661           The collection name.
1662
1663         :create_collection_record:
1664            If True, create a collection record on the API server.
1665            If False, only commit blocks to Keep and return the manifest text.
1666
1667         :owner_uuid:
1668           the user, or project uuid that will own this collection.
1669           If None, defaults to the current user.
1670
1671         :properties:
1672           Additional properties of collection. This value will replace any existing
1673           properties of collection.
1674
1675         :storage_classes:
1676           Specify desirable storage classes to be used when writing data to Keep.
1677
1678         :trash_at:
1679           A collection is *expiring* when it has a *trash_at* time in the future.
1680           An expiring collection can be accessed as normal,
1681           but is scheduled to be trashed automatically at the *trash_at* time.
1682
1683         :ensure_unique_name:
1684           If True, ask the API server to rename the collection
1685           if it conflicts with a collection with the same name and owner.  If
1686           False, a name conflict will result in an error.
1687
1688         :num_retries:
1689           Retry count on API calls (if None,  use the collection default)
1690
1691         :preserve_version:
1692           If True, indicate that the collection content being saved right now
1693           should be preserved in a version snapshot if the collection record is
1694           updated in the future. Requires that the API server has
1695           Collections.CollectionVersioning enabled, if not, setting this will
1696           raise an exception.
1697
1698         """
1699         if properties and type(properties) is not dict:
1700             raise errors.ArgumentError("properties must be dictionary type.")
1701
1702         if storage_classes and type(storage_classes) is not list:
1703             raise errors.ArgumentError("storage_classes must be list type.")
1704
1705         if trash_at and type(trash_at) is not datetime.datetime:
1706             raise errors.ArgumentError("trash_at must be datetime type.")
1707
1708         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1709             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1710
1711         if self._has_remote_blocks:
1712             # Copy any remote blocks to the local cluster.
1713             self._copy_remote_blocks(remote_blocks={})
1714             self._has_remote_blocks = False
1715
1716         if storage_classes:
1717             self._storage_classes_desired = storage_classes
1718
1719         self._my_block_manager().commit_all()
1720         text = self.manifest_text(strip=False)
1721
1722         if create_collection_record:
1723             if name is None:
1724                 name = "New collection"
1725                 ensure_unique_name = True
1726
1727             body = {"manifest_text": text,
1728                     "name": name,
1729                     "replication_desired": self.replication_desired}
1730             if owner_uuid:
1731                 body["owner_uuid"] = owner_uuid
1732             if properties:
1733                 body["properties"] = properties
1734             if self.storage_classes_desired():
1735                 body["storage_classes_desired"] = self.storage_classes_desired()
1736             if trash_at:
1737                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1738                 body["trash_at"] = t
1739             if preserve_version:
1740                 body["preserve_version"] = preserve_version
1741
1742             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1743             text = self._api_response["manifest_text"]
1744
1745             self._manifest_locator = self._api_response["uuid"]
1746             self._portable_data_hash = self._api_response["portable_data_hash"]
1747
1748             self._manifest_text = text
1749             self.set_committed(True)
1750
1751         return text
1752
    # Manifest tokenizer: group 1 is a run of non-whitespace characters (the
    # token), group 2 is the whitespace that follows it (or the end of text).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Block locator token: 32 lowercase hex digits, "+<size>", then any
    # number of "+hint" parts.  Group 1 is the size.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment token "<position>:<size>:<name>" (groups 1, 2 and 3).
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1756
1757     def _unescape_manifest_path(self, path):
1758         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1759
1760     @synchronized
1761     def _import_manifest(self, manifest_text):
1762         """Import a manifest into a `Collection`.
1763
1764         :manifest_text:
1765           The manifest text to import from.
1766
1767         """
1768         if len(self) > 0:
1769             raise ArgumentError("Can only import manifest into an empty collection")
1770
1771         STREAM_NAME = 0
1772         BLOCKS = 1
1773         SEGMENTS = 2
1774
1775         stream_name = None
1776         state = STREAM_NAME
1777
1778         for token_and_separator in self._token_re.finditer(manifest_text):
1779             tok = token_and_separator.group(1)
1780             sep = token_and_separator.group(2)
1781
1782             if state == STREAM_NAME:
1783                 # starting a new stream
1784                 stream_name = self._unescape_manifest_path(tok)
1785                 blocks = []
1786                 segments = []
1787                 streamoffset = 0
1788                 state = BLOCKS
1789                 self.find_or_create(stream_name, COLLECTION)
1790                 continue
1791
1792             if state == BLOCKS:
1793                 block_locator = self._block_re.match(tok)
1794                 if block_locator:
1795                     blocksize = int(block_locator.group(1))
1796                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1797                     streamoffset += blocksize
1798                 else:
1799                     state = SEGMENTS
1800
1801             if state == SEGMENTS:
1802                 file_segment = self._segment_re.match(tok)
1803                 if file_segment:
1804                     pos = int(file_segment.group(1))
1805                     size = int(file_segment.group(2))
1806                     name = self._unescape_manifest_path(file_segment.group(3))
1807                     if name.split('/')[-1] == '.':
1808                         # placeholder for persisting an empty directory, not a real file
1809                         if len(name) > 2:
1810                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1811                     else:
1812                         filepath = os.path.join(stream_name, name)
1813                         try:
1814                             afile = self.find_or_create(filepath, FILE)
1815                         except IOError as e:
1816                             if e.errno == errno.ENOTDIR:
1817                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1818                             else:
1819                                 raise e from None
1820                         if isinstance(afile, ArvadosFile):
1821                             afile.add_segment(blocks, pos, size)
1822                         else:
1823                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1824                 else:
1825                     # error!
1826                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1827
1828             if sep == "\n":
1829                 stream_name = None
1830                 state = STREAM_NAME
1831
1832         self.set_committed(True)
1833
1834     @synchronized
1835     def notify(self, event, collection, name, item):
1836         if self._callback:
1837             self._callback(event, collection, name, item)
1838
1839
class Subcollection(RichCollectionBase):
    """A subdirectory within a collection, with no API server record of its own.

    A Subcollection does not hold a lock of its own: it shares the lock of
    its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        root = self.root_collection()
        self.lock = root.lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self):
        """Return the root collection, as reported by the parent."""
        parent = self.parent
        return parent.root_collection()

    def writable(self):
        """Return whether the root collection is writable."""
        root = self.root_collection()
        return root.writable()

    def _my_api(self):
        """Return the API client of the root collection."""
        root = self.root_collection()
        return root._my_api()

    def _my_keep(self):
        """Return the Keep client of the root collection."""
        root = self.root_collection()
        return root._my_keep()

    def _my_block_manager(self):
        """Return the block manager of the root collection."""
        root = self.root_collection()
        return root._my_block_manager()

    def stream_name(self):
        """Return the parent's stream name joined with this subcollection's name."""
        parent_stream = self.parent.stream_name()
        return os.path.join(parent_stream, self.name)

    @synchronized
    def clone(self, new_parent, new_name):
        """Return a new Subcollection under new_parent holding a clone of this one's contents."""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Detach this subcollection from its parent and attach it to newparent as newname."""
        self.set_committed(False)
        self.flush()
        # Remove from the old parent before the parent reference is replaced.
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        # Adopt the lock of the new parent's root collection.
        self.lock = newparent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Return manifest text for this subdirectory.

        An empty directory is encoded as a single zero-length file segment
        named with the escaped form of ".", on the empty block locator.

        """
        if len(self._items) > 0:
            return super(Subcollection, self)._get_manifest_text(stream_name,
                                                                 strip, normalize,
                                                                 only_committed)
        return "%s %s 0:0:\\056\n" % (
            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1898
1899
class CollectionReader(Collection):
    """A read-only collection object.

    Construct it from a collection UUID, a portable data hash, or raw
    manifest text.  See the `Collection` constructor for detailed options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # writable() reports True only while the base constructor runs
        # (presumably so that it can populate the collection).
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # No locking needed: the collection should never change once
        # initialized.
        self.lock = NoopLock()

        # Backwards compatibility with the old CollectionReader
        # all_streams() and all_files(): filled in lazily.
        self._streams = None

    def writable(self):
        """Return True only while the constructor is running."""
        return self._in_init

    def _populate_streams(orig_func):
        """Decorator: make sure self._streams is filled in before calling orig_func."""
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Deferred until needed, since this creates a copy of the manifest.
            if self._streams is None:
                manifest = self._manifest_text
                if manifest:
                    self._streams = [line.split()
                                     for line in manifest.split("\n")
                                     if line]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        Kept for backwards compatibility; it only affects the behavior of
        `all_streams()` and `all_files()`.

        """

        # Regroup every file's ranges under the stream name and file name
        # derived from its full path.
        regrouped = {}
        for stream in self.all_streams():
            for afile in stream.all_files():
                streamname, filename = split(stream.name() + "/" + afile.name())
                file_ranges = regrouped.setdefault(streamname, {}).setdefault(filename, [])
                for segment in afile.segments:
                    file_ranges.extend(stream.locators_and_ranges(segment.locator, segment.range_size))

        self._streams = [normalize_stream(streamname, regrouped[streamname])
                         for streamname in sorted(regrouped)]

    @_populate_streams
    def all_streams(self):
        """Return a list with one `StreamReader` per entry of self._streams."""
        keep_client = self._my_keep()
        return [StreamReader(tokens, keep_client, num_retries=self.num_retries)
                for tokens in self._streams]

    @_populate_streams
    def all_files(self):
        """Yield every file of every stream returned by `all_streams()`."""
        for stream in self.all_streams():
            for afile in stream.all_files():
                yield afile