26 class CollectionReader(object):
# Reads a collection given either its manifest text or a locator for it.
# NOTE(review): this excerpt is missing several lines of the class (an
# else branch, a method header, initialisations and return statements),
# so the comments below describe only what the visible lines do.
27 def __init__(self, manifest_locator_or_text):
# Treat the argument as manifest text if its first line looks like a
# manifest stream line: a non-blank stream name, one or more locators of
# 32+ lowercase hex digits with optional "+hint" suffixes, one or more
# "position:length:name" file tokens, and a terminating newline.
# NOTE(review): without re.MULTILINE, '^' anchors at the start of the
# string and the trailing '\n' is mandatory, so a one-line manifest with
# no final newline is treated as a locator instead of as text.
28 if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
29 self._manifest_text = manifest_locator_or_text
30 self._manifest_locator = None
# Otherwise the argument is kept as a locator. The text is fetched later
# through the Keep.get call below.
32 self._manifest_locator = manifest_locator_or_text
33 self._manifest_text = None
# (The enclosing method header is not visible in this excerpt.)
# Presumably this returns early once streams have been parsed; the
# branch body is not visible here.
43 if self._streams != None:
# Fetch the manifest from Keep when only a locator was supplied.
45 if not self._manifest_text:
46 self._manifest_text = Keep.get(self._manifest_locator)
# Split the manifest into lines and each line into whitespace-separated
# tokens. Each token list is appended to self._streams.
48 for stream_line in self._manifest_text.split("\n"):
50 stream_tokens = stream_line.split()
51 self._streams += [stream_tokens]
53 def all_streams(self):
# Wrap each parsed token list in a StreamReader. The initialisation of
# `resp` and the return statement are not visible in this excerpt.
56 for s in self._streams:
57 resp += [StreamReader(s)]
# Visit every file of every stream. The enclosing method header and the
# loop body are not visible in this excerpt.
61 for s in self.all_streams():
62 for f in s.all_files():
65 def manifest_text(self):
# Return the manifest text.
# NOTE(review): for a reader built from a locator, this is still None
# unless something has fetched it first. Confirm that a line not visible
# here does the fetch.
67 return self._manifest_text
69 class CollectionWriter(object):
# Accumulates written data into Keep blocks, files and streams, and
# renders them as manifest text.
# Largest number of bytes handed to a single Keep.put call: flush_data()
# stores at most this many bytes per block, and write() loops while at
# least this much is buffered. 2**26 bytes = 64 MiB.
70 KEEP_BLOCK_SIZE = 2**26
# (Enclosing method header not visible in this excerpt.) These lines
# initialise the writer's state.
# Chunks written but not yet stored in Keep, and their total length.
73 self._data_buffer = []
74 self._data_buffer_len = 0
# The stream currently being written: its file entries, each a
# [position, length, name] list, the total bytes written to it, the
# Keep locators of its stored blocks, and its name ('.' initially).
75 self._current_stream_files = []
76 self._current_stream_length = 0
77 self._current_stream_locators = []
78 self._current_stream_name = '.'
# The file currently being written: its name (None until one is set)
# and its starting offset within the stream.
79 self._current_file_name = None
80 self._current_file_pos = 0
# Completed streams, each recorded as [name, locators, files].
81 self._finished_streams = []
89 def write_directory_tree(self,
90 path, stream_name='.', max_manifest_depth=-1):
# Write the local directory `path` into the collection. The directory
# starts a new stream named `stream_name`.
# max_manifest_depth controls nesting. At 0, entries come from
# util.listdir_recursive() (presumably the whole subtree; that helper is
# not shown here). Otherwise only the direct entries are listed, and each
# subdirectory is queued to become its own stream with the depth reduced
# by one. The default -1 never reaches 0 by decrementing, so nesting is
# then unlimited.
91 self.start_new_stream(stream_name)
93 if max_manifest_depth == 0:
94 dirents = sorted(util.listdir_recursive(path))
96 dirents = sorted(os.listdir(path))
97 for dirent in dirents:
98 target = os.path.join(path, dirent)
99 if os.path.isdir(target):
# Queue [subdirectory path, sub-stream name, depth - 1]. The start of
# this statement is not visible in this excerpt.
101 os.path.join(stream_name, dirent),
102 max_manifest_depth-1]]
# Any other entry becomes a file in the current stream. The read/write
# loop over `f` is not visible in this excerpt.
104 self.start_new_file(dirent)
105 with open(target, 'rb') as f:
111 self.finish_current_stream()
# Recurse into each queued subdirectory after this stream is finished.
# NOTE(review): on Python 3, map() returns a lazy iterator, and here that
# iterator is discarded, so the recursive calls would never run. An
# explicit for loop is needed if this runs on Python 3. On Python 2,
# map() is eager.
112 map(lambda x: self.write_directory_tree(*x), todo)
114 def write(self, newdata):
# Iterables take a separate path whose body is not visible in this
# excerpt (presumably each element is written in turn).
# NOTE(review): on Python 3, str and bytes both define __iter__, so plain
# string data would also take this branch. Confirm the target
# interpreter (on Python 2, str has no __iter__).
115 if hasattr(newdata, '__iter__'):
# Buffer the chunk and count its length toward both the pending buffer
# and the current stream.
119 self._data_buffer += [newdata]
120 self._data_buffer_len += len(newdata)
121 self._current_stream_length += len(newdata)
# Keep flushing while at least one full Keep block is buffered. The loop
# body is not visible in this excerpt.
122 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
125 def flush_data(self):
126 data_buffer = ''.join(self._data_buffer)
127 if data_buffer != '':
128 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
129 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
130 self._data_buffer_len = len(self._data_buffer[0])
132 def start_new_file(self, newfilename=None):
133 self.finish_current_file()
134 self.set_current_file_name(newfilename)
136 def set_current_file_name(self, newfilename):
# Validate and record the name of the file being written. Tabs and
# newlines are rejected. Spaces are allowed because manifest_text()
# escapes them as \040.
# NOTE(review): passing None makes re.search() raise TypeError, not the
# AssertionError below.
137 if re.search(r'[\t\n]', newfilename):
138 raise errors.AssertionError(
139 "Manifest filenames cannot contain whitespace: %s" %
141 self._current_file_name = newfilename
143 def current_file_name(self):
144 return self._current_file_name
146 def finish_current_file(self):
# Record the file in progress as a [position, length, name] entry of the
# current stream, then move the file start offset to the current end of
# the stream.
# An unnamed file is only acceptable if nothing was written to it. What
# happens in that case is on a line not visible in this excerpt.
# Otherwise an unnamed file raises an error.
147 if self._current_file_name == None:
148 if self._current_file_pos == self._current_stream_length:
150 raise errors.AssertionError(
151 "Cannot finish an unnamed file " +
152 "(%d bytes at offset %d in '%s' stream)" %
153 (self._current_stream_length - self._current_file_pos,
154 self._current_file_pos,
155 self._current_stream_name))
156 self._current_stream_files += [[self._current_file_pos,
157 self._current_stream_length - self._current_file_pos,
158 self._current_file_name]]
159 self._current_file_pos = self._current_stream_length
161 def start_new_stream(self, newstreamname='.'):
162 self.finish_current_stream()
163 self.set_current_stream_name(newstreamname)
165 def set_current_stream_name(self, newstreamname):
166 if re.search(r'[\t\n]', newstreamname):
167 raise errors.AssertionError(
168 "Manifest stream names cannot contain whitespace")
169 self._current_stream_name = '.' if newstreamname=='' else newstreamname
171 def current_stream_name(self):
172 return self._current_stream_name
174 def finish_current_stream(self):
# Finish the current file, record the current stream in
# self._finished_streams as [name, locators, files], and clear the
# per-stream and per-file state. This excerpt does not show whether the
# reset lines sit inside the branch that was not taken.
175 self.finish_current_file()
# A stream with no files is handled by a line not visible in this
# excerpt. An unnamed stream that has files is an error.
177 if len(self._current_stream_files) == 0:
179 elif self._current_stream_name == None:
180 raise errors.AssertionError(
181 "Cannot finish an unnamed stream (%d bytes in %d files)" %
182 (self._current_stream_length, len(self._current_stream_files)))
# A manifest stream line needs at least one locator (compare the
# "( [a-f0-9]{32,}...)+" group in CollectionReader's manifest regex), so
# a stream with no stored blocks gets config.EMPTY_BLOCK_LOCATOR.
184 if len(self._current_stream_locators) == 0:
185 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
186 self._finished_streams += [[self._current_stream_name,
187 self._current_stream_locators,
188 self._current_stream_files]]
189 self._current_stream_files = []
190 self._current_stream_length = 0
191 self._current_stream_locators = []
192 self._current_stream_name = None
193 self._current_file_pos = 0
194 self._current_file_name = None
# (Enclosing method header not visible in this excerpt.) Store the
# rendered manifest text in Keep and return what Keep.put returns.
197 return Keep.put(self.manifest_text())
199 def manifest_text(self):
# Render every finished stream as manifest text. The stream in progress
# is finished first so that it is included among them.
200 self.finish_current_stream()
202 for stream in self._finished_streams:
# Stream names other than '.' or './...' are adjusted by a line not
# visible in this excerpt (presumably by prefixing './').
203 if not re.search(r'^\.(/.*)?$', stream[0]):
# Each stream is rendered as: its name with spaces escaped as \040, its
# block locators, then one "position:length:name" token per file, again
# with spaces in file names escaped as \040.
205 manifest += stream[0].replace(' ', '\\040')
206 for locator in stream[1]:
207 manifest += " %s" % locator
208 for sfile in stream[2]:
209 manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
213 def data_locators(self):
215 for name, locators, files in self._finished_streams: