# Work in progress
# [arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
def normalize(collection):
    """Return a normalized list of stream token lists for a collection.

    Groups every file in the collection by its stream (directory) name,
    sorts streams and files lexically, assigns each distinct data block a
    single offset within its stream, and coalesces adjacent file chunks
    into contiguous "pos:size:filename" spans.

    Returns a list of streams, each a token list of the form
    [stream_name, locator, ..., "pos:size:filename", ...].
    """
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            # Split on the last "/" so subdirectories inside a stream name
            # become part of the stream, not the file name.
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))

    normalized_streams = []
    for s in sorted(streams.keys()):
        stream = streams[s]
        stream_tokens = [s]

        sortedfiles = sorted(stream.keys())

        # Assign each distinct block a stream-relative offset, in the order
        # blocks are first referenced by the sorted files.
        blocks = {}
        streamoffset = 0
        for f in sortedfiles:
            for b in stream[f]:
                if b[StreamReader.LOCATOR] not in blocks:
                    stream_tokens.append(b[StreamReader.LOCATOR])
                    blocks[b[StreamReader.LOCATOR]] = streamoffset
                    streamoffset += b[StreamReader.BLOCKSIZE]

        # Emit file tokens, merging chunks that are adjacent in the stream
        # into a single span.
        for f in sortedfiles:
            current_span = None
            fout = f.replace(' ', '\\040')
            for chunk in stream[f]:
                chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET]
                if current_span is None:
                    current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
                elif chunkoffset == current_span[1]:
                    # Chunk starts exactly where the previous one ended:
                    # extend the span instead of emitting a new token.
                    current_span[1] += chunk[StreamReader.CHUNKSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]

            if current_span is not None:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        normalized_streams.append(stream_tokens)
    return normalized_streams
78
79
class CollectionReader(object):
    """Read-only access to a Keep collection, identified either by its
    manifest text or by a locator that resolves to one."""

    def __init__(self, manifest_locator_or_text):
        # A manifest text starts with a stream name followed by at least
        # one block locator and one file token; anything else is treated
        # as a locator to be resolved lazily in _populate().
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None

    def __enter__(self):
        # Return self so "with CollectionReader(...) as reader:" works.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context-manager protocol passes three exception arguments;
        # without them every "with" block raised TypeError on exit.
        pass

    def _populate(self):
        """Fetch (if needed), tokenize and normalize the manifest.

        Idempotent: returns immediately once self._streams is set.  Falls
        back to fetching the manifest directly from Keep when the API
        lookup fails.
        """
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Regenerate the manifest text from the normalized streams so
        # manifest_text() always returns the canonical form.
        self._manifest_text = ''
        for stream in self._streams:
            self._manifest_text += stream[0].replace(' ', '\\040')
            for t in stream[1:]:
                self._manifest_text += (" " + t.replace(' ', '\\040'))
            self._manifest_text += "\n"

    def all_streams(self):
        """Return a list of StreamReader objects, one per stream."""
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        """Yield every file object across all streams."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the normalized manifest text for this collection."""
        self._populate()
        return self._manifest_text
141
class CollectionWriter(object):
    """Incrementally build a Keep collection manifest.

    Data is buffered with write(), grouped into named files and streams,
    and flushed to Keep in blocks of at most KEEP_BLOCK_SIZE bytes.
    """
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []            # pending chunks not yet sent to Keep
        self._data_buffer_len = 0         # total bytes in _data_buffer
        self._current_stream_files = []   # [pos, size, name] per finished file
        self._current_stream_length = 0   # bytes written to the current stream
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []

    def __enter__(self):
        # Return self so "with CollectionWriter() as writer:" works.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context-manager protocol passes three exception arguments;
        # without them every "with" block raised TypeError on exit.
        self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the contents of a directory as one or more streams.

        Subdirectories become their own streams until max_manifest_depth
        is exhausted, after which the remaining tree is flattened into a
        single stream via util.listdir_recursive.
        """
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth-1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(2**26)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # Explicit loop rather than map(): map() is lazy under Python 3,
        # so the recursive calls would silently never run.
        for args in todo:
            self.write_directory_tree(*args)

    def write(self, newdata):
        """Buffer newdata, flushing full Keep blocks as they accumulate.

        Accepts a string/bytes chunk or any non-string iterable of chunks.
        """
        # Exclude str/bytes from the iterable check: under Python 3 both
        # are iterable, and recursing into individual characters would
        # never terminate.
        if hasattr(newdata, '__iter__') and not isinstance(newdata, (str, bytes)):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Send at most one full block of buffered data to Keep."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            # Keep any overflow beyond one block buffered for the next flush.
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the file currently being written.

        Raises errors.AssertionError if the name contains a tab or
        newline, which the manifest format cannot represent.
        """
        # None is a valid value (an unnamed file in progress); guard it so
        # start_new_file() with the default argument doesn't TypeError.
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's [pos, size, name] in the stream.

        Raises errors.AssertionError if data was written without a file
        name being set.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # Nothing written since the last file boundary: nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # The manifest's root stream is spelled '.', never ''.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush buffered data and record the finished stream, then reset
        all per-stream state for the next stream."""
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            # Empty stream: drop it silently.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                # A stream must reference at least one block.
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest itself in Keep and return its locator."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Return the manifest text for all finished streams, normalized
        through CollectionReader."""
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            for locator in stream[1]:
                manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
            manifest += "\n"
        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        """Return every data block locator referenced by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret