4 # --md5sum - display md5 of each file as read from disk
17 def parse_arguments(arguments):
18 parser = argparse.ArgumentParser(
19 description='Copy data from the local filesystem to Keep.')
21 parser.add_argument('paths', metavar='path', type=str, nargs='*',
23 Local file or directory. Default: read from standard input.
26 parser.add_argument('--max-manifest-depth', type=int, metavar='N',
28 Maximum depth of directory tree to represent in the manifest
29 structure. A directory structure deeper than this will be represented
30 as a single stream in the manifest. If N=0, the manifest will contain
31 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
32 stream per filesystem directory that contains files.
35 group = parser.add_mutually_exclusive_group()
37 group.add_argument('--as-stream', action='store_true', dest='stream',
42 group.add_argument('--stream', action='store_true',
44 Store the file content and display the resulting manifest on
45 stdout. Do not write the manifest to Keep or save a Collection object
49 group.add_argument('--as-manifest', action='store_true', dest='manifest',
51 Synonym for --manifest.
54 group.add_argument('--in-manifest', action='store_true', dest='manifest',
56 Synonym for --manifest.
59 group.add_argument('--manifest', action='store_true',
61 Store the file data and resulting manifest in Keep, save a Collection
62 object in Arvados, and display the manifest locator (Collection uuid)
63 on stdout. This is the default behavior.
66 group.add_argument('--as-raw', action='store_true', dest='raw',
71 group.add_argument('--raw', action='store_true',
73 Store the file content and display the data block locators on stdout,
74 separated by commas, with a trailing newline. Do not store a
78 parser.add_argument('--use-filename', type=str, default=None,
79 dest='filename', help="""
80 Synonym for --filename.
83 parser.add_argument('--filename', type=str, default=None,
85 Use the given filename in the manifest, instead of the name of the
86 local file. This is useful when "-" or "/dev/stdin" is given as an
87 input file. It can be used only if there is exactly one path given and
88 it is not a directory. Implies --manifest.
91 group = parser.add_mutually_exclusive_group()
92 group.add_argument('--progress', action='store_true',
94 Display human-readable progress on stderr (bytes and, if possible,
95 percentage of total data size). This is the default behavior when
99 group.add_argument('--no-progress', action='store_true',
101 Do not display human-readable progress on stderr, even if stderr is a
105 group.add_argument('--batch-progress', action='store_true',
107 Display machine-readable progress on stderr (bytes and, if known,
111 group = parser.add_mutually_exclusive_group()
112 group.add_argument('--resume', action='store_true', default=True,
114 Continue interrupted uploads from cached state (default).
116 group.add_argument('--no-resume', action='store_false', dest='resume',
118 Do not continue interrupted uploads from cached state.
121 args = parser.parse_args(arguments)
123 if len(args.paths) == 0:
124 args.paths += ['/dev/stdin']
126 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
129 --filename argument cannot be used when storing a directory or
133 # Turn on --progress by default if stderr is a tty.
134 if (not (args.batch_progress or args.no_progress)
135 and os.isatty(sys.stderr.fileno())):
138 if args.paths == ['-']:
139 args.paths = ['/dev/stdin']
140 if not args.filename:
145 class ResumeCacheConflict(Exception):
149 class ResumeCache(object):
150 CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
153 def setup_user_cache(cls):
155 os.makedirs(cls.CACHE_DIR)
156 except OSError as error:
157 if error.errno != errno.EEXIST:
160 os.chmod(cls.CACHE_DIR, 0o700)
162 def __init__(self, file_spec):
164 self.cache_file = open(file_spec, 'a+')
166 file_spec = self.make_path(file_spec)
167 self.cache_file = open(file_spec, 'a+')
168 self._lock_file(self.cache_file)
169 self.filename = self.cache_file.name
172 def make_path(cls, args):
174 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
175 realpaths = sorted(os.path.realpath(path) for path in args.paths)
176 md5.update(''.join(realpaths))
177 if any(os.path.isdir(path) for path in realpaths):
178 md5.update(str(max(args.max_manifest_depth, -1)))
180 md5.update(args.filename)
181 return os.path.join(cls.CACHE_DIR, md5.hexdigest())
183 def _lock_file(self, fileobj):
185 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
187 raise ResumeCacheConflict("{} locked".format(fileobj.name))
190 self.cache_file.seek(0)
191 return json.load(self.cache_file)
193 def save(self, data):
195 new_cache_fd, new_cache_name = tempfile.mkstemp(
196 dir=os.path.dirname(self.filename))
197 self._lock_file(new_cache_fd)
198 new_cache = os.fdopen(new_cache_fd, 'r+')
199 json.dump(data, new_cache)
200 os.rename(new_cache_name, self.filename)
201 except (IOError, OSError, ResumeCacheConflict) as error:
203 os.unlink(new_cache_name)
204 except NameError: # mkstemp failed.
207 self.cache_file.close()
208 self.cache_file = new_cache
211 self.cache_file.close()
215 os.unlink(self.filename)
216 except OSError as error:
217 if error.errno != errno.ENOENT: # That's what we wanted anyway.
223 self.__init__(self.filename)
226 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
227 STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
230 def __init__(self, cache=None, reporter=None, bytes_expected=None):
231 self.bytes_written = 0
232 self.__init_locals__(cache, reporter, bytes_expected)
233 super(ArvPutCollectionWriter, self).__init__()
235 def __init_locals__(self, cache, reporter, bytes_expected):
237 self.report_func = reporter
238 self.bytes_expected = bytes_expected
241 def from_cache(cls, cache, reporter=None, bytes_expected=None):
244 state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
245 writer = cls.from_state(state)
246 except (TypeError, ValueError,
247 arvados.errors.StaleWriterStateError) as error:
248 return cls(cache, reporter, bytes_expected)
250 writer.__init_locals__(cache, reporter, bytes_expected)
253 def checkpoint_state(self):
254 if self.cache is None:
256 state = self.dump_state()
257 # Transform attributes for serialization.
258 for attr, value in state.items():
259 if attr == '_data_buffer':
260 state[attr] = base64.encodestring(''.join(value))
261 elif hasattr(value, 'popleft'):
262 state[attr] = list(value)
263 self.cache.save(state)
265 def flush_data(self):
266 bytes_buffered = self._data_buffer_len
267 super(ArvPutCollectionWriter, self).flush_data()
268 self.bytes_written += (bytes_buffered - self._data_buffer_len)
269 if self.report_func is not None:
270 self.report_func(self.bytes_written, self.bytes_expected)
273 def expected_bytes_for(pathlist):
274 # Walk the given directory trees and stat files, adding up file sizes,
275 # so we can display progress as percent
277 for path in pathlist:
278 if os.path.isdir(path):
279 for filename in arvados.util.listdir_recursive(path):
280 bytesum += os.path.getsize(os.path.join(path, filename))
281 elif not os.path.isfile(path):
284 bytesum += os.path.getsize(path)
287 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
289 def machine_progress(bytes_written, bytes_expected):
290 return _machine_format.format(
291 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
293 def human_progress(bytes_written, bytes_expected):
295 return "\r{}M / {}M {:.1f}% ".format(
296 bytes_written >> 20, bytes_expected >> 20,
297 bytes_written / bytes_expected)
299 return "\r{} ".format(bytes_written)
301 def progress_writer(progress_func, outfile=sys.stderr):
302 def write_progress(bytes_written, bytes_expected):
303 outfile.write(progress_func(bytes_written, bytes_expected))
304 return write_progress
306 def main(arguments=None):
307 ResumeCache.setup_user_cache()
308 args = parse_arguments(arguments)
311 reporter = progress_writer(human_progress)
312 elif args.batch_progress:
313 reporter = progress_writer(machine_progress)
318 resume_cache = ResumeCache(args)
320 resume_cache.restart()
321 except ResumeCacheConflict:
322 print "arv-put: Another process is already uploading this data."
325 writer = ArvPutCollectionWriter.from_cache(
326 resume_cache, reporter, expected_bytes_for(args.paths))
328 # Copy file data to Keep.
329 for path in args.paths:
330 if os.path.isdir(path):
331 writer.write_directory_tree(
332 path, max_manifest_depth=args.max_manifest_depth)
334 writer.start_new_stream()
335 writer.write_file(path, args.filename or os.path.basename(path))
338 print writer.manifest_text(),
340 writer.finish_current_stream()
341 print ','.join(writer.data_locators())
343 # Register the resulting collection in Arvados.
344 arvados.api().collections().create(
346 'uuid': writer.finish(),
347 'manifest_text': writer.manifest_text(),
351 # Print the locator (uuid) of the new collection.
352 print writer.finish()
353 resume_cache.destroy()
355 if __name__ == '__main__':