5011: Add replication argument to CollectionWriter.
[arvados.git] / sdk / python / arvados / collection.py
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase
from keep import *
from .stream import StreamReader, split
import config
import errors
import util
15
16 _logger = logging.getLogger('arvados.collection')
17
18 def normalize_stream(s, stream):
19     stream_tokens = [s]
20     sortedfiles = list(stream.keys())
21     sortedfiles.sort()
22
23     blocks = {}
24     streamoffset = 0L
25     for f in sortedfiles:
26         for b in stream[f]:
27             if b[arvados.LOCATOR] not in blocks:
28                 stream_tokens.append(b[arvados.LOCATOR])
29                 blocks[b[arvados.LOCATOR]] = streamoffset
30                 streamoffset += b[arvados.BLOCKSIZE]
31
32     if len(stream_tokens) == 1:
33         stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
34
35     for f in sortedfiles:
36         current_span = None
37         fout = f.replace(' ', '\\040')
38         for segment in stream[f]:
39             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
40             if current_span is None:
41                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
42             else:
43                 if segmentoffset == current_span[1]:
44                     current_span[1] += segment[arvados.SEGMENTSIZE]
45                 else:
46                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
47                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
48
49         if current_span is not None:
50             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
51
52         if not stream[f]:
53             stream_tokens.append("0:0:{0}".format(fout))
54
55     return stream_tokens
56
57
class CollectionBase(object):
    """Shared plumbing for Collection readers and writers."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        # Build the Keep client lazily so simply instantiating a
        # Collection object never requires Keep access.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if not fields:
                continue
            # Keep the stream name untouched; strip non-size hints
            # (e.g. +A... signatures) from every locator token.
            stripped = [fields[0]]
            for token in fields[1:]:
                if re.match(util.keep_locator_pattern, token):
                    token = re.sub(r'\+[^\d][^\+]*', '', token)
                stripped.append(token)
            clean.append(' '.join(stripped))
            clean.append("\n")
        return ''.join(clean)
89
90
class CollectionReader(CollectionBase):
    """Read files out of an existing Arvados Collection.

    The manifest is fetched lazily: no API server or Keep request is
    made until a method that needs actual Collection data runs.
    """

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        Raises errors.ArgumentError if the argument matches none of the
        recognized patterns.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument: Keep locator (portable data hash),
        # collection UUID, or literal manifest text.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        # Fetch the manifest (from Keep and/or the API server, in an
        # order depending on what kind of locator we hold) and parse it
        # into self._streams.  Raises if every source fails.
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        # A signed locator can be fetched from Keep directly; try that
        # first since it needs no API credentials.
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        # Ensures the manifest has been fetched and parsed before the
        # wrapped method runs.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        """Rewrite self._streams and the manifest text in normalized form.

        Files are regrouped by stream directory and sorted, with
        adjacent segments coalesced (see normalize_stream).
        """
        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.

        Raises ValueError if the stream or file is not found.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        """Return a StreamReader for each stream in the Collection."""
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file object in every stream of the Collection."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Return the Collection's manifest text.

        * strip: remove non-portable hints (signatures) from locators.
        * normalize: return the normalized form (implies re-parsing via
          a temporary CollectionReader).
        """
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
279
280
class _WriterFile(ArvadosFileBase):
    """Writable file-like object returned by CollectionWriter.open().

    All data is delegated to the owning CollectionWriter; closing the
    object finishes the current file in the Collection being built.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        # Record the file's extent in the Collection under construction.
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        # Pushes buffered data to Keep, possibly as a partial block.
        self.dest.flush_data()
302
303
class CollectionWriter(CollectionBase):
    """Build a new Arvados Collection by writing files and streams.

    Data is buffered locally and uploaded to Keep in blocks of up to
    KEEP_BLOCK_SIZE bytes; manifest_text() yields the resulting manifest.
    """
    # Maximum size of a single Keep block: 2**26 bytes (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0, replication=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is 0 or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # NOTE(review): the docstring mentions a server-provided default,
        # but this expression always falls back to 2 when replication <= 0
        # — confirm that is the intended behavior.
        self.replication = (replication if replication>0 else 2)
        self._keep_client = None
        # Data buffered locally but not yet uploaded, plus its length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # Completed streams: [name, locators, file-extents] triples.
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for the protocol.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The most recent file object handed out by open().
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only finalize (upload the manifest) on a clean exit.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Copy the queued file into the Collection in block-sized reads.
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close file objects we opened ourselves (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Process entries of the directory at the head of _queued_trees:
        # queue subdirectories as new trees and queue at most one file
        # (do_queued_work will re-enter here after writing it).
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # List the next queued directory tree and queue its entries.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_depth=None recurses fully; presumably max_manifest_depth == 0
        # means "flatten everything into this one stream" — confirm against
        # util.listdir_recursive.
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # Got a path rather than a file object: open it ourselves
            # and remember that we are responsible for closing it.
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        # stream_name is unused here; _work_dirents rereads it from
        # _queued_trees[0].
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or file object) into the Collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree into the Collection as streams."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Iterables are written element by element.  Python 2 str has no
        # __iter__ attribute, so plain strings fall through to buffering.
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Upload full blocks as soon as enough data is buffered.
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        # Enforce the one-open-file-at-a-time rule.
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload at most one block of buffered data to Keep.

        Any remainder beyond KEEP_BLOCK_SIZE stays buffered.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:self.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Tabs and newlines would corrupt the manifest format; spaces are
        # allowed because manifest_text() escapes them as \040.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's extent in the current stream."""
        if self._current_file_name is None:
            # Data with no file name is only OK if there is no data at all.
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # File extents are [start offset, length, name] within the stream.
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # An empty name means the root stream.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush data and move the current stream to the finished list."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # Every stream needs at least one locator token.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator. Beware,
        # this is only useful in special cases like storing manifest
        # fragments temporarily in Keep during a Crunch job. In most
        # cases you should make a collection instead, by sending
        # manifest_text() to the API server's "create collection"
        # endpoint.
        return self._my_keep().put(self.manifest_text(), copies=self.replication)

    def portable_data_hash(self):
        """Return the portable data hash: md5 of the stripped manifest
        plus its length."""
        # hashlib must be in scope here; it is not imported at the top of
        # this file and historically arrived via `from keep import *` —
        # an explicit top-level import is safer.
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Return the manifest text for all finished streams."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names must be rooted at '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return every block locator from all finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
592
593
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() captures the writer's internal state; from_state()
    rebuilds a writer from such a snapshot, verifying that the source
    files have not changed in the meantime.
    """
    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Maps source paths to their stat() results at queue time, so a
        # later resume can detect files that changed underneath us.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        # Reopen the file that was mid-write when state was dumped, and
        # seek back to the recorded position.
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any source file has changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # A change in type, mtime, or size invalidates the checkpoint.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a snapshot of writer state suitable for from_state().

        copy_func may be supplied to deep-copy or serialize each value.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            # Record the in-progress file's real path and read position so
            # from_state() can reopen and resume it.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        # Unlike the base class, only real file paths can be queued:
        # arbitrary file objects cannot be re-opened on resume.
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # NOTE: stat_error is bound only when os.stat raised above;
            # Python 2 keeps the except-clause name alive at this point.
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Resumable state can only account for data that came from a
        # queued file, so free-form write() is disallowed.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)