Retrieve manifest_text from API server. If that fails, emit a warning and fall back to fetching the manifest from Keep.
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
class CollectionReader(object):
    """Read the streams and files described by a collection manifest.

    The reader is constructed from either literal manifest text or a
    locator. When a locator is given, the manifest text is fetched lazily
    the first time it is needed (see ``_populate``).

    Instances can be used as context managers; entering yields the reader
    itself and exiting performs no cleanup.
    """

    def __init__(self, manifest_locator_or_text):
        """Store the argument as manifest text or as a locator.

        The argument is treated as manifest text when it begins with a
        line shaped like ``<stream> <block locator>... <pos:size:name>...``
        followed by a newline; anything else is treated as a locator.
        """
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n',
                     manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        # Parsed lazily by _populate(); None means "not parsed yet".
        self._streams = None

    def __enter__(self):
        """Return the reader so ``with CollectionReader(x) as r`` works."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave the ``with`` block; never suppresses an exception.

        The arguments default to None so a direct ``reader.__exit__()``
        call keeps working.
        """
        return False

    def _populate(self):
        """Fetch the manifest text if necessary and split it into streams.

        Does nothing once the streams have been parsed. When only a
        locator is known, the manifest text is requested from the API
        server; if that fails for any reason a warning is logged and the
        manifest is read from Keep instead.
        """
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                # NOTE(review): `arvados` is not imported in the visible
                # import block; presumably it arrives through one of the
                # star imports. If it does not, the NameError is caught
                # below and every lookup silently falls back to Keep --
                # confirm.
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Deliberately broad: any API failure triggers the
                # best-effort fallback to Keep.
                logging.warning(
                    "API lookup failed for collection %s (%s: %s)",
                    self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)
        # One token list per non-empty manifest line.
        self._streams = [stream_line.split()
                         for stream_line in self._manifest_text.split("\n")
                         if stream_line != '']

    def all_streams(self):
        """Return a list with one StreamReader per stream in the manifest."""
        self._populate()
        return [StreamReader(s) for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in manifest order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the manifest text, fetching it first if necessary."""
        self._populate()
        return self._manifest_text
75
class CollectionWriter(object):
    """Build a collection manifest incrementally.

    Data passed to ``write`` is buffered and handed to ``Keep.put`` in
    pieces of at most ``KEEP_BLOCK_SIZE`` bytes. The writer tracks which
    byte ranges of the current stream belong to which file, and
    ``manifest_text`` renders all finished streams as manifest lines.

    Instances can be used as context managers: entering yields the
    writer, and leaving the block without an exception calls ``finish``.
    """

    # Maximum number of bytes passed to Keep.put in one call (2**26).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        """Start with an empty buffer and an empty stream named '.'."""
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []

    def __enter__(self):
        """Return the writer so ``with CollectionWriter() as w`` works."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the collection when the ``with`` block ended normally.

        If the block raised, nothing is stored and the exception
        propagates. The arguments default to None so a direct
        ``writer.__exit__()`` call still finishes the collection.
        """
        if exc_type is None:
            self.finish()
        return False

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the files under ``path`` as one or more streams.

        Regular entries of ``path`` become files of ``stream_name``. Each
        subdirectory is written afterwards as its own stream named
        ``stream_name/<dirent>``, with ``max_manifest_depth`` reduced by
        one. When ``max_manifest_depth`` is 0 the entries come from
        ``util.listdir_recursive(path)`` instead of ``os.listdir(path)``.
        """
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            # NOTE(review): `util` is not imported in the visible import
            # block; presumably provided by a star import -- confirm.
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Defer subdirectories until this stream is finished.
                todo.append((target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1))
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # An explicit loop rather than map(): the calls are made for their
        # side effects and must run even where map() is lazy.
        for subtree_args in todo:
            self.write_directory_tree(*subtree_args)

    def write(self, newdata):
        """Append data to the current file.

        If ``newdata`` has an ``__iter__`` attribute, each of its items
        is written in turn. Otherwise it is appended to the buffer, and
        full blocks are flushed while the buffer holds at least
        ``KEEP_BLOCK_SIZE`` bytes.
        """
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Store up to one block of buffered data and keep the remainder.

        The first ``KEEP_BLOCK_SIZE`` bytes of the buffer are passed to
        ``Keep.put`` and the returned value is appended to the current
        stream's locators. Does nothing when the buffer is empty.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and make ``newfilename`` current.

        With the default of None there is no current file afterwards; a
        name must be set before written data can be finished.
        """
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the name of the file that written data belongs to.

        ``None`` clears the current name. Raises errors.AssertionError
        if the name contains a tab or a newline. Spaces are accepted;
        ``manifest_text`` escapes them.
        """
        # None is the documented default of start_new_file(); it must not
        # reach re.search(), which raises TypeError on non-string input.
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the current file name, or None if there is none."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's byte range within the current stream.

        With no current file name this is a no-op when no data has been
        written since the last finished file, and raises
        errors.AssertionError otherwise.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Each entry is [start offset in stream, length, file name].
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin one named ``newstreamname``."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name; an empty name becomes '.'.

        Raises errors.AssertionError if the name contains a tab or a
        newline.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        """Return the current stream name, or None if there is none."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Close the current file and stream and reset per-stream state.

        A stream without files is discarded. A stream with files but no
        name raises errors.AssertionError. A stream whose files hold no
        data gets ``config.EMPTY_BLOCK_LOCATOR`` as its only locator.
        Afterwards there is no current stream name and no current file
        name.
        """
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            # Each entry is [stream name, locators, files].
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest text with Keep.put and return its result."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Finish the current stream and return the manifest as a string.

        Each finished stream becomes one newline-terminated line made of
        the stream name, its locators, and ``pos:size:name`` file tokens,
        separated by single spaces. Stream names that are not '.' or do
        not start with './' are prefixed with './'. Spaces in stream and
        file names are written as ``\\040``.
        """
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            for locator in stream[1]:
                manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
            manifest += "\n"
        return manifest

    def data_locators(self):
        """Return the locators of all finished streams as one flat list."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret