Merge branch '1911-python-sdk-pydoc'
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from stream import *
22 from keep import *
23
class CollectionReader(object):
    """Read an Arvados collection given its manifest text or locator.

    The constructor accepts either literal manifest text or a Keep
    locator; in the latter case the manifest is fetched lazily from
    Keep on first use (see _populate).
    """

    def __init__(self, manifest_locator_or_text):
        # A string shaped like "<stream> <hexhash+hints>... <pos:len:name>...\n"
        # is manifest text itself; anything else is assumed to be a
        # locator whose manifest will be fetched on demand.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n',
                     manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        self._streams = None

    def __enter__(self):
        # Return self so "with CollectionReader(...) as reader:" binds
        # the reader (the previous version returned None).
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to release.  Accept the standard exception triple so
        # exiting a "with" block does not raise TypeError.
        pass

    def _populate(self):
        """Fetch the manifest if necessary and tokenize its streams."""
        if self._streams is not None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                self._streams.append(stream_line.split())

    def all_streams(self):
        """Return a list with one StreamReader per manifest stream."""
        self._populate()
        return [StreamReader(tokens) for tokens in self._streams]

    def all_files(self):
        """Yield every file object in every stream of the collection."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the manifest text, fetching it from Keep if needed."""
        self._populate()
        return self._manifest_text
66
class CollectionWriter(object):
    """Build an Arvados collection by writing files into named streams.

    Data is buffered locally and flushed to Keep in blocks of at most
    KEEP_BLOCK_SIZE bytes; finish() stores the resulting manifest text
    in Keep and returns its locator.
    """

    KEEP_BLOCK_SIZE = 2**26  # 64 MiB, the maximum Keep block size

    def __init__(self):
        self._data_buffer = []            # pending chunks not yet sent to Keep
        self._data_buffer_len = 0         # total bytes currently buffered
        self._current_stream_files = []   # [pos, size, name] per finished file
        self._current_stream_length = 0   # bytes written to the current stream
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0        # stream offset where current file began
        self._finished_streams = []       # [name, locators, files] per stream

    def __enter__(self):
        # Return self so "with CollectionWriter() as writer:" binds the
        # writer (the previous version returned None).
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Accept the standard exception triple; the bare one-argument
        # form raised TypeError whenever a "with" block exited.
        self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the directory tree at path as one or more streams.

        With max_manifest_depth == 0 the entire remaining tree is
        flattened into a single stream; otherwise each subdirectory is
        recursed into as its own stream with the depth budget reduced
        by one.
        """
        self.start_new_stream(stream_name)
        todo = []
        if max_manifest_depth == 0:
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Defer subdirectories until this stream is finished.
                todo.append([target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1])
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # Explicit loop rather than map(): map() used for side effects
        # is a silent no-op under a lazy (Python 3) map.
        for subdir_args in todo:
            self.write_directory_tree(*subdir_args)

    def write(self, newdata):
        """Append data to the current file, flushing full Keep blocks.

        Accepts a single string or any iterable of strings.
        """
        # Guard str/bytes explicitly: under Python 3 they are iterable
        # and the hasattr check alone would recurse forever, one
        # character at a time.  Under Python 2, str has no __iter__,
        # so this preserves the original behavior exactly.
        if (not isinstance(newdata, (str, bytes))
                and hasattr(newdata, '__iter__')):
            for chunk in newdata:
                self.write(chunk)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Send one Keep block's worth of buffered data, if any."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators.append(
                Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            # Keep any overflow beyond one block in the buffer.
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and begin a new one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the file currently being written.

        Raises errors.AssertionError if the name contains a tab or
        newline (spaces are escaped as \\040 in the manifest).
        """
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the current file's name, or None if unnamed."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's extent in the current stream.

        Raises errors.AssertionError if data was written but no file
        name was ever set.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file boundary.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append(
            [self._current_file_pos,
             self._current_stream_length - self._current_file_pos,
             self._current_file_name])
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the stream currently being written ('' maps to '.')."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        """Return the current stream's name, or None if unnamed."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Close out the current stream and reset per-stream state.

        Raises errors.AssertionError if files were written to an
        unnamed stream.
        """
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                # Every stream needs at least one locator, even if empty.
                self._current_stream_locators.append(EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            # Stream names must start with '.'; prepend './' otherwise.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            for locator in stream[1]:
                manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1],
                                           sfile[2].replace(' ', '\\040'))
            manifest += "\n"
        return manifest

    def data_locators(self):
        """Return the Keep locators of all data blocks written so far."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret