26 class CollectionReader(object):
# Set up a reader from either literal manifest text or a collection locator.
27 def __init__(self, manifest_locator_or_text):
# The argument counts as manifest text when it begins with a stream line:
# a stream name, one or more block locators (32+ hex digits with optional
# "+hint" suffixes), one or more "pos:size:name" file tokens, then a newline.
28 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
29 self._manifest_text = manifest_locator_or_text
30 self._manifest_locator = None
# Locator case: remember the locator and leave the manifest text unset.
32 self._manifest_locator = manifest_locator_or_text
33 self._manifest_text = None
43 if self._streams != None:
45 if not self._manifest_text:
47 c = arvados.api('v1').collections().get(
48 uuid=self._manifest_locator).execute()
49 self._manifest_text = c['manifest_text']
50 except Exception as e:
51 logging.warning("API lookup failed for collection %s (%s: %s)" %
52 (self._manifest_locator, type(e), str(e)))
53 self._manifest_text = Keep.get(self._manifest_locator)
55 for stream_line in self._manifest_text.split("\n"):
57 stream_tokens = stream_line.split()
58 self._streams += [stream_tokens]
# Wrap each entry of self._streams in a StreamReader and collect them in resp.
60 def all_streams(self):
63 for s in self._streams:
64 resp += [StreamReader(s)]
68 for s in self.all_streams():
69 for f in s.all_files():
# Return the manifest text held by this reader.
72 def manifest_text(self):
74 return self._manifest_text
76 class CollectionWriter(object):
77 KEEP_BLOCK_SIZE = 2**26
80 self._data_buffer = []
81 self._data_buffer_len = 0
82 self._current_stream_files = []
83 self._current_stream_length = 0
84 self._current_stream_locators = []
85 self._current_stream_name = '.'
86 self._current_file_name = None
87 self._current_file_pos = 0
88 self._finished_streams = []
# Add the contents of the local directory `path` to the collection as the
# stream `stream_name`.  Non-directory entries are started as files of that
# stream and opened in binary mode; for directories, an argument list with
# the stream name extended by the entry name and max_manifest_depth reduced
# by one is built, and the entries of `todo` are passed to recursive calls
# once the current stream has been finished.
96 def write_directory_tree(self,
97 path, stream_name='.', max_manifest_depth=-1):
98 self.start_new_stream(stream_name)
# Depth budget exhausted: list via util.listdir_recursive (presumably every
# path below `path` -- confirm against util).
100 if max_manifest_depth == 0:
101 dirents = sorted(util.listdir_recursive(path))
# Plain listing of the immediate directory entries, in sorted order.
103 dirents = sorted(os.listdir(path))
104 for dirent in dirents:
105 target = os.path.join(path, dirent)
106 if os.path.isdir(target):
108 os.path.join(stream_name, dirent),
109 max_manifest_depth-1]]
111 self.start_new_file(dirent)
112 with open(target, 'rb') as f:
118 self.finish_current_stream()
# NOTE(review): map() is used only for its side effects.  That works on
# Python 2, where map() is eager; on Python 3 map() is lazy and these
# recursive calls would never run.
119 map(lambda x: self.write_directory_tree(*x), todo)
# Append newdata to the current stream's pending data.
121 def write(self, newdata):
# Objects that expose __iter__ take a separate path.
# NOTE(review): Python 2 str has no __iter__, Python 3 str does -- confirm
# the intended interpreter before relying on this test.
122 if hasattr(newdata, '__iter__'):
# Queue the chunk and account for its length in both the buffer size and
# the running length of the stream.
126 self._data_buffer += [newdata]
127 self._data_buffer_len += len(newdata)
128 self._current_stream_length += len(newdata)
# Keep looping while at least one full block (KEEP_BLOCK_SIZE) is buffered.
129 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
def flush_data(self):
    """Store the leading block of buffered data in Keep.

    All pending chunks are joined into one string.  If that string is
    empty, nothing happens.  Otherwise its first KEEP_BLOCK_SIZE
    characters (or the whole string, if shorter) are stored with
    Keep.put, the locator returned by Keep.put is added to the current
    stream's locator list, and whatever follows the stored slice becomes
    the only entry left in the data buffer.
    """
    pending = ''.join(self._data_buffer)
    if pending == '':
        return
    block_size = self.KEEP_BLOCK_SIZE
    head, tail = pending[:block_size], pending[block_size:]
    self._current_stream_locators.append(Keep.put(head))
    self._data_buffer = [tail]
    self._data_buffer_len = len(tail)
def start_new_file(self, newfilename=None):
    """Close the file being written and switch to a new file name.

    finish_current_file() runs first, so data written so far is recorded
    under the previous name; newfilename is then handed to
    set_current_file_name(), which validates and stores it.
    """
    self.finish_current_file()
    self.set_current_file_name(newfilename)
