Merge branch '1977-provenance-report' of git.clinicalfuture.com:arvados into 1977...
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from keep import *
22 from stream import *
23 import config
24 import errors
25
class CollectionReader(object):
    """Read-only view of a collection manifest.

    The constructor accepts either literal manifest text or a locator.
    A locator is only resolved (via Keep.get) the first time the manifest
    contents are needed.
    """

    def __init__(self, manifest_locator_or_text):
        # Input that starts with at least one complete manifest line
        # (stream name, one or more hex block locators with optional
        # +hints, one or more pos:size:name tokens, then a newline) is
        # treated as manifest text; anything else is treated as a locator.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        # Parsed stream token lists; None until _populate() runs.
        self._streams = None

    def __enter__(self):
        """Return the reader itself so ``with ... as r`` binds it."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Nothing to release.

        Returns None so any exception raised in the ``with`` body
        propagates. The arguments default to None so existing explicit
        ``reader.__exit__()`` calls keep working.
        """
        return None

    def _populate(self):
        """Fetch (if needed) and tokenize the manifest, once."""
        if self._streams is not None:
            return
        if not self._manifest_text:
            self._manifest_text = Keep.get(self._manifest_locator)
        streams = []
        for stream_line in self._manifest_text.split("\n"):
            stream_tokens = stream_line.split()
            # Skip blank and whitespace-only lines: they describe no stream.
            if stream_tokens:
                streams.append(stream_tokens)
        self._streams = streams

    def all_streams(self):
        """Return a list with one StreamReader per manifest line."""
        self._populate()
        return [StreamReader(tokens) for tokens in self._streams]

    def all_files(self):
        """Yield each item reported by StreamReader.all_files(), stream by stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        """Return the manifest text, fetching it from Keep if necessary."""
        self._populate()
        return self._manifest_text
68
class CollectionWriter(object):
    """Build a collection manifest, uploading file data to Keep as it goes.

    Data is appended to the current file of the current stream. Buffered
    bytes are sent to Keep.put() in chunks of at most KEEP_BLOCK_SIZE
    bytes. Each finished stream is recorded as
    [stream_name, block_locators, [[position, size, filename], ...]],
    where position/size locate a file's bytes within the concatenated
    stream data.
    """

    # Maximum number of bytes sent to Keep in a single Keep.put() call.
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []

    def __enter__(self):
        """Return the writer itself so ``with ... as w`` binds it."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the collection when the ``with`` body exits cleanly.

        If the body raised, nothing is stored, and the exception
        propagates (this method returns None). The arguments default to
        None so an explicit ``writer.__exit__()`` call still finishes.
        """
        if exc_type is None:
            self.finish()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write every file under *path* into this collection.

        Non-directory entries directly under *path* become files in a
        stream named *stream_name*. Each subdirectory is written
        afterwards, recursively, as its own stream named
        ``stream_name/subdir``, with max_manifest_depth decremented.

        When max_manifest_depth is 0, the directory is listed with
        util.listdir_recursive() instead of os.listdir(), presumably
        putting the whole remaining tree into this one stream. A negative
        value (the default) never reaches 0 when decremented.
        """
        self.start_new_stream(stream_name)
        subdirs = []
        if max_manifest_depth == 0:
            # NOTE(review): `util` is not in the visible import block;
            # presumably it arrives through one of the star imports.
            # Assumes listdir_recursive yields paths relative to *path*.
            dirents = sorted(util.listdir_recursive(path))
        else:
            dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                subdirs.append((target,
                                os.path.join(stream_name, dirent),
                                max_manifest_depth - 1))
            else:
                self.start_new_file(dirent)
                with open(target, 'rb') as f:
                    while True:
                        buf = f.read(self.KEEP_BLOCK_SIZE)
                        if len(buf) == 0:
                            break
                        self.write(buf)
        self.finish_current_stream()
        # Explicit loop rather than map(): the recursion is a side effect,
        # and map() is lazy on Python 3, so it would never run there.
        for subdir, sub_stream_name, sub_depth in subdirs:
            self.write_directory_tree(subdir, sub_stream_name, sub_depth)

    def write(self, newdata):
        """Append *newdata* to the current file.

        Objects with ``__iter__`` are treated as a sequence of chunks and
        written one by one. (On Python 2, ``str`` has no ``__iter__``, so
        plain strings are buffered directly.) Whenever at least
        KEEP_BLOCK_SIZE bytes are buffered, full blocks are flushed.
        """
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Upload up to KEEP_BLOCK_SIZE buffered bytes with Keep.put().

        The returned locator is appended to the current stream's locators.
        Any bytes beyond KEEP_BLOCK_SIZE remain buffered.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators.append(
                Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file, then start *newfilename* (may be None)."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the name of the file currently being written.

        None is accepted and means "no current file" (finish_current_file
        tolerates that as long as no bytes were written).

        Raises errors.AssertionError if the name contains a tab or newline.
        Spaces are allowed; manifest_text() escapes them as \\040.
        """
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's [position, size, name] in the stream.

        An unnamed current file is accepted only if no bytes were written
        since the last finished file.

        Raises errors.AssertionError if bytes were written with no file
        name set.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream, then start *newstreamname*."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name; '' is normalized to '.'.

        None is accepted and means "unnamed"; finish_current_stream()
        raises if files were written to an unnamed stream.

        Raises errors.AssertionError if the name contains a tab or newline.
        """
        if newstreamname is not None and re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush buffered data and move the current stream to the finished list.

        A stream with no files is dropped. A stream whose files have no
        data gets config.EMPTY_BLOCK_LOCATOR as its only locator. All
        per-stream state is reset afterwards, leaving the stream unnamed.

        Raises errors.AssertionError if the stream has files but no name.
        """
        self.finish_current_file()
        self.flush_data()
        if self._current_stream_files:
            if self._current_stream_name is None:
                raise errors.AssertionError(
                    "Cannot finish an unnamed stream (%d bytes in %d files)" %
                    (self._current_stream_length, len(self._current_stream_files)))
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest text with Keep.put() and return its result."""
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        """Finish the current stream and return the full manifest text.

        Each stream becomes one line:
        ``name locator... pos:size:filename...\\n``. Stream names not
        already of the form ``.`` or ``./...`` are prefixed with ``./``.
        Spaces in stream names and file names are escaped as ``\\040``.
        """
        self.finish_current_stream()
        lines = []
        for stream_name, locators, files in self._finished_streams:
            prefix = '' if re.search(r'^\.(/.*)?$', stream_name) else './'
            tokens = [prefix + stream_name.replace(' ', '\\040')]
            tokens.extend(locators)
            tokens.extend("%d:%d:%s" % (pos, size, fname.replace(' ', '\\040'))
                          for pos, size, fname in files)
            lines.append(' '.join(tokens) + "\n")
        return ''.join(lines)

    def data_locators(self):
        """Return all block locators of finished streams, in stream order."""
        locators = []
        for _name, stream_locators, _files in self._finished_streams:
            locators.extend(stream_locators)
        return locators