#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import base64
import errno
import fcntl
import hashlib
import json
import os
import signal
import sys
import tempfile

import arvados
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
def parse_arguments(arguments):
    parser = argparse.ArgumentParser(
        description='Copy data from the local filesystem to Keep.')

    parser.add_argument('paths', metavar='path', type=str, nargs='*',
                        help="""
    Local file or directory. Default: read from standard input.
    """)

    parser.add_argument('--max-manifest-depth', type=int, metavar='N',
                        default=-1, help="""
    Maximum depth of directory tree to represent in the manifest
    structure. A directory structure deeper than this will be represented
    as a single stream in the manifest. If N=0, the manifest will contain
    a single stream. Default: -1 (unlimited), i.e., exactly one manifest
    stream per filesystem directory that contains files.
    """)

    group = parser.add_mutually_exclusive_group()
    group.add_argument('--as-stream', action='store_true', dest='stream',
                       help="""
    Synonym for --stream.
    """)
    group.add_argument('--stream', action='store_true',
                       help="""
    Store the file content and display the resulting manifest on
    stdout. Do not write the manifest to Keep or save a Collection object
    in Arvados.
    """)
    group.add_argument('--as-manifest', action='store_true', dest='manifest',
                       help="""
    Synonym for --manifest.
    """)
    group.add_argument('--in-manifest', action='store_true', dest='manifest',
                       help="""
    Synonym for --manifest.
    """)
    group.add_argument('--manifest', action='store_true',
                       help="""
    Store the file data and resulting manifest in Keep, save a Collection
    object in Arvados, and display the manifest locator (Collection uuid)
    on stdout. This is the default behavior.
    """)
    group.add_argument('--as-raw', action='store_true', dest='raw',
                       help="""
    Synonym for --raw.
    """)
    group.add_argument('--raw', action='store_true',
                       help="""
    Store the file content and display the data block locators on stdout,
    separated by commas, with a trailing newline. Do not store a
    manifest.
    """)

    parser.add_argument('--use-filename', type=str, default=None,
                        dest='filename', help="""
    Synonym for --filename.
    """)
    parser.add_argument('--filename', type=str, default=None,
                        help="""
    Use the given filename in the manifest, instead of the name of the
    local file. This is useful when "-" or "/dev/stdin" is given as an
    input file. It can be used only if there is exactly one path given and
    it is not a directory. Implies --manifest.
    """)

    group = parser.add_mutually_exclusive_group()
    group.add_argument('--progress', action='store_true',
                       help="""
    Display human-readable progress on stderr (bytes and, if possible,
    percentage of total data size). This is the default behavior when
    stderr is a tty.
    """)
    group.add_argument('--no-progress', action='store_true',
                       help="""
    Do not display human-readable progress on stderr, even if stderr is a
    tty.
    """)
    group.add_argument('--batch-progress', action='store_true',
                       help="""
    Display machine-readable progress on stderr (bytes and, if known,
    total data size).
    """)

    group = parser.add_mutually_exclusive_group()
    group.add_argument('--resume', action='store_true', default=True,
                       help="""
    Continue interrupted uploads from cached state (default).
    """)
    group.add_argument('--no-resume', action='store_false', dest='resume',
                       help="""
    Do not continue interrupted uploads from cached state.
    """)

    args = parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths += ['/dev/stdin']

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.paths = ['/dev/stdin']
        if not args.filename:
            args.filename = '-'

    return args
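# Example invocations (an illustrative sketch; the file and directory names
# below are hypothetical):
#
#   arv-put file1.txt dir1/        # default: save a Collection, print its uuid
#   arv-put --stream file1.txt     # print the manifest text, do not save it
#   arv-put --raw file1.txt        # print comma-separated block locators
#   arv-put --filename data.bin -  # read stdin, call it data.bin in the manifest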
class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')

    @classmethod
    def setup_user_cache(cls):
        try:
            os.makedirs(cls.CACHE_DIR)
        except OSError as error:
            if error.errno != errno.EEXIST:  # Already existing is fine.
                raise
        else:
            os.chmod(cls.CACHE_DIR, 0o700)

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(cls.CACHE_DIR, md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Atomically replace the old cache file with the new one.
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
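# A minimal usage sketch (assuming setup_user_cache() has already run; see
# main() below for the real call sites):
#
#   cache = ResumeCache(ResumeCache.make_path(args))  # takes an exclusive flock
#   try:
#       state = cache.load()        # previous run's checkpoint, if any
#   except ValueError:
#       state = None                # empty or corrupt cache file
#   cache.save(state or {})        # write a temp file, then rename into place
#
# A second process opening the same cache path gets ResumeCacheConflict,
# because _lock_file() requests a non-blocking exclusive lock.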
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__()

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected)
        else:
            return writer

    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            # Checkpoint after each full Keep block is written.
            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)
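# Sketch of the resume flow (variable names are illustrative): from_cache()
# rebuilds a writer from the checkpointed JSON state, falling back to a fresh
# writer if the state is missing, stale, or unparsable. _record_new_input()
# then makes the per-path writes idempotent on resume:
#
#   writer = ArvPutCollectionWriter.from_cache(cache, reporter, total_bytes)
#   writer.do_queued_work()      # finish work checkpointed in the cache
#   writer.write_file('a.txt')   # skipped if 'a.txt' was already recorded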
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
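# For example, expected_bytes_for(['./data']) sums the sizes of all files
# under ./data, while expected_bytes_for(['/dev/stdin']) returns None (not a
# regular file, size unknown), so progress is reported without a percentage.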
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
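# Illustration: with sys.argv[0] == 'arv-put' and pid 1234,
# machine_progress(512, None) returns "arv-put 1234: 512 written -1 total\n".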
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
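# Sample outputs: human_progress(52428800, 104857600) returns
# "\r50M / 100M 50.0% "; with bytes_expected unknown (e.g. stdin), it falls
# back to a plain byte count.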
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
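# progress_writer() adapts a formatting function to the reporter callback
# that ArvPutCollectionWriter expects. A quick sketch:
#
#   reporter = progress_writer(human_progress)
#   reporter(1 << 20, 10 << 20)   # writes "\r1M / 10M 10.0% " to stderr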
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)
def main(arguments=None):
    ResumeCache.setup_user_cache()
    args = parse_arguments(arguments)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    try:
        resume_cache = ResumeCache(ResumeCache.make_path(args))
        if not args.resume:
            resume_cache.restart()
    except ResumeCacheConflict:
        print "arv-put: Another process is already uploading this data."
        sys.exit(1)

    writer = ArvPutCollectionWriter.from_cache(
        resume_cache, reporter, expected_bytes_for(args.paths))

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the original handlers so we can restore them later.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>sys.stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])
        writer.report_progress()

    writer.do_queued_work()  # Do work resumed from cache.
    for path in args.paths:  # Copy file data to Keep.
        if os.path.isdir(path):
            writer.write_directory_tree(
                path, max_manifest_depth=args.max_manifest_depth)
        else:
            writer.start_new_stream()
            writer.write_file(path, args.filename or os.path.basename(path))
    writer.finish_current_stream()

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>sys.stderr

    if args.stream:
        print writer.manifest_text(),
    elif args.raw:
        print ','.join(writer.data_locators())
    else:
        # Register the resulting collection in Arvados.
        collection = arvados.api().collections().create(
            body={
                'uuid': writer.finish(),
                'manifest_text': writer.manifest_text(),
                },
            ).execute()

        # Print the locator (uuid) of the new collection.
        print collection['uuid']

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    resume_cache.destroy()


if __name__ == '__main__':
    main()