def set_current_file_name(self, newfilename):
    """Set the name of the file that written data is attributed to.

    newfilename may be None, which leaves the current file unnamed;
    start_new_file() passes None when called without a name.  Spaces
    are accepted in a name, tabs and newlines are not.

    Raises errors.AssertionError if newfilename contains a tab or a
    newline.
    """
    # re.search() raises TypeError when given None, so only real names
    # are validated.
    if newfilename is not None and re.search(r'[\t\n]', newfilename):
        raise errors.AssertionError(
            "Manifest filenames cannot contain whitespace: %s" %
            newfilename)
    self._current_file_name = newfilename
def current_file_name(self):
    """Return the name set for the file being written (None if unset)."""
    return self._current_file_name
def finish_current_file(self):
    """Record the data written since the last file boundary as one file.

    Appends [start offset, length, name] to the current stream's file
    list and moves the file position to the end of the stream data.
    When no file name is set and no data has been written since the
    last boundary there is nothing to record, and the call does nothing.

    Raises errors.AssertionError if data has been written since the
    last boundary but no file name is set.
    """
    # Identity test: "== None" can be fooled by a name object that
    # defines its own __eq__.
    if self._current_file_name is None:
        if self._current_file_pos == self._current_stream_length:
            return
        # "%" binds tighter than "+", so only the second literal is the
        # format string; the first is prepended unchanged.
        raise errors.AssertionError(
            "Cannot finish an unnamed file " +
            "(%d bytes at offset %d in '%s' stream)" %
            (self._current_stream_length - self._current_file_pos,
             self._current_file_pos,
             self._current_stream_name))
    self._current_stream_files.append(
        [self._current_file_pos,
         self._current_stream_length - self._current_file_pos,
         self._current_file_name])
    self._current_file_pos = self._current_stream_length
def start_new_stream(self, newstreamname='.'):
    """Close the stream being written and switch to a new stream name.

    finish_current_stream() runs first; newstreamname (default '.') is
    then handed to set_current_stream_name(), which validates and
    stores it.
    """
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
    """Set the name of the stream being written.

    An empty string is stored as '.'.  Spaces are accepted in a name,
    tabs and newlines are not.

    Raises errors.AssertionError if newstreamname contains a tab or a
    newline.
    """
    if re.search(r'[\t\n]', newstreamname) is not None:
        raise errors.AssertionError(
            "Manifest stream names cannot contain whitespace")
    if newstreamname == '':
        newstreamname = '.'
    self._current_stream_name = newstreamname
def current_stream_name(self):
    """Return the name set for the stream being written (None if unset)."""
    return self._current_stream_name
# Close out the stream being built and reset the per-stream state.
181 def finish_current_stream(self):
# Finish the file in progress first so it is part of the stream's file list.
182 self.finish_current_file()
# Branch on whether any files were recorded for this stream.
184 if len(self._current_stream_files) == 0:
# A stream that has files but no name cannot be recorded.
186 elif self._current_stream_name == None:
187 raise errors.AssertionError(
188 "Cannot finish an unnamed stream (%d bytes in %d files)" %
189 (self._current_stream_length, len(self._current_stream_files)))
# A stream without any stored block gets config.EMPTY_BLOCK_LOCATOR as its
# only locator.
191 if len(self._current_stream_locators) == 0:
192 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
# Save the stream as [name, locators, files].
193 self._finished_streams += [[self._current_stream_name,
194 self._current_stream_locators,
195 self._current_stream_files]]
# Clear the per-stream and per-file state.
196 self._current_stream_files = []
197 self._current_stream_length = 0
198 self._current_stream_locators = []
199 self._current_stream_name = None
200 self._current_file_pos = 0
201 self._current_file_name = None
204 return Keep.put(self.manifest_text())
# Build manifest text from the finished streams.
206 def manifest_text(self):
# Close the stream in progress before reading self._finished_streams.
207 self.finish_current_stream()
# Each entry is [name, locators, files], as stored by finish_current_stream.
209 for stream in self._finished_streams:
# Stream names other than "." or "./..." are treated specially here.
210 if not re.search(r'^\.(/.*)?$', stream[0]):
# Spaces in the stream name are written as the escape \040.
212 manifest += stream[0].replace(' ', '\\040')
# Every locator follows as a space-separated token.
213 for locator in stream[1]:
214 manifest += " %s" % locator
# Every file follows as a "position:size:name" token, again with spaces in
# the name written as \040.
215 for sfile in stream[2]:
216 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
220 def data_locators(self):
222 for name, locators, files in self._finished_streams: