3198: Added Collection.rename (needs test). Fixing tests broken on account of
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6
7 from collections import deque
8 from stat import *
9
10 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
11 from keep import *
12 from .stream import StreamReader, normalize_stream, locator_block_size
13 from .ranges import Range, LocatorAndRange
14 import config
15 import errors
16 import util
17
# Module-level logger for collection operations.
_logger = logging.getLogger('arvados.collection')
19
class CollectionBase(object):
    """Shared plumbing for the collection reader/writer classes.

    Provides context-manager support, lazy Keep client construction, and
    manifest stripping.  Subclasses are expected to supply
    ``_api_client``, ``_keep_client``, ``num_retries`` and a
    ``manifest_text()`` method.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        # Build a KeepClient on first use, reusing it afterwards.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        def strip_hints(token):
            # Only rewrite tokens that look like Keep locators; stream
            # names and file tokens pass through untouched.
            if re.match(util.keep_locator_pattern, token):
                return re.sub(r'\+[^\d][^\+]*', '', token)
            return token

        stripped = []
        for line in self.manifest_text().split("\n"):
            tokens = line.split()
            if tokens:
                stripped.append(
                    ' '.join([tokens[0]] + [strip_hints(t) for t in tokens[1:]]))
                stripped.append("\n")
        return ''.join(stripped)
51
52
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument by syntax: a Keep locator or a collection
        # UUID is stored for lazy resolution in _populate(); literal
        # manifest text is stored directly.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        # Parsed manifest lines; None until _populate() has run.
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                # NOTE(review): 'arvados' is not imported by name in this
                # module; presumably it reaches scope via the star imports
                # above -- confirm.
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        # Resolve self._manifest_text, trying Keep first for signed
        # locators, then the API server, then Keep again for unsigned
        # keep locators; raise if every avenue fails.
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # Split the manifest into one token list per non-empty line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        # Ensures _populate() has run (self._streams is set) before the
        # wrapped method executes.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        # Rearrange streams: group every file's byte ranges by its full
        # (stream, filename) path, then rebuild streams in sorted order.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        # Linear scan for the stream whose name matches; the for/else
        # raises if no stream matched.
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        # Return a StreamReader for every parsed manifest stream.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        # Yield every file object across all streams.  Population is
        # triggered by all_streams(), so no decorator is needed here.
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Return the collection's manifest text.

        * strip: remove non-portable locator hints (see
          stripped_manifest).
        * normalize: return the normalized form, built via a temporary
          CollectionReader so this instance is left unmodified.
        """
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
241
242
class _WriterFile(ArvadosFileBase):
    """Write-only file handle handed out by CollectionWriter.open().

    Every write is forwarded to the owning CollectionWriter; closing the
    handle finalizes the writer's current file entry.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        # Let the base class mark the handle closed first, then tell the
        # writer the current file is complete.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        # Route each chunk through write() so the closed-state guard is
        # applied per chunk.
        for chunk in seq:
            self.write(chunk)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()
264
265
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered but not yet written to Keep, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State for the stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        # State for the file currently being written within that stream.
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for the layout.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # Most recent handle returned by open(); only one may be open.
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only finalize (upload the manifest) on a clean exit.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Copy the queued file into the collection one Keep block at a
        # time, then finish the file and clear the queue slot.
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close the handle if _queue_file() opened it itself.
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Process entries of the directory at the head of _queued_trees:
        # queue subdirectories as new trees, queue at most one file per
        # call (the break), and pop the tree when its entries run out.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # List the head tree's entries (recursively when
        # max_manifest_depth == 0) and queue them; pop empty trees.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        # Queue a single file for writing.  'source' may be a path (we
        # open and later close it) or an already-open file-like object.
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        # stream_name is accepted for interface symmetry but unused here;
        # _work_dirents() re-reads it from _queued_trees[0].
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        # Append a directory to be written as its own stream.
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or file-like object) into the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree into the collection.

        max_manifest_depth controls how many directory levels become
        separate streams; 0 flattens everything into one stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file, flushing full Keep blocks."""
        # A non-string iterable is written element by element.
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one Keep block's worth of buffered data; any remainder
        # stays in the buffer for the next flush.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Only tab and newline are rejected here (despite the message
        # saying "whitespace"); spaces are escaped as \040 when the
        # manifest is generated in manifest_text().
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written."""
        return self._current_file_name

    def finish_current_file(self):
        # Record the (offset, length, name) triple for the current file.
        # A missing name is only acceptable when no bytes were written.
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # An empty name means the root stream '.'.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        # Flush pending file and data, record the finished stream, and
        # reset all per-stream state.
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A stream must reference at least one block, even if empty.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return the portable data hash of the stripped manifest."""
        # NOTE(review): 'hashlib' is not imported by name in this module;
        # presumably it reaches scope via the star imports above -- confirm.
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Build and return the manifest text for all finished streams."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names must start with './' unless they are '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return all block locators referenced by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
541
542
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be saved and resumed.

    dump_state() captures the attributes listed in STATE_PROPS (plus the
    position of any in-flight file); from_state() rebuilds a writer from
    that snapshot, refusing to resume if the source files have changed.
    """

    # Attributes serialized by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Map of source path -> stat tuple, used to detect changes.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        # Reopen the in-flight file (if any) and seek back to where we
        # left off.
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any dependency file changed.

        A dependency is considered changed if it is no longer a regular
        file, cannot be stat'ed, or its mtime or size differ from the
        recorded snapshot.
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a snapshot dict suitable for from_state().

        copy_func is applied to each saved attribute; pass e.g.
        copy.deepcopy to detach the snapshot from live state.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        # Like the parent, but records a stat snapshot of the source so
        # a resumed writer can detect changes.  Only path sources are
        # accepted (file-like objects can't be reopened on resume).
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # NOTE(review): 'stat_error' is read outside its except block;
            # this relies on Python 2 scoping and would raise NameError on
            # Python 3 -- confirm the target interpreter.
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Refuse data that didn't come through _queue_file(); unsourced
        # bytes could not be reproduced when resuming.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
642
643 class Collection(CollectionBase):
644     def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
645                  keep_client=None, num_retries=0, block_manager=None):
646
647         self.parent = parent
648         self._items = None
649         self._api_client = api_client
650         self._keep_client = keep_client
651         self._block_manager = block_manager
652
653         self.num_retries = num_retries
654         self._manifest_locator = None
655         self._manifest_text = None
656         self._api_response = None
657
658         if manifest_locator_or_text:
659             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
660                 self._manifest_locator = manifest_locator_or_text
661             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
662                 self._manifest_locator = manifest_locator_or_text
663             elif re.match(util.manifest_pattern, manifest_locator_or_text):
664                 self._manifest_text = manifest_locator_or_text
665             else:
666                 raise errors.ArgumentError(
667                     "Argument to CollectionReader must be a manifest or a collection UUID")
668
669     def _my_api(self):
670         if self._api_client is None:
671             if self.parent is not None:
672                 return self.parent._my_api()
673             self._api_client = arvados.api('v1')
674             self._keep_client = None  # Make a new one with the new api.
675         return self._api_client
676
677     def _my_keep(self):
678         if self._keep_client is None:
679             if self.parent is not None:
680                 return self.parent._my_keep()
681             self._keep_client = KeepClient(api_client=self._my_api(),
682                                            num_retries=self.num_retries)
683         return self._keep_client
684
685     def _my_block_manager(self):
686         if self._block_manager is None:
687             if self.parent is not None:
688                 return self.parent._my_block_manager()
689             self._block_manager = BlockManager(self._my_keep())
690         return self._block_manager
691
692     def _populate_from_api_server(self):
693         # As in KeepClient itself, we must wait until the last
694         # possible moment to instantiate an API client, in order to
695         # avoid tripping up clients that don't have access to an API
696         # server.  If we do build one, make sure our Keep client uses
697         # it.  If instantiation fails, we'll fall back to the except
698         # clause, just like any other Collection lookup
699         # failure. Return an exception, or None if successful.
700         try:
701             self._api_response = self._my_api().collections().get(
702                 uuid=self._manifest_locator).execute(
703                     num_retries=self.num_retries)
704             self._manifest_text = self._api_response['manifest_text']
705             return None
706         except Exception as e:
707             return e
708
709     def _populate_from_keep(self):
710         # Retrieve a manifest directly from Keep. This has a chance of
711         # working if [a] the locator includes a permission signature
712         # or [b] the Keep services are operating in world-readable
713         # mode. Return an exception, or None if successful.
714         try:
715             self._manifest_text = self._my_keep().get(
716                 self._manifest_locator, num_retries=self.num_retries)
717         except Exception as e:
718             return e
719
    def _populate(self):
        """Load the collection contents from its manifest.

        Tries Keep and/or the API server depending on what kind of
        locator is available, then imports the resulting manifest text.
        Raises the API-server error when Keep is not an option, or
        NotFoundError when every source failed.
        """
        self._items = {}
        if self._manifest_locator is None and self._manifest_text is None:
            # Brand-new, empty collection: nothing to fetch.
            return
        error_via_api = None
        error_via_keep = None
        # A bare Keep locator (hash+size) can be fetched from Keep even
        # if the API server is unreachable.
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: Keep is likely to honor it, so try Keep first.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            # NOTE(review): this relies on a module-level `import arvados`
            # that is not visible in this chunk -- confirm it exists.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # populate
        import_manifest(self._manifest_text, self)
752
753     def _populate_first(orig_func):
754         # Decorator for methods that read actual Collection data.
755         @functools.wraps(orig_func)
756         def wrapper(self, *args, **kwargs):
757             if self._items is None:
758                 self._populate()
759             return orig_func(self, *args, **kwargs)
760         return wrapper
761
    def __enter__(self):
        # Context-manager entry: no setup required; see __exit__ for cleanup.
        return self
764
    def __exit__(self, exc_type, exc_value, traceback):
        """Save pending changes and stop background threads on context exit."""
        # no_locator=True: a collection that was never saved to the API
        # server is still flushed to Keep without raising.
        self.save(no_locator=True)
        if self._block_manager is not None:
            self._block_manager.stop_threads()
769
770     @_populate_first
771     def find(self, path, create=False, create_collection=False):
772         p = path.split("/")
773         if p[0] == '.':
774             del p[0]
775
776         if len(p) > 0:
777             item = self._items.get(p[0])
778             if len(p) == 1:
779                 # item must be a file
780                 if item is None and create:
781                     # create new file
782                     if create_collection:
783                         item = Collection(parent=self, num_retries=self.num_retries)
784                     else:
785                         item = ArvadosFile(self)
786                     self._items[p[0]] = item
787                 return item
788             else:
789                 if item is None and create:
790                     # create new collection
791                     item = Collection(parent=self, num_retries=self.num_retries)
792                     self._items[p[0]] = item
793                 del p[0]
794                 return item.find("/".join(p), create=create)
795         else:
796             return self
797
    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        # _api_response is set by _populate_from_api_server(), save(),
        # and save_as().
        return self._api_response
807
808     def open(self, path, mode):
809         mode = mode.replace("b", "")
810         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
811             raise ArgumentError("Bad mode '%s'" % mode)
812         create = (mode != "r")
813
814         f = self.find(path, create=create)
815         if f is None:
816             raise IOError((errno.ENOENT, "File not found"))
817         if not isinstance(f, ArvadosFile):
818             raise IOError((errno.EISDIR, "Path must refer to a file."))
819
820         if mode[0] == "w":
821             f.truncate(0)
822
823         if mode == "r":
824             return ArvadosFileReader(f, path, mode)
825         else:
826             return ArvadosFileWriter(f, path, mode)
827
828     @_populate_first
829     def modified(self):
830         for k,v in self._items.items():
831             if v.modified():
832                 return True
833         return False
834
835     @_populate_first
836     def set_unmodified(self):
837         for k,v in self._items.items():
838             v.set_unmodified()
839
840     @_populate_first
841     def __iter__(self):
842         return self._items.iterkeys()
843
844     @_populate_first
845     def iterkeys(self):
846         return self._items.iterkeys()
847
    @_populate_first
    def __getitem__(self, k):
        # Look up a top-level file or subcollection by name; raises KeyError.
        return self._items[k]
851
    @_populate_first
    def __contains__(self, k):
        # Membership test against top-level item names only.
        return k in self._items
855
856     @_populate_first
857     def __len__(self):
858        return len(self._items)
859
    @_populate_first
    def __delitem__(self, p):
        # Remove a top-level item by name; raises KeyError if absent.
        del self._items[p]
863
    @_populate_first
    def keys(self):
        # Names of top-level items.
        return self._items.keys()
867
    @_populate_first
    def values(self):
        # Top-level items (ArvadosFile or Collection objects).
        return self._items.values()
871
    @_populate_first
    def items(self):
        # (name, item) pairs for top-level entries.
        return self._items.items()
875
876     @_populate_first
877     def exists(self, path):
878         return self.find(path) != None
879
880     @_populate_first
881     def remove(self, path):
882         p = path.split("/")
883         if p[0] == '.':
884             del p[0]
885
886         if len(p) > 0:
887             item = self._items.get(p[0])
888             if item is None:
889                 raise IOError((errno.ENOENT, "File not found"))
890             if len(p) == 1:
891                 del self._items[p[0]]
892             else:
893                 del p[0]
894                 item.remove("/".join(p))
895         else:
896             raise IOError((errno.ENOENT, "File not found"))
897
898     @_populate_first
899     def manifest_text(self, strip=False, normalize=False):
900         if self.modified() or self._manifest_text is None or normalize:
901             return export_manifest(self, stream_name=".", portable_locators=strip)
902         else:
903             if strip:
904                 return self.stripped_manifest()
905             else:
906                 return self._manifest_text
907
908     def portable_data_hash(self):
909         stripped = self.manifest_text(strip=True)
910         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
911
    @_populate_first
    def save(self, no_locator=False):
        """Commit pending changes and update the API server record.

        Commits buffered blocks to Keep and, when this collection was
        loaded from an API-server uuid, updates the stored record.

        :no_locator: when True, a collection that has no uuid is flushed
          to Keep without raising (used by __exit__); otherwise that is
          an error -- use save_as() for new collections.
        """
        if self.modified():
            self._my_block_manager().commit_all()
            # Store the stripped manifest so permission hints in block
            # locators are not persisted in Keep.
            self._my_keep().put(self.manifest_text(strip=True))
            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
                self._api_response = self._my_api().collections().update(
                    uuid=self._manifest_locator,
                    body={'manifest_text': self.manifest_text(strip=False)}
                    ).execute(
                        num_retries=self.num_retries)
            elif not no_locator:
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
            self.set_unmodified()
926
    @_populate_first
    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
        """Save this collection as a new API server record.

        :name: name for the new collection record.
        :owner_uuid: optional owner (e.g. project) uuid for the record.
        :ensure_unique_name: ask the API server to de-duplicate the name
          instead of failing on a conflict.

        Commits buffered blocks to Keep, creates the record, and adopts
        the new record's uuid as this collection's locator.
        """
        self._my_block_manager().commit_all()
        # Store the stripped manifest so permission hints in block
        # locators are not persisted in Keep.
        self._my_keep().put(self.manifest_text(strip=True))
        body = {"manifest_text": self.manifest_text(strip=False),
                "name": name}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
        self._manifest_locator = self._api_response["uuid"]
        self.set_unmodified()
938
939     @_populate_first
940     def rename(self, old, new):
941         old_path, old_fn = os.path.split(old)
942         old_col = self.find(path)
943         if old_col is None:
944             raise IOError((errno.ENOENT, "File not found"))
945         if not isinstance(old_p, Collection):
946             raise IOError((errno.ENOTDIR, "Parent in path is a file, not a directory"))
947         if old_fn in old_col:
948             new_path, new_fn = os.path.split(new)
949             new_col = self.find(new_path, create=True, create_collection=True)
950             if not isinstance(new_col, Collection):
951                 raise IOError((errno.ENOTDIR, "Destination is a file, not a directory"))
952             ent = old_col[old_fn]
953             del old_col[old_fn]
954             ent.parent = new_col
955             new_col[new_fn] = ent
956         else:
957             raise IOError((errno.ENOENT, "File not found"))
958
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
    """Build a Collection tree from manifest text.

    :manifest_text: the manifest to parse.
    :into_collection: an existing *empty* Collection to populate; when
      None, a new Collection is created from the remaining arguments.

    Returns the populated collection, marked unmodified.
    Raises errors.SyntaxError when the manifest is malformed.
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            # NOTE(review): bare `ArgumentError` looks like an undefined
            # name here -- probably should be errors.ArgumentError.
            raise ArgumentError("Can only import manifest into an empty collection")
        c = into_collection
    else:
        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)

    # Parser states: each manifest line is a stream name, followed by
    # block locators, followed by file segments.
    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    # Tokenize on whitespace; the captured separator tells us when a
    # manifest line (stream) ends.
    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        tok = n.group(1)
        sep = n.group(2)

        if state == STREAM_NAME:
            # starting a new stream; '\040' encodes a literal space
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            segments = []
            streamoffset = 0L
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                # Block locator: record it with its offset in the stream.
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
            else:
                # First non-locator token: fall through to segment parsing.
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                # File segment: position, size, and (space-unescaped) name.
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
            else:
                # error!
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            # End of line: the next token starts a new stream.
            stream_name = None
            state = STREAM_NAME

    c.set_unmodified()
    return c
