# Commit note: Added basic unit test for fuse mount.
# File: [arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
def normalize_stream(s, stream):
    """Return the manifest tokens for one normalized stream.

    s is the stream name; stream maps each file name to a list of
    segment tuples indexed by arvados.LOCATOR, arvados.BLOCKSIZE,
    arvados.OFFSET and arvados.SEGMENTSIZE.

    The result is a token list: the stream name, then each distinct
    block locator once (in order of first use), then one
    "offset:size:filename" token per coalesced file segment.

    NOTE(review): 'arvados' is not imported in this module; these
    constants presumably resolve via the enclosing package -- verify.
    """
    stream_tokens = [s]
    sortedfiles = sorted(stream.keys())

    # First pass: emit each distinct block exactly once and record the
    # byte offset at which it lands within the concatenated stream.
    blocks = {}
    streamoffset = 0  # plain int: auto-promotes in Python 2, valid in 3
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # Second pass: emit file segments, merging each segment into the
    # current span when it starts exactly where the span ends.
    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset,
                                segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(
                    current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset,
                                segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(
                current_span[0], current_span[1] - current_span[0], fout))

        # A file with no segments still needs a zero-length entry so it
        # appears in the manifest.
        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
61     
62
def normalize(collection):
    """Normalize a collection into a sorted list of stream token lists.

    Walks every file of every stream in *collection*, groups the files
    by their (possibly nested) stream name, expands each file's segments
    into locator ranges, and hands each group to normalize_stream().
    """
    streams = {}
    for stream in collection.all_streams():
        for f in stream.all_files():
            # Re-split the combined path so a file name containing no
            # directory part still lands in the right stream bucket.
            fullpath = stream.name() + "/" + f.name()
            streamname, _, filename = fullpath.rpartition("/")
            segments = streams.setdefault(streamname, {}).setdefault(filename, [])
            for seg in f.segments:
                segments.extend(stream.locators_and_ranges(seg[0], seg[1]))

    return [normalize_stream(name, streams[name])
            for name in sorted(streams.keys())]
84
85
class CollectionReader(object):
    """Read and normalize an Arvados collection manifest.

    The constructor accepts either a manifest locator (32 hex digits,
    a size, and optional hints) or literal manifest text; fetching and
    parsing are deferred until the first accessor call.
    """

    def __init__(self, manifest_locator_or_text):
        # A locator looks like "<32 hex>+<size>[+hint...]"; anything
        # else is treated as literal manifest text.
        if re.search(r'^[a-f0-9]{32}\+\d+(\+\S)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        else:
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        self._streams = None

    def __enter__(self):
        # Return self so "with CollectionReader(...) as reader:" works;
        # the original returned None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The context-manager protocol passes three exception arguments;
        # the original zero-argument signature raised TypeError on exit.
        pass

    def _populate(self):
        """Fetch (if needed), parse, and normalize the manifest text."""
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Best effort: fall back to fetching the manifest
                # directly from Keep when the API lookup fails.
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Regenerate the manifest text from the normalized streams so
        # manifest_text() always returns the canonical form.
        self._manifest_text = ''
        for stream in self._streams:
            self._manifest_text += stream[0].replace(' ', '\\040')
            for t in stream[1:]:
                self._manifest_text += (" " + t.replace(' ', '\\040'))
            self._manifest_text += "\n"

    def all_streams(self):
        """Return a list of StreamReader objects, one per stream."""
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        """Yield every file object across all streams."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the normalized manifest text, fetching it if needed."""
        self._populate()
        return self._manifest_text
147
class CollectionWriter(object):
    """Incrementally build a collection manifest.

    Data is buffered and flushed to Keep in KEEP_BLOCK_SIZE chunks;
    file and stream boundaries are tracked so manifest_text() can emit
    the final manifest.
    """
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []            # pending chunks not yet in Keep
        self._data_buffer_len = 0         # total bytes buffered
        self._current_stream_files = []   # [pos, size, name] per finished file
        self._current_stream_length = 0   # bytes written to current stream
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0        # stream offset where current file began
        self._finished_streams = []

    def __enter__(self):
        # Return self so "with CollectionWriter() as writer:" works;
        # the original returned None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The protocol passes three exception arguments; the original
        # zero-argument signature raised TypeError on exit. As before,
        # finish() runs even when an exception is propagating.
        self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a directory tree rooted at *path* as stream(s).

        With max_manifest_depth == 0 the whole tree is flattened into a
        single stream; otherwise subdirectories recurse with the depth
        budget decremented.
        NOTE(review): 'util' is not imported in this module -- verify
        util.listdir_recursive resolves at runtime.
        """
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                todo += [[target,
                          os.path.join(stream_name, dirent),
                          max_manifest_depth-1]]
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        # Read in Keep-block-sized chunks (was a
                        # hard-coded 2**26, same value).
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # Explicit loop: under Python 3, map() is lazy and the original
        # map(lambda ...) would silently recurse into nothing.
        for args in todo:
            self.write_directory_tree(*args)

    def write(self, newdata):
        """Append data (a string, or an iterable of strings) to the
        current file, flushing full Keep blocks as they accumulate."""
        # Strings are excluded explicitly: in Python 3 str is itself
        # iterable, so the bare hasattr test would recurse forever.
        if hasattr(newdata, '__iter__') and not isinstance(newdata, (str, bytes)):
            for chunk in newdata:
                self.write(chunk)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Store one buffered Keep block (if any data is pending) and
        keep the remainder in the buffer."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Close the current file (if any) and begin a new one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the current file; whitespace is not representable in a
        manifest token, so tab/newline are rejected."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's [offset, size, name] entry.

        Raises if unnamed data has been written since the last file
        boundary; a zero-length unnamed file is silently ignored.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Close the current stream (if any) and begin a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the current stream; '' is normalized to '.'."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush buffered data and move the current stream's bookkeeping
        into _finished_streams, then reset per-stream state."""
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A stream must carry at least one locator token.
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest itself in Keep and return its locator."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Assemble the manifest text from all finished streams and
        return it in normalized form (via CollectionReader)."""
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            # Stream names must be rooted at '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            for locator in stream[1]:
                manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
            manifest += "\n"
        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        """Return every Keep locator stored by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret