Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6
7 from collections import deque
8 from stat import *
9
10 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
11 from keep import *
12 from .stream import StreamReader, normalize_stream, locator_block_size
13 from .ranges import Range, LocatorAndRange
14 import config
15 import errors
16 import util
17
18 _logger = logging.getLogger('arvados.collection')
19
20 class CollectionBase(object):
21     def __enter__(self):
22         return self
23
24     def __exit__(self, exc_type, exc_value, traceback):
25         pass
26
27     def _my_keep(self):
28         if self._keep_client is None:
29             self._keep_client = KeepClient(api_client=self._api_client,
30                                            num_retries=self.num_retries)
31         return self._keep_client
32
33     def stripped_manifest(self):
34         """
35         Return the manifest for the current collection with all
36         non-portable hints (i.e., permission signatures and other
37         hints other than size hints) removed from the locators.
38         """
39         raw = self.manifest_text()
40         clean = []
41         for line in raw.split("\n"):
42             fields = line.split()
43             if fields:
44                 clean_fields = fields[:1] + [
45                     (re.sub(r'\+[^\d][^\+]*', '', x)
46                      if re.match(util.keep_locator_pattern, x)
47                      else x)
48                     for x in fields[1:]]
49                 clean += [' '.join(clean_fields), "\n"]
50         return ''.join(clean)
51
52
53 class CollectionReader(CollectionBase):
54     def __init__(self, manifest_locator_or_text, api_client=None,
55                  keep_client=None, num_retries=0):
56         """Instantiate a CollectionReader.
57
58         This class parses Collection manifests to provide a simple interface
59         to read its underlying files.
60
61         Arguments:
62         * manifest_locator_or_text: One of a Collection UUID, portable data
63           hash, or full manifest text.
64         * api_client: The API client to use to look up Collections.  If not
65           provided, CollectionReader will build one from available Arvados
66           configuration.
67         * keep_client: The KeepClient to use to download Collection data.
68           If not provided, CollectionReader will build one from available
69           Arvados configuration.
70         * num_retries: The default number of times to retry failed
71           service requests.  Default 0.  You may change this value
72           after instantiation, but note those changes may not
73           propagate to related objects like the Keep client.
74         """
75         self._api_client = api_client
76         self._keep_client = keep_client
77         self.num_retries = num_retries
78         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
79             self._manifest_locator = manifest_locator_or_text
80             self._manifest_text = None
81         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
82             self._manifest_locator = manifest_locator_or_text
83             self._manifest_text = None
84         elif re.match(util.manifest_pattern, manifest_locator_or_text):
85             self._manifest_text = manifest_locator_or_text
86             self._manifest_locator = None
87         else:
88             raise errors.ArgumentError(
89                 "Argument to CollectionReader must be a manifest or a collection UUID")
90         self._api_response = None
91         self._streams = None
92
93     def _populate_from_api_server(self):
94         # As in KeepClient itself, we must wait until the last
95         # possible moment to instantiate an API client, in order to
96         # avoid tripping up clients that don't have access to an API
97         # server.  If we do build one, make sure our Keep client uses
98         # it.  If instantiation fails, we'll fall back to the except
99         # clause, just like any other Collection lookup
100         # failure. Return an exception, or None if successful.
101         try:
102             if self._api_client is None:
103                 self._api_client = arvados.api('v1')
104                 self._keep_client = None  # Make a new one with the new api.
105             self._api_response = self._api_client.collections().get(
106                 uuid=self._manifest_locator).execute(
107                 num_retries=self.num_retries)
108             self._manifest_text = self._api_response['manifest_text']
109             return None
110         except Exception as e:
111             return e
112
113     def _populate_from_keep(self):
114         # Retrieve a manifest directly from Keep. This has a chance of
115         # working if [a] the locator includes a permission signature
116         # or [b] the Keep services are operating in world-readable
117         # mode. Return an exception, or None if successful.
118         try:
119             self._manifest_text = self._my_keep().get(
120                 self._manifest_locator, num_retries=self.num_retries)
121         except Exception as e:
122             return e
123
124     def _populate(self):
125         error_via_api = None
126         error_via_keep = None
127         should_try_keep = ((self._manifest_text is None) and
128                            util.keep_locator_pattern.match(
129                 self._manifest_locator))
130         if ((self._manifest_text is None) and
131             util.signed_locator_pattern.match(self._manifest_locator)):
132             error_via_keep = self._populate_from_keep()
133         if self._manifest_text is None:
134             error_via_api = self._populate_from_api_server()
135             if error_via_api is not None and not should_try_keep:
136                 raise error_via_api
137         if ((self._manifest_text is None) and
138             not error_via_keep and
139             should_try_keep):
140             # Looks like a keep locator, and we didn't already try keep above
141             error_via_keep = self._populate_from_keep()
142         if self._manifest_text is None:
143             # Nothing worked!
144             raise arvados.errors.NotFoundError(
145                 ("Failed to retrieve collection '{}' " +
146                  "from either API server ({}) or Keep ({})."
147                  ).format(
148                     self._manifest_locator,
149                     error_via_api,
150                     error_via_keep))
151         self._streams = [sline.split()
152                          for sline in self._manifest_text.split("\n")
153                          if sline]
154
155     def _populate_first(orig_func):
156         # Decorator for methods that read actual Collection data.
157         @functools.wraps(orig_func)
158         def wrapper(self, *args, **kwargs):
159             if self._streams is None:
160                 self._populate()
161             return orig_func(self, *args, **kwargs)
162         return wrapper
163
164     @_populate_first
165     def api_response(self):
166         """api_response() -> dict or None
167
168         Returns information about this Collection fetched from the API server.
169         If the Collection exists in Keep but not the API server, currently
170         returns None.  Future versions may provide a synthetic response.
171         """
172         return self._api_response
173
174     @_populate_first
175     def normalize(self):
176         # Rearrange streams
177         streams = {}
178         for s in self.all_streams():
179             for f in s.all_files():
180                 streamname, filename = split(s.name() + "/" + f.name())
181                 if streamname not in streams:
182                     streams[streamname] = {}
183                 if filename not in streams[streamname]:
184                     streams[streamname][filename] = []
185                 for r in f.segments:
186                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
187
188         self._streams = [normalize_stream(s, streams[s])
189                          for s in sorted(streams)]
190
191         # Regenerate the manifest text based on the normalized streams
192         self._manifest_text = ''.join(
193             [StreamReader(stream, keep=self._my_keep()).manifest_text()
194              for stream in self._streams])
195
196     @_populate_first
197     def open(self, streampath, filename=None):
198         """open(streampath[, filename]) -> file-like object
199
200         Pass in the path of a file to read from the Collection, either as a
201         single string or as two separate stream name and file name arguments.
202         This method returns a file-like object to read that file.
203         """
204         if filename is None:
205             streampath, filename = split(streampath)
206         keep_client = self._my_keep()
207         for stream_s in self._streams:
208             stream = StreamReader(stream_s, keep_client,
209                                   num_retries=self.num_retries)
210             if stream.name() == streampath:
211                 break
212         else:
213             raise ValueError("stream '{}' not found in Collection".
214                              format(streampath))
215         try:
216             return stream.files()[filename]
217         except KeyError:
218             raise ValueError("file '{}' not found in Collection stream '{}'".
219                              format(filename, streampath))
220
221     @_populate_first
222     def all_streams(self):
223         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
224                 for s in self._streams]
225
226     def all_files(self):
227         for s in self.all_streams():
228             for f in s.all_files():
229                 yield f
230
231     @_populate_first
232     def manifest_text(self, strip=False, normalize=False):
233         if normalize:
234             cr = CollectionReader(self.manifest_text())
235             cr.normalize()
236             return cr.manifest_text(strip=strip, normalize=False)
237         elif strip:
238             return self.stripped_manifest()
239         else:
240             return self._manifest_text
241
242
243 class _WriterFile(ArvadosFileBase):
244     def __init__(self, coll_writer, name):
245         super(_WriterFile, self).__init__(name, 'wb')
246         self.dest = coll_writer
247
248     def close(self):
249         super(_WriterFile, self).close()
250         self.dest.finish_current_file()
251
252     @ArvadosFileBase._before_close
253     def write(self, data):
254         self.dest.write(data)
255
256     @ArvadosFileBase._before_close
257     def writelines(self, seq):
258         for data in seq:
259             self.write(data)
260
261     @ArvadosFileBase._before_close
262     def flush(self):
263         self.dest.flush_data()
264
265
266 class CollectionWriter(CollectionBase):
267     def __init__(self, api_client=None, num_retries=0):
268         """Instantiate a CollectionWriter.
269
270         CollectionWriter lets you build a new Arvados Collection from scratch.
271         Write files to it.  The CollectionWriter will upload data to Keep as
272         appropriate, and provide you with the Collection manifest text when
273         you're finished.
274
275         Arguments:
276         * api_client: The API client to use to look up Collections.  If not
277           provided, CollectionReader will build one from available Arvados
278           configuration.
279         * num_retries: The default number of times to retry failed
280           service requests.  Default 0.  You may change this value
281           after instantiation, but note those changes may not
282           propagate to related objects like the Keep client.
283         """
284         self._api_client = api_client
285         self.num_retries = num_retries
286         self._keep_client = None
287         self._data_buffer = []
288         self._data_buffer_len = 0
289         self._current_stream_files = []
290         self._current_stream_length = 0
291         self._current_stream_locators = []
292         self._current_stream_name = '.'
293         self._current_file_name = None
294         self._current_file_pos = 0
295         self._finished_streams = []
296         self._close_file = None
297         self._queued_file = None
298         self._queued_dirents = deque()
299         self._queued_trees = deque()
300         self._last_open = None
301
302     def __exit__(self, exc_type, exc_value, traceback):
303         if exc_type is None:
304             self.finish()
305
306     def do_queued_work(self):
307         # The work queue consists of three pieces:
308         # * _queued_file: The file object we're currently writing to the
309         #   Collection.
310         # * _queued_dirents: Entries under the current directory
311         #   (_queued_trees[0]) that we want to write or recurse through.
312         #   This may contain files from subdirectories if
313         #   max_manifest_depth == 0 for this directory.
314         # * _queued_trees: Directories that should be written as separate
315         #   streams to the Collection.
316         # This function handles the smallest piece of work currently queued
317         # (current file, then current directory, then next directory) until
318         # no work remains.  The _work_THING methods each do a unit of work on
319         # THING.  _queue_THING methods add a THING to the work queue.
320         while True:
321             if self._queued_file:
322                 self._work_file()
323             elif self._queued_dirents:
324                 self._work_dirents()
325             elif self._queued_trees:
326                 self._work_trees()
327             else:
328                 break
329
330     def _work_file(self):
331         while True:
332             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
333             if not buf:
334                 break
335             self.write(buf)
336         self.finish_current_file()
337         if self._close_file:
338             self._queued_file.close()
339         self._close_file = None
340         self._queued_file = None
341
342     def _work_dirents(self):
343         path, stream_name, max_manifest_depth = self._queued_trees[0]
344         if stream_name != self.current_stream_name():
345             self.start_new_stream(stream_name)
346         while self._queued_dirents:
347             dirent = self._queued_dirents.popleft()
348             target = os.path.join(path, dirent)
349             if os.path.isdir(target):
350                 self._queue_tree(target,
351                                  os.path.join(stream_name, dirent),
352                                  max_manifest_depth - 1)
353             else:
354                 self._queue_file(target, dirent)
355                 break
356         if not self._queued_dirents:
357             self._queued_trees.popleft()
358
359     def _work_trees(self):
360         path, stream_name, max_manifest_depth = self._queued_trees[0]
361         d = util.listdir_recursive(
362             path, max_depth = (None if max_manifest_depth == 0 else 0))
363         if d:
364             self._queue_dirents(stream_name, d)
365         else:
366             self._queued_trees.popleft()
367
368     def _queue_file(self, source, filename=None):
369         assert (self._queued_file is None), "tried to queue more than one file"
370         if not hasattr(source, 'read'):
371             source = open(source, 'rb')
372             self._close_file = True
373         else:
374             self._close_file = False
375         if filename is None:
376             filename = os.path.basename(source.name)
377         self.start_new_file(filename)
378         self._queued_file = source
379
380     def _queue_dirents(self, stream_name, dirents):
381         assert (not self._queued_dirents), "tried to queue more than one tree"
382         self._queued_dirents = deque(sorted(dirents))
383
384     def _queue_tree(self, path, stream_name, max_manifest_depth):
385         self._queued_trees.append((path, stream_name, max_manifest_depth))
386
387     def write_file(self, source, filename=None):
388         self._queue_file(source, filename)
389         self.do_queued_work()
390
391     def write_directory_tree(self,
392                              path, stream_name='.', max_manifest_depth=-1):
393         self._queue_tree(path, stream_name, max_manifest_depth)
394         self.do_queued_work()
395
396     def write(self, newdata):
397         if hasattr(newdata, '__iter__'):
398             for s in newdata:
399                 self.write(s)
400             return
401         self._data_buffer.append(newdata)
402         self._data_buffer_len += len(newdata)
403         self._current_stream_length += len(newdata)
404         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
405             self.flush_data()
406
407     def open(self, streampath, filename=None):
408         """open(streampath[, filename]) -> file-like object
409
410         Pass in the path of a file to write to the Collection, either as a
411         single string or as two separate stream name and file name arguments.
412         This method returns a file-like object you can write to add it to the
413         Collection.
414
415         You may only have one file object from the Collection open at a time,
416         so be sure to close the object when you're done.  Using the object in
417         a with statement makes that easy::
418
419           with cwriter.open('./doc/page1.txt') as outfile:
420               outfile.write(page1_data)
421           with cwriter.open('./doc/page2.txt') as outfile:
422               outfile.write(page2_data)
423         """
424         if filename is None:
425             streampath, filename = split(streampath)
426         if self._last_open and not self._last_open.closed:
427             raise errors.AssertionError(
428                 "can't open '{}' when '{}' is still open".format(
429                     filename, self._last_open.name))
430         if streampath != self.current_stream_name():
431             self.start_new_stream(streampath)
432         self.set_current_file_name(filename)
433         self._last_open = _WriterFile(self, filename)
434         return self._last_open
435
436     def flush_data(self):
437         data_buffer = ''.join(self._data_buffer)
438         if data_buffer:
439             self._current_stream_locators.append(
440                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
441             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
442             self._data_buffer_len = len(self._data_buffer[0])
443
444     def start_new_file(self, newfilename=None):
445         self.finish_current_file()
446         self.set_current_file_name(newfilename)
447
448     def set_current_file_name(self, newfilename):
449         if re.search(r'[\t\n]', newfilename):
450             raise errors.AssertionError(
451                 "Manifest filenames cannot contain whitespace: %s" %
452                 newfilename)
453         elif re.search(r'\x00', newfilename):
454             raise errors.AssertionError(
455                 "Manifest filenames cannot contain NUL characters: %s" %
456                 newfilename)
457         self._current_file_name = newfilename
458
459     def current_file_name(self):
460         return self._current_file_name
461
462     def finish_current_file(self):
463         if self._current_file_name is None:
464             if self._current_file_pos == self._current_stream_length:
465                 return
466             raise errors.AssertionError(
467                 "Cannot finish an unnamed file " +
468                 "(%d bytes at offset %d in '%s' stream)" %
469                 (self._current_stream_length - self._current_file_pos,
470                  self._current_file_pos,
471                  self._current_stream_name))
472         self._current_stream_files.append([
473                 self._current_file_pos,
474                 self._current_stream_length - self._current_file_pos,
475                 self._current_file_name])
476         self._current_file_pos = self._current_stream_length
477         self._current_file_name = None
478
479     def start_new_stream(self, newstreamname='.'):
480         self.finish_current_stream()
481         self.set_current_stream_name(newstreamname)
482
483     def set_current_stream_name(self, newstreamname):
484         if re.search(r'[\t\n]', newstreamname):
485             raise errors.AssertionError(
486                 "Manifest stream names cannot contain whitespace")
487         self._current_stream_name = '.' if newstreamname=='' else newstreamname
488
489     def current_stream_name(self):
490         return self._current_stream_name
491
492     def finish_current_stream(self):
493         self.finish_current_file()
494         self.flush_data()
495         if not self._current_stream_files:
496             pass
497         elif self._current_stream_name is None:
498             raise errors.AssertionError(
499                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
500                 (self._current_stream_length, len(self._current_stream_files)))
501         else:
502             if not self._current_stream_locators:
503                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
504             self._finished_streams.append([self._current_stream_name,
505                                            self._current_stream_locators,
506                                            self._current_stream_files])
507         self._current_stream_files = []
508         self._current_stream_length = 0
509         self._current_stream_locators = []
510         self._current_stream_name = None
511         self._current_file_pos = 0
512         self._current_file_name = None
513
514     def finish(self):
515         # Store the manifest in Keep and return its locator.
516         return self._my_keep().put(self.manifest_text())
517
518     def portable_data_hash(self):
519         stripped = self.stripped_manifest()
520         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
521
522     def manifest_text(self):
523         self.finish_current_stream()
524         manifest = ''
525
526         for stream in self._finished_streams:
527             if not re.search(r'^\.(/.*)?$', stream[0]):
528                 manifest += './'
529             manifest += stream[0].replace(' ', '\\040')
530             manifest += ' ' + ' '.join(stream[1])
531             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
532             manifest += "\n"
533
534         return manifest
535
536     def data_locators(self):
537         ret = []
538         for name, locators, files in self._finished_streams:
539             ret += locators
540         return ret
541
542
543 class ResumableCollectionWriter(CollectionWriter):
544     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
545                    '_current_stream_locators', '_current_stream_name',
546                    '_current_file_name', '_current_file_pos', '_close_file',
547                    '_data_buffer', '_dependencies', '_finished_streams',
548                    '_queued_dirents', '_queued_trees']
549
550     def __init__(self, api_client=None, num_retries=0):
551         self._dependencies = {}
552         super(ResumableCollectionWriter, self).__init__(
553             api_client, num_retries=num_retries)
554
555     @classmethod
556     def from_state(cls, state, *init_args, **init_kwargs):
557         # Try to build a new writer from scratch with the given state.
558         # If the state is not suitable to resume (because files have changed,
559         # been deleted, aren't predictable, etc.), raise a
560         # StaleWriterStateError.  Otherwise, return the initialized writer.
561         # The caller is responsible for calling writer.do_queued_work()
562         # appropriately after it's returned.
563         writer = cls(*init_args, **init_kwargs)
564         for attr_name in cls.STATE_PROPS:
565             attr_value = state[attr_name]
566             attr_class = getattr(writer, attr_name).__class__
567             # Coerce the value into the same type as the initial value, if
568             # needed.
569             if attr_class not in (type(None), attr_value.__class__):
570                 attr_value = attr_class(attr_value)
571             setattr(writer, attr_name, attr_value)
572         # Check dependencies before we try to resume anything.
573         if any(KeepLocator(ls).permission_expired()
574                for ls in writer._current_stream_locators):
575             raise errors.StaleWriterStateError(
576                 "locators include expired permission hint")
577         writer.check_dependencies()
578         if state['_current_file'] is not None:
579             path, pos = state['_current_file']
580             try:
581                 writer._queued_file = open(path, 'rb')
582                 writer._queued_file.seek(pos)
583             except IOError as error:
584                 raise errors.StaleWriterStateError(
585                     "failed to reopen active file {}: {}".format(path, error))
586         return writer
587
588     def check_dependencies(self):
589         for path, orig_stat in self._dependencies.items():
590             if not S_ISREG(orig_stat[ST_MODE]):
591                 raise errors.StaleWriterStateError("{} not file".format(path))
592             try:
593                 now_stat = tuple(os.stat(path))
594             except OSError as error:
595                 raise errors.StaleWriterStateError(
596                     "failed to stat {}: {}".format(path, error))
597             if ((not S_ISREG(now_stat[ST_MODE])) or
598                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
599                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
600                 raise errors.StaleWriterStateError("{} changed".format(path))
601
602     def dump_state(self, copy_func=lambda x: x):
603         state = {attr: copy_func(getattr(self, attr))
604                  for attr in self.STATE_PROPS}
605         if self._queued_file is None:
606             state['_current_file'] = None
607         else:
608             state['_current_file'] = (os.path.realpath(self._queued_file.name),
609                                       self._queued_file.tell())
610         return state
611
612     def _queue_file(self, source, filename=None):
613         try:
614             src_path = os.path.realpath(source)
615         except Exception:
616             raise errors.AssertionError("{} not a file path".format(source))
617         try:
618             path_stat = os.stat(src_path)
619         except OSError as stat_error:
620             path_stat = None
621         super(ResumableCollectionWriter, self)._queue_file(source, filename)
622         fd_stat = os.fstat(self._queued_file.fileno())
623         if not S_ISREG(fd_stat.st_mode):
624             # We won't be able to resume from this cache anyway, so don't
625             # worry about further checks.
626             self._dependencies[source] = tuple(fd_stat)
627         elif path_stat is None:
628             raise errors.AssertionError(
629                 "could not stat {}: {}".format(source, stat_error))
630         elif path_stat.st_ino != fd_stat.st_ino:
631             raise errors.AssertionError(
632                 "{} changed between open and stat calls".format(source))
633         else:
634             self._dependencies[src_path] = tuple(fd_stat)
635
636     def write(self, data):
637         if self._queued_file is None:
638             raise errors.AssertionError(
639                 "resumable writer can't accept unsourced data")
640         return super(ResumableCollectionWriter, self).write(data)
641
642
643 class Collection(CollectionBase):
644     '''An abstract Arvados collection, consisting of a set of files and
645     sub-collections.
646     '''
647
648     def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
649                  keep_client=None, num_retries=0, block_manager=None):
650         '''manifest_locator_or_text: One of Arvados collection UUID, block locator of
651         a manifest, raw manifest text, or None (to create an empty collection).
652
653         parent: the parent Collection, may be None.
654
655         api_client: The API client object to use for requests.  If None, use default.
656
657         keep_client: the Keep client to use for requests.  If None, use default.
658
659         num_retries: the number of retries for API and Keep requests.
660
661         block_manager: the block manager to use.  If None, create one.
662         '''
663         self.parent = parent
664         self._items = None
665         self._api_client = api_client
666         self._keep_client = keep_client
667         self._block_manager = block_manager
668
669         self.num_retries = num_retries
670         self._manifest_locator = None
671         self._manifest_text = None
672         self._api_response = None
673
674         if manifest_locator_or_text:
675             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
676                 self._manifest_locator = manifest_locator_or_text
677             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
678                 self._manifest_locator = manifest_locator_or_text
679             elif re.match(util.manifest_pattern, manifest_locator_or_text):
680                 self._manifest_text = manifest_locator_or_text
681             else:
682                 raise errors.ArgumentError(
683                     "Argument to CollectionReader must be a manifest or a collection UUID")
684
685     def _my_api(self):
686         if self._api_client is None:
687             if self.parent is not None:
688                 return self.parent._my_api()
689             self._api_client = arvados.api('v1')
690             self._keep_client = None  # Make a new one with the new api.
691         return self._api_client
692
693     def _my_keep(self):
694         if self._keep_client is None:
695             if self.parent is not None:
696                 return self.parent._my_keep()
697             self._keep_client = KeepClient(api_client=self._my_api(),
698                                            num_retries=self.num_retries)
699         return self._keep_client
700
701     def _my_block_manager(self):
702         if self._block_manager is None:
703             if self.parent is not None:
704                 return self.parent._my_block_manager()
705             self._block_manager = BlockManager(self._my_keep())
706         return self._block_manager
707
708     def _populate_from_api_server(self):
709         # As in KeepClient itself, we must wait until the last
710         # possible moment to instantiate an API client, in order to
711         # avoid tripping up clients that don't have access to an API
712         # server.  If we do build one, make sure our Keep client uses
713         # it.  If instantiation fails, we'll fall back to the except
714         # clause, just like any other Collection lookup
715         # failure. Return an exception, or None if successful.
716         try:
717             self._api_response = self._my_api().collections().get(
718                 uuid=self._manifest_locator).execute(
719                     num_retries=self.num_retries)
720             self._manifest_text = self._api_response['manifest_text']
721             return None
722         except Exception as e:
723             return e
724
725     def _populate_from_keep(self):
726         # Retrieve a manifest directly from Keep. This has a chance of
727         # working if [a] the locator includes a permission signature
728         # or [b] the Keep services are operating in world-readable
729         # mode. Return an exception, or None if successful.
730         try:
731             self._manifest_text = self._my_keep().get(
732                 self._manifest_locator, num_retries=self.num_retries)
733         except Exception as e:
734             return e
735
736     def _populate(self):
737         self._items = {}
738         if self._manifest_locator is None and self._manifest_text is None:
739             return
740         error_via_api = None
741         error_via_keep = None
742         should_try_keep = ((self._manifest_text is None) and
743                            util.keep_locator_pattern.match(
744                 self._manifest_locator))
745         if ((self._manifest_text is None) and
746             util.signed_locator_pattern.match(self._manifest_locator)):
747             error_via_keep = self._populate_from_keep()
748         if self._manifest_text is None:
749             error_via_api = self._populate_from_api_server()
750             if error_via_api is not None and not should_try_keep:
751                 raise error_via_api
752         if ((self._manifest_text is None) and
753             not error_via_keep and
754             should_try_keep):
755             # Looks like a keep locator, and we didn't already try keep above
756             error_via_keep = self._populate_from_keep()
757         if self._manifest_text is None:
758             # Nothing worked!
759             raise arvados.errors.NotFoundError(
760                 ("Failed to retrieve collection '{}' " +
761                  "from either API server ({}) or Keep ({})."
762                  ).format(
763                     self._manifest_locator,
764                     error_via_api,
765                     error_via_keep))
766         # populate
767         import_manifest(self._manifest_text, self)
768
769     def _populate_first(orig_func):
770         # Decorator for methods that read actual Collection data.
771         @functools.wraps(orig_func)
772         def wrapper(self, *args, **kwargs):
773             if self._items is None:
774                 self._populate()
775             return orig_func(self, *args, **kwargs)
776         return wrapper
777
778     def __enter__(self):
779         return self
780
781     def __exit__(self, exc_type, exc_value, traceback):
782         '''Support scoped auto-commit in a with: block'''
783         self.save(no_locator=True)
784         if self._block_manager is not None:
785             self._block_manager.stop_threads()
786
787     @_populate_first
788     def find(self, path, create=False, create_collection=False):
789         '''Recursively search the specified file path.  May return either a Collection
790         or ArvadosFile.
791
792         create: If true, create path components (i.e. Collections) that are
793         missing.  If "create" is False, return None if a path component is not
794         found.
795
796         create_collection: If the path is not found, "create" is True, and
797         "create_collection" is False, then create and return a new ArvadosFile
798         for the last path component.  If "create_collection" is True, then
799         create and return a new Collection for the last path component.
800         '''
801         p = path.split("/")
802         if p[0] == '.':
803             del p[0]
804
805         if len(p) > 0:
806             item = self._items.get(p[0])
807             if len(p) == 1:
808                 # item must be a file
809                 if item is None and create:
810                     # create new file
811                     if create_collection:
812                         item = Collection(parent=self, num_retries=self.num_retries)
813                     else:
814                         item = ArvadosFile(self)
815                     self._items[p[0]] = item
816                 return item
817             else:
818                 if item is None and create:
819                     # create new collection
820                     item = Collection(parent=self, num_retries=self.num_retries)
821                     self._items[p[0]] = item
822                 del p[0]
823                 return item.find("/".join(p), create=create)
824         else:
825             return self
826
827     @_populate_first
828     def api_response(self):
829         """api_response() -> dict or None
830
831         Returns information about this Collection fetched from the API server.
832         If the Collection exists in Keep but not the API server, currently
833         returns None.  Future versions may provide a synthetic response.
834         """
835         return self._api_response
836
837     def open(self, path, mode):
838         '''Open a file-like object for access.
839
840         path: path to a file in the collection
841
842         mode: one of "r", "r+", "w", "w+", "a", "a+"
843         "r" opens for reading
844
845         "r+" opens for reading and writing.  Reads/writes share a file pointer.
846
847         "w", "w+" truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
848
849         "a", "a+" opens for reading and writing.  All writes are appended to the end of the file.  Writing does not affect the file pointer for reading.
850         '''
851         mode = mode.replace("b", "")
852         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
853             raise ArgumentError("Bad mode '%s'" % mode)
854         create = (mode != "r")
855
856         f = self.find(path, create=create)
857         if f is None:
858             raise IOError((errno.ENOENT, "File not found"))
859         if not isinstance(f, ArvadosFile):
860             raise IOError((errno.EISDIR, "Path must refer to a file."))
861
862         if mode[0] == "w":
863             f.truncate(0)
864
865         if mode == "r":
866             return ArvadosFileReader(f, path, mode)
867         else:
868             return ArvadosFileWriter(f, path, mode)
869
870     @_populate_first
871     def modified(self):
872         '''Test if the collection (or any subcollection or file) has been modified
873         since it was created.'''
874         for k,v in self._items.items():
875             if v.modified():
876                 return True
877         return False
878
879     @_populate_first
880     def set_unmodified(self):
881         '''Recursively clear modified flag'''
882         for k,v in self._items.items():
883             v.set_unmodified()
884
885     @_populate_first
886     def __iter__(self):
887         '''Iterate over names of files and collections contained in this collection.'''
888         return self._items.iterkeys()
889
890     @_populate_first
891     def iterkeys(self):
892         '''Iterate over names of files and collections directly contained in this collection.'''
893         return self._items.iterkeys()
894
895     @_populate_first
896     def __getitem__(self, k):
897         '''Get a file or collection that is directly contained by this collection.  Use
898         find() for path serach.'''
899         return self._items[k]
900
901     @_populate_first
902     def __contains__(self, k):
903         '''If there is a file or collection a directly contained by this collection
904         with name "k".'''
905         return k in self._items
906
907     @_populate_first
908     def __len__(self):
909         '''Get the number of items directly contained in this collection'''
910         return len(self._items)
911
912     @_populate_first
913     def __delitem__(self, p):
914         '''Delete an item by name which is directly contained by this collection.'''
915         del self._items[p]
916
917     @_populate_first
918     def keys(self):
919         '''Get a list of names of files and collections directly contained in this collection.'''
920         return self._items.keys()
921
922     @_populate_first
923     def values(self):
924         '''Get a list of files and collection objects directly contained in this collection.'''
925         return self._items.values()
926
927     @_populate_first
928     def items(self):
929         '''Get a list of (name, object) tuples directly contained in this collection.'''
930         return self._items.items()
931
932     @_populate_first
933     def exists(self, path):
934         '''Test if there is a file or collection at "path"'''
935         return self.find(path) != None
936
937     @_populate_first
938     def remove(self, path):
939         '''Test if there is a file or collection at "path"'''
940         p = path.split("/")
941         if p[0] == '.':
942             del p[0]
943
944         if len(p) > 0:
945             item = self._items.get(p[0])
946             if item is None:
947                 raise IOError((errno.ENOENT, "File not found"))
948             if len(p) == 1:
949                 del self._items[p[0]]
950             else:
951                 del p[0]
952                 item.remove("/".join(p))
953         else:
954             raise IOError((errno.ENOENT, "File not found"))
955
956     @_populate_first
957     def manifest_text(self, strip=False, normalize=False):
958         '''Get the manifest text for this collection, sub collections and files.
959
960         strip: If True, remove signing tokens from block locators if present.
961         If False, block locators are left unchanged.
962
963         normalize: If True, always export the manifest text in normalized form
964         even if the Collection is not modified.  If False and the collection is
965         not modified, return the original manifest text even if it is not in
966         normalized form.
967         '''
968         if self.modified() or self._manifest_text is None or normalize:
969             return export_manifest(self, stream_name=".", portable_locators=strip)
970         else:
971             if strip:
972                 return self.stripped_manifest()
973             else:
974                 return self._manifest_text
975
976     def portable_data_hash(self):
977         '''Get the portable data hash for this collection's manifest.'''
978         stripped = self.manifest_text(strip=True)
979         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
980
981     @_populate_first
982     def save(self, no_locator=False):
983         '''Commit pending buffer blocks to Keep, write the manifest to Keep, and
984         update the collection record to Keep.
985
986         no_locator: If False and there is no collection uuid associated with
987         this Collection, raise an error.  If True, do not raise an error.
988         '''
989         if self.modified():
990             self._my_block_manager().commit_all()
991             self._my_keep().put(self.manifest_text(strip=True))
992             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
993                 self._api_response = self._my_api().collections().update(
994                     uuid=self._manifest_locator,
995                     body={'manifest_text': self.manifest_text(strip=False)}
996                     ).execute(
997                         num_retries=self.num_retries)
998             elif not no_locator:
999                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1000             self.set_unmodified()
1001
1002     @_populate_first
1003     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1004         '''Save a new collection record.
1005
1006         name: The collection name.
1007
1008         owner_uuid: the user, or project uuid that will own this collection.
1009         If None, defaults to the current user.
1010
1011         ensure_unique_name: If True, ask the API server to rename the
1012         collection if it conflicts with a collection with the same name and
1013         owner.  If False, a name conflict will result in an error.
1014         '''
1015         self._my_block_manager().commit_all()
1016         self._my_keep().put(self.manifest_text(strip=True))
1017         body = {"manifest_text": self.manifest_text(strip=False),
1018                 "name": name}
1019         if owner_uuid:
1020             body["owner_uuid"] = owner_uuid
1021         self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1022         self._manifest_locator = self._api_response["uuid"]
1023         self.set_unmodified()
1024
1025
1026 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
1027     '''Import a manifest into a Collection.
1028
1029     manifest_text: The manifest text to import from.
1030
1031     into_collection: The Collection that will be initialized (must be empty).
1032     If None, create a new Collection object.
1033
1034     api_client: The API client object that will be used when creating a new Collection object.
1035
1036     keep: The keep client object that will be used when creating a new Collection object.
1037
1038     num_retries: the default number of api client and keep retries on error.
1039     '''
1040     if into_collection is not None:
1041         if len(into_collection) > 0:
1042             raise ArgumentError("Can only import manifest into an empty collection")
1043         c = into_collection
1044     else:
1045         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
1046
1047     STREAM_NAME = 0
1048     BLOCKS = 1
1049     SEGMENTS = 2
1050
1051     stream_name = None
1052     state = STREAM_NAME
1053
1054     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1055         tok = n.group(1)
1056         sep = n.group(2)
1057
1058         if state == STREAM_NAME:
1059             # starting a new stream
1060             stream_name = tok.replace('\\040', ' ')
1061             blocks = []
1062             segments = []
1063             streamoffset = 0L
1064             state = BLOCKS
1065             continue
1066
1067         if state == BLOCKS:
1068             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1069             if s:
1070                 blocksize = long(s.group(1))
1071                 blocks.append(Range(tok, streamoffset, blocksize))
1072                 streamoffset += blocksize
1073             else:
1074                 state = SEGMENTS
1075
1076         if state == SEGMENTS:
1077             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1078             if s:
1079                 pos = long(s.group(1))
1080                 size = long(s.group(2))
1081                 name = s.group(3).replace('\\040', ' ')
1082                 f = c.find("%s/%s" % (stream_name, name), create=True)
1083                 f.add_segment(blocks, pos, size)
1084             else:
1085                 # error!
1086                 raise errors.SyntaxError("Invalid manifest format")
1087
1088         if sep == "\n":
1089             stream_name = None
1090             state = STREAM_NAME
1091
1092     c.set_unmodified()
1093     return c
1094
1095 def export_manifest(item, stream_name=".", portable_locators=False):
1096     '''Create a manifest for "item" (must be a Collection or ArvadosFile).  If
1097     "item" is a is a Collection, this will also export subcollections.
1098
1099     stream_name: the name of the stream when exporting "item".
1100
1101     portable_locators: If True, strip any permission hints on block locators.
1102     If False, use block locators as-is.
1103     '''
1104     buf = ""
1105     if isinstance(item, Collection):
1106         stream = {}
1107         sorted_keys = sorted(item.keys())
1108         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1109             v = item[k]
1110             st = []
1111             for s in v.segments:
1112                 loc = s.locator
1113                 if loc.startswith("bufferblock"):
1114                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1115                 if portable_locators:
1116                     loc = KeepLocator(loc).stripped()
1117                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1118                                      s.segment_offset, s.range_size))
1119             stream[k] = st
1120         if stream:
1121             buf += ' '.join(normalize_stream(stream_name, stream))
1122             buf += "\n"
1123         for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
1124             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1125     elif isinstance(item, ArvadosFile):
1126         st = []
1127         for s in item.segments:
1128             loc = s.locator
1129             if loc.startswith("bufferblock"):
1130                 loc = item._bufferblocks[loc].calculate_locator()
1131             if portable_locators:
1132                 loc = KeepLocator(loc).stripped()
1133             st.append(LocatorAndRange(loc, locator_block_size(loc),
1134                                  s.segment_offset, s.range_size))
1135         stream[stream_name] = st
1136         buf += ' '.join(normalize_stream(stream_name, stream))
1137         buf += "\n"
1138     return buf