1014
def export_manifest(item, stream_name=".", portable_locators=False):
    """Render `item` (a Collection or ArvadosFile) as manifest text.

    :stream_name: the stream name the item's files belong to.
    :portable_locators: strip permission hints from block locators.

    Returns the manifest text; an empty string for an empty collection.
    """
    buf = ""
    if isinstance(item, Collection):
        stream = {}
        sorted_keys = sorted(item.keys())
        # First emit this collection's files as a single stream...
        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            v = item[k]
            st = []
            for s in v.segments:
                loc = s.locator
                if loc.startswith("bufferblock"):
                    # Uncommitted block: resolve its current locator.
                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
                if portable_locators:
                    loc = KeepLocator(loc).stripped()
                st.append(LocatorAndRange(loc, locator_block_size(loc),
                                     s.segment_offset, s.range_size))
            stream[k] = st
        if stream:
            buf += ' '.join(normalize_stream(stream_name, stream))
            buf += "\n"
        # ...then recurse into subcollections as nested streams.
        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        # Bug fix: `stream` was never initialized in this branch, so
        # exporting a bare ArvadosFile raised NameError.
        stream = {}
        st = []
        for s in item.segments:
            loc = s.locator
            if loc.startswith("bufferblock"):
                # Bug fix: made consistent with the Collection branch --
                # bufferblocks live on the parent's block manager, not on
                # the file object itself.
                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                                 s.segment_offset, s.range_size))
        stream[stream_name] = st
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf