10932: Changed _file_paths from being a list to a set so we're not going to copy...
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 from apiclient import errors as apiclient_errors
27 from arvados._version import __version__
28
29 import arvados.commands._util as arv_cmd
30
31 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
32 api_client = None
33
34 upload_opts = argparse.ArgumentParser(add_help=False)
35
36 upload_opts.add_argument('--version', action='version',
37                          version="%s %s" % (sys.argv[0], __version__),
38                          help='Print version and exit.')
39 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
40                          help="""
41 Local file or directory. Default: read from standard input.
42 """)
43
44 _group = upload_opts.add_mutually_exclusive_group()
45
46 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
47                     default=-1, help=argparse.SUPPRESS)
48
49 _group.add_argument('--normalize', action='store_true',
50                     help="""
51 Normalize the manifest by re-ordering files and streams after writing
52 data.
53 """)
54
55 _group.add_argument('--dry-run', action='store_true', default=False,
56                     help="""
57 Don't actually upload files, but only check if any file should be
58 uploaded. Exit with code=2 when files are pending for upload.
59 """)
60
61 _group = upload_opts.add_mutually_exclusive_group()
62
63 _group.add_argument('--as-stream', action='store_true', dest='stream',
64                     help="""
65 Synonym for --stream.
66 """)
67
68 _group.add_argument('--stream', action='store_true',
69                     help="""
70 Store the file content and display the resulting manifest on
71 stdout. Do not write the manifest to Keep or save a Collection object
72 in Arvados.
73 """)
74
75 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
76                     help="""
77 Synonym for --manifest.
78 """)
79
80 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
81                     help="""
82 Synonym for --manifest.
83 """)
84
85 _group.add_argument('--manifest', action='store_true',
86                     help="""
87 Store the file data and resulting manifest in Keep, save a Collection
88 object in Arvados, and display the manifest locator (Collection uuid)
89 on stdout. This is the default behavior.
90 """)
91
92 _group.add_argument('--as-raw', action='store_true', dest='raw',
93                     help="""
94 Synonym for --raw.
95 """)
96
97 _group.add_argument('--raw', action='store_true',
98                     help="""
99 Store the file content and display the data block locators on stdout,
100 separated by commas, with a trailing newline. Do not store a
101 manifest.
102 """)
103
104 upload_opts.add_argument('--update-collection', type=str, default=None,
105                          dest='update_collection', metavar="UUID", help="""
106 Update an existing collection identified by the given Arvados collection
107 UUID. All new local files will be uploaded.
108 """)
109
110 upload_opts.add_argument('--use-filename', type=str, default=None,
111                          dest='filename', help="""
112 Synonym for --filename.
113 """)
114
115 upload_opts.add_argument('--filename', type=str, default=None,
116                          help="""
117 Use the given filename in the manifest, instead of the name of the
118 local file. This is useful when "-" or "/dev/stdin" is given as an
119 input file. It can be used only if there is exactly one path given and
120 it is not a directory. Implies --manifest.
121 """)
122
123 upload_opts.add_argument('--portable-data-hash', action='store_true',
124                          help="""
125 Print the portable data hash instead of the Arvados UUID for the collection
126 created by the upload.
127 """)
128
129 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
130                          help="""
131 Set the replication level for the new collection: how many different
132 physical storage devices (e.g., disks) should have a copy of each data
133 block. Default is to use the server-provided default (if any) or 2.
134 """)
135
136 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
137                          help="""
138 Set the number of upload threads to be used. Take into account that
139 using lots of threads will increase the RAM requirements. Default is
140 to use 2 threads.
141 On high latency installations, using a greater number will improve
142 overall throughput.
143 """)
144
145 run_opts = argparse.ArgumentParser(add_help=False)
146
147 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
148 Store the collection in the specified project, instead of your Home
149 project.
150 """)
151
152 run_opts.add_argument('--name', help="""
153 Save the collection with the specified name.
154 """)
155
156 _group = run_opts.add_mutually_exclusive_group()
157 _group.add_argument('--progress', action='store_true',
158                     help="""
159 Display human-readable progress on stderr (bytes and, if possible,
160 percentage of total data size). This is the default behavior when
161 stderr is a tty.
162 """)
163
164 _group.add_argument('--no-progress', action='store_true',
165                     help="""
166 Do not display human-readable progress on stderr, even if stderr is a
167 tty.
168 """)
169
170 _group.add_argument('--batch-progress', action='store_true',
171                     help="""
172 Display machine-readable progress on stderr (bytes and, if known,
173 total data size).
174 """)
175
176 _group = run_opts.add_mutually_exclusive_group()
177 _group.add_argument('--resume', action='store_true', default=True,
178                     help="""
179 Continue interrupted uploads from cached state (default).
180 """)
181 _group.add_argument('--no-resume', action='store_false', dest='resume',
182                     help="""
183 Do not continue interrupted uploads from cached state.
184 """)
185
186 _group = run_opts.add_mutually_exclusive_group()
187 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
188                     help="""
189 Save upload state in a cache file for resuming (default).
190 """)
191 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
192                     help="""
193 Do not save upload state in a cache file for resuming.
194 """)
195
196 arg_parser = argparse.ArgumentParser(
197     description='Copy data from the local filesystem to Keep.',
198     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
199
200 def parse_arguments(arguments):
201     args = arg_parser.parse_args(arguments)
202
203     if len(args.paths) == 0:
204         args.paths = ['-']
205
206     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
207
208     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
209         if args.filename:
210             arg_parser.error("""
211     --filename argument cannot be used when storing a directory or
212     multiple files.
213     """)
214
215     # Turn on --progress by default if stderr is a tty.
216     if (not (args.batch_progress or args.no_progress)
217         and os.isatty(sys.stderr.fileno())):
218         args.progress = True
219
220     # Turn off --resume (default) if --no-cache is used.
221     if not args.use_cache:
222         args.resume = False
223
224     if args.paths == ['-']:
225         if args.update_collection:
226             arg_parser.error("""
227     --update-collection cannot be used when reading from stdin.
228     """)
229         args.resume = False
230         args.use_cache = False
231         if not args.filename:
232             args.filename = 'stdin'
233
234     return args
235
236
237 class CollectionUpdateError(Exception):
238     pass
239
240
241 class ResumeCacheConflict(Exception):
242     pass
243
244
245 class ArvPutArgumentConflict(Exception):
246     pass
247
248
249 class ArvPutUploadIsPending(Exception):
250     pass
251
252
253 class ArvPutUploadNotPending(Exception):
254     pass
255
256
257 class FileUploadList(list):
258     def __init__(self, dry_run=False):
259         list.__init__(self)
260         self.dry_run = dry_run
261
262     def append(self, other):
263         if self.dry_run:
264             raise ArvPutUploadIsPending()
265         super(FileUploadList, self).append(other)
266
267
268 class ResumeCache(object):
269     CACHE_DIR = '.cache/arvados/arv-put'
270
271     def __init__(self, file_spec):
272         self.cache_file = open(file_spec, 'a+')
273         self._lock_file(self.cache_file)
274         self.filename = self.cache_file.name
275
276     @classmethod
277     def make_path(cls, args):
278         md5 = hashlib.md5()
279         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
280         realpaths = sorted(os.path.realpath(path) for path in args.paths)
281         md5.update('\0'.join(realpaths))
282         if any(os.path.isdir(path) for path in realpaths):
283             md5.update("-1")
284         elif args.filename:
285             md5.update(args.filename)
286         return os.path.join(
287             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
288             md5.hexdigest())
289
290     def _lock_file(self, fileobj):
291         try:
292             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
293         except IOError:
294             raise ResumeCacheConflict("{} locked".format(fileobj.name))
295
296     def load(self):
297         self.cache_file.seek(0)
298         return json.load(self.cache_file)
299
300     def check_cache(self, api_client=None, num_retries=0):
301         try:
302             state = self.load()
303             locator = None
304             try:
305                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
306                     locator = state["_finished_streams"][0][1][0]
307                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
308                     locator = state["_current_stream_locators"][0]
309                 if locator is not None:
310                     kc = arvados.keep.KeepClient(api_client=api_client)
311                     kc.head(locator, num_retries=num_retries)
312             except Exception as e:
313                 self.restart()
314         except (ValueError):
315             pass
316
317     def save(self, data):
318         try:
319             new_cache_fd, new_cache_name = tempfile.mkstemp(
320                 dir=os.path.dirname(self.filename))
321             self._lock_file(new_cache_fd)
322             new_cache = os.fdopen(new_cache_fd, 'r+')
323             json.dump(data, new_cache)
324             os.rename(new_cache_name, self.filename)
325         except (IOError, OSError, ResumeCacheConflict) as error:
326             try:
327                 os.unlink(new_cache_name)
328             except NameError:  # mkstemp failed.
329                 pass
330         else:
331             self.cache_file.close()
332             self.cache_file = new_cache
333
334     def close(self):
335         self.cache_file.close()
336
337     def destroy(self):
338         try:
339             os.unlink(self.filename)
340         except OSError as error:
341             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
342                 raise
343         self.close()
344
345     def restart(self):
346         self.destroy()
347         self.__init__(self.filename)
348
349
350 class ArvPutUploadJob(object):
351     CACHE_DIR = '.cache/arvados/arv-put'
352     EMPTY_STATE = {
353         'manifest' : None, # Last saved manifest checkpoint
354         'files' : {} # Previous run file list: {path : {size, mtime}}
355     }
356
357     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
358                  bytes_expected=None, name=None, owner_uuid=None,
359                  ensure_unique_name=False, num_retries=None,
360                  put_threads=None, replication_desired=None,
361                  filename=None, update_time=60.0, update_collection=None,
362                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
363         self.paths = paths
364         self.resume = resume
365         self.use_cache = use_cache
366         self.update = False
367         self.reporter = reporter
368         self.bytes_expected = bytes_expected
369         self.bytes_written = 0
370         self.bytes_skipped = 0
371         self.name = name
372         self.owner_uuid = owner_uuid
373         self.ensure_unique_name = ensure_unique_name
374         self.num_retries = num_retries
375         self.replication_desired = replication_desired
376         self.put_threads = put_threads
377         self.filename = filename
378         self._state_lock = threading.Lock()
379         self._state = None # Previous run state (file list & manifest)
380         self._current_files = [] # Current run file list
381         self._cache_file = None
382         self._collection_lock = threading.Lock()
383         self._remote_collection = None # Collection being updated (if asked)
384         self._local_collection = None # Collection from previous run manifest
385         self._file_paths = set() # Files to be updated in remote collection
386         self._stop_checkpointer = threading.Event()
387         self._checkpointer = threading.Thread(target=self._update_task)
388         self._checkpointer.daemon = True
389         self._update_task_time = update_time  # How many seconds wait between update runs
390         self._files_to_upload = FileUploadList(dry_run=dry_run)
391         self.logger = logger
392         self.dry_run = dry_run
393
394         if not self.use_cache and self.resume:
395             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
396
397         # Check for obvious dry-run responses
398         if self.dry_run and (not self.use_cache or not self.resume):
399             raise ArvPutUploadIsPending()
400
401         # Load cached data if any and if needed
402         self._setup_state(update_collection)
403
404     def start(self, save_collection):
405         """
406         Start supporting thread & file uploading
407         """
408         if not self.dry_run:
409             self._checkpointer.start()
410         try:
411             for path in self.paths:
412                 # Test for stdin first, in case some file named '-' exist
413                 if path == '-':
414                     if self.dry_run:
415                         raise ArvPutUploadIsPending()
416                     self._write_stdin(self.filename or 'stdin')
417                 elif os.path.isdir(path):
418                     # Use absolute paths on cache index so CWD doesn't interfere
419                     # with the caching logic.
420                     prefixdir = path = os.path.abspath(path)
421                     if prefixdir != '/':
422                         prefixdir += '/'
423                     for root, dirs, files in os.walk(path):
424                         # Make os.walk()'s dir traversing order deterministic
425                         dirs.sort()
426                         files.sort()
427                         for f in files:
428                             self._check_file(os.path.join(root, f),
429                                              os.path.join(root[len(prefixdir):], f))
430                 else:
431                     self._check_file(os.path.abspath(path),
432                                      self.filename or os.path.basename(path))
433             # If dry-mode is on, and got up to this point, then we should notify that
434             # there aren't any file to upload.
435             if self.dry_run:
436                 raise ArvPutUploadNotPending()
437             # Remove local_collection's files that don't exist locally anymore, so the
438             # bytes_written count is correct.
439             for f in self.collection_file_paths(self._local_collection,
440                                                 path_prefix=""):
441                 if f != 'stdin' and f != self.filename and not f in self._file_paths:
442                     self._local_collection.remove(f)
443             # Update bytes_written from current local collection and
444             # report initial progress.
445             self._update()
446             # Actual file upload
447             self._upload_files()
448         finally:
449             if not self.dry_run:
450                 # Stop the thread before doing anything else
451                 self._stop_checkpointer.set()
452                 self._checkpointer.join()
453                 # Commit all pending blocks & one last _update()
454                 self._local_collection.manifest_text()
455                 self._update(final=True)
456                 if save_collection:
457                     self.save_collection()
458             if self.use_cache:
459                 self._cache_file.close()
460
461     def save_collection(self):
462         if self.update:
463             # Check if files should be updated on the remote collection.
464             for fp in self._file_paths:
465                 remote_file = self._remote_collection.find(fp)
466                 if not remote_file:
467                     # File don't exist on remote collection, copy it.
468                     self._remote_collection.copy(fp, fp, self._local_collection)
469                 elif remote_file != self._local_collection.find(fp):
470                     # A different file exist on remote collection, overwrite it.
471                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
472                 else:
473                     # The file already exist on remote collection, skip it.
474                     pass
475             self._remote_collection.save(num_retries=self.num_retries)
476         else:
477             self._local_collection.save_new(
478                 name=self.name, owner_uuid=self.owner_uuid,
479                 ensure_unique_name=self.ensure_unique_name,
480                 num_retries=self.num_retries)
481
482     def destroy_cache(self):
483         if self.use_cache:
484             try:
485                 os.unlink(self._cache_filename)
486             except OSError as error:
487                 # That's what we wanted anyway.
488                 if error.errno != errno.ENOENT:
489                     raise
490             self._cache_file.close()
491
492     def _collection_size(self, collection):
493         """
494         Recursively get the total size of the collection
495         """
496         size = 0
497         for item in collection.values():
498             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
499                 size += self._collection_size(item)
500             else:
501                 size += item.size()
502         return size
503
504     def _update_task(self):
505         """
506         Periodically called support task. File uploading is
507         asynchronous so we poll status from the collection.
508         """
509         while not self._stop_checkpointer.wait(self._update_task_time):
510             self._update()
511
512     def _update(self, final=False):
513         """
514         Update cached manifest text and report progress.
515         """
516         with self._collection_lock:
517             self.bytes_written = self._collection_size(self._local_collection)
518             if self.use_cache:
519                 if final:
520                     manifest = self._local_collection.manifest_text()
521                 else:
522                     # Get the manifest text without comitting pending blocks
523                     manifest = self._local_collection.manifest_text(strip=False,
524                                                                     normalize=False,
525                                                                     only_committed=True)
526                 # Update cache
527                 with self._state_lock:
528                     self._state['manifest'] = manifest
529         if self.use_cache:
530             self._save_state()
531         # Call the reporter, if any
532         self.report_progress()
533
534     def report_progress(self):
535         if self.reporter is not None:
536             self.reporter(self.bytes_written, self.bytes_expected)
537
538     def _write_stdin(self, filename):
539         output = self._local_collection.open(filename, 'w')
540         self._write(sys.stdin, output)
541         output.close()
542
543     def _check_file(self, source, filename):
544         """Check if this file needs to be uploaded"""
545         resume_offset = 0
546         should_upload = False
547         new_file_in_cache = False
548         # Record file path for updating the remote collection before exiting
549         self._file_paths.add(filename)
550
551         with self._state_lock:
552             # If no previous cached data on this file, store it for an eventual
553             # repeated run.
554             if source not in self._state['files']:
555                 self._state['files'][source] = {
556                     'mtime': os.path.getmtime(source),
557                     'size' : os.path.getsize(source)
558                 }
559                 new_file_in_cache = True
560             cached_file_data = self._state['files'][source]
561
562         # Check if file was already uploaded (at least partially)
563         file_in_local_collection = self._local_collection.find(filename)
564
565         # If not resuming, upload the full file.
566         if not self.resume:
567             should_upload = True
568         # New file detected from last run, upload it.
569         elif new_file_in_cache:
570             should_upload = True
571         # Local file didn't change from last run.
572         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
573             if not file_in_local_collection:
574                 # File not uploaded yet, upload it completely
575                 should_upload = True
576             elif file_in_local_collection.permission_expired():
577                 # Permission token expired, re-upload file. This will change whenever
578                 # we have a API for refreshing tokens.
579                 should_upload = True
580                 self._local_collection.remove(filename)
581             elif cached_file_data['size'] == file_in_local_collection.size():
582                 # File already there, skip it.
583                 self.bytes_skipped += cached_file_data['size']
584             elif cached_file_data['size'] > file_in_local_collection.size():
585                 # File partially uploaded, resume!
586                 resume_offset = file_in_local_collection.size()
587                 self.bytes_skipped += resume_offset
588                 should_upload = True
589             else:
590                 # Inconsistent cache, re-upload the file
591                 should_upload = True
592                 self._local_collection.remove(filename)
593                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
594         # Local file differs from cached data, re-upload it.
595         else:
596             if file_in_local_collection:
597                 self._local_collection.remove(filename)
598             should_upload = True
599
600         if should_upload:
601             self._files_to_upload.append((source, resume_offset, filename))
602
603     def _upload_files(self):
604         for source, resume_offset, filename in self._files_to_upload:
605             with open(source, 'r') as source_fd:
606                 with self._state_lock:
607                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
608                     self._state['files'][source]['size'] = os.path.getsize(source)
609                 if resume_offset > 0:
610                     # Start upload where we left off
611                     output = self._local_collection.open(filename, 'a')
612                     source_fd.seek(resume_offset)
613                 else:
614                     # Start from scratch
615                     output = self._local_collection.open(filename, 'w')
616                 self._write(source_fd, output)
617                 output.close(flush=False)
618
619     def _write(self, source_fd, output):
620         while True:
621             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
622             if not data:
623                 break
624             output.write(data)
625
626     def _my_collection(self):
627         return self._remote_collection if self.update else self._local_collection
628
629     def _setup_state(self, update_collection):
630         """
631         Create a new cache file or load a previously existing one.
632         """
633         # Load an already existing collection for update
634         if update_collection and re.match(arvados.util.collection_uuid_pattern,
635                                           update_collection):
636             try:
637                 self._remote_collection = arvados.collection.Collection(update_collection)
638             except arvados.errors.ApiError as error:
639                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
640             else:
641                 self.update = True
642         elif update_collection:
643             # Collection locator provided, but unknown format
644             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
645
646         if self.use_cache:
647             # Set up cache file name from input paths.
648             md5 = hashlib.md5()
649             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
650             realpaths = sorted(os.path.realpath(path) for path in self.paths)
651             md5.update('\0'.join(realpaths))
652             if self.filename:
653                 md5.update(self.filename)
654             cache_filename = md5.hexdigest()
655             cache_filepath = os.path.join(
656                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
657                 cache_filename)
658             if self.resume:
659                 self._cache_file = open(cache_filepath, 'a+')
660             else:
661                 # --no-resume means start with a empty cache file.
662                 self._cache_file = open(cache_filepath, 'w+')
663             self._cache_filename = self._cache_file.name
664             self._lock_file(self._cache_file)
665             self._cache_file.seek(0)
666
667         with self._state_lock:
668             if self.use_cache:
669                 try:
670                     self._state = json.load(self._cache_file)
671                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
672                         # Cache at least partially incomplete, set up new cache
673                         self._state = copy.deepcopy(self.EMPTY_STATE)
674                 except ValueError:
675                     # Cache file empty, set up new cache
676                     self._state = copy.deepcopy(self.EMPTY_STATE)
677             else:
678                 # No cache file, set empty state
679                 self._state = copy.deepcopy(self.EMPTY_STATE)
680             # Load the previous manifest so we can check if files were modified remotely.
681             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
682
683     def collection_file_paths(self, col, path_prefix='.'):
684         """Return a list of file paths by recursively go through the entire collection `col`"""
685         file_paths = []
686         for name, item in col.items():
687             if isinstance(item, arvados.arvfile.ArvadosFile):
688                 file_paths.append(os.path.join(path_prefix, name))
689             elif isinstance(item, arvados.collection.Subcollection):
690                 new_prefix = os.path.join(path_prefix, name)
691                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
692         return file_paths
693
694     def _lock_file(self, fileobj):
695         try:
696             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
697         except IOError:
698             raise ResumeCacheConflict("{} locked".format(fileobj.name))
699
700     def _save_state(self):
701         """
702         Atomically save current state into cache.
703         """
704         try:
705             with self._state_lock:
706                 # We're not using copy.deepcopy() here because it's a lot slower
707                 # than json.dumps(), and we're already needing JSON format to be
708                 # saved on disk.
709                 state = json.dumps(self._state)
710             new_cache_fd, new_cache_name = tempfile.mkstemp(
711                 dir=os.path.dirname(self._cache_filename))
712             self._lock_file(new_cache_fd)
713             new_cache = os.fdopen(new_cache_fd, 'r+')
714             new_cache.write(state)
715             new_cache.flush()
716             os.fsync(new_cache)
717             os.rename(new_cache_name, self._cache_filename)
718         except (IOError, OSError, ResumeCacheConflict) as error:
719             self.logger.error("There was a problem while saving the cache file: {}".format(error))
720             try:
721                 os.unlink(new_cache_name)
722             except NameError:  # mkstemp failed.
723                 pass
724         else:
725             self._cache_file.close()
726             self._cache_file = new_cache
727
728     def collection_name(self):
729         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
730
731     def manifest_locator(self):
732         return self._my_collection().manifest_locator()
733
734     def portable_data_hash(self):
735         return self._my_collection().portable_data_hash()
736
737     def manifest_text(self, stream_name=".", strip=False, normalize=False):
738         return self._my_collection().manifest_text(stream_name, strip, normalize)
739
740     def _datablocks_on_item(self, item):
741         """
742         Return a list of datablock locators, recursively navigating
743         through subcollections
744         """
745         if isinstance(item, arvados.arvfile.ArvadosFile):
746             if item.size() == 0:
747                 # Empty file locator
748                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
749             else:
750                 locators = []
751                 for segment in item.segments():
752                     loc = segment.locator
753                     locators.append(loc)
754                 return locators
755         elif isinstance(item, arvados.collection.Collection):
756             l = [self._datablocks_on_item(x) for x in item.values()]
757             # Fast list flattener method taken from:
758             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
759             return [loc for sublist in l for loc in sublist]
760         else:
761             return None
762
763     def data_locators(self):
764         with self._collection_lock:
765             # Make sure all datablocks are flushed before getting the locators
766             self._my_collection().manifest_text()
767             datablocks = self._datablocks_on_item(self._my_collection())
768         return datablocks
769
770
771 def expected_bytes_for(pathlist):
772     # Walk the given directory trees and stat files, adding up file sizes,
773     # so we can display progress as percent
774     bytesum = 0
775     for path in pathlist:
776         if os.path.isdir(path):
777             for filename in arvados.util.listdir_recursive(path):
778                 bytesum += os.path.getsize(os.path.join(path, filename))
779         elif not os.path.isfile(path):
780             return None
781         else:
782             bytesum += os.path.getsize(path)
783     return bytesum
784
785 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
786                                                             os.getpid())
787 def machine_progress(bytes_written, bytes_expected):
788     return _machine_format.format(
789         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
790
791 def human_progress(bytes_written, bytes_expected):
792     if bytes_expected:
793         return "\r{}M / {}M {:.1%} ".format(
794             bytes_written >> 20, bytes_expected >> 20,
795             float(bytes_written) / bytes_expected)
796     else:
797         return "\r{} ".format(bytes_written)
798
799 def progress_writer(progress_func, outfile=sys.stderr):
800     def write_progress(bytes_written, bytes_expected):
801         outfile.write(progress_func(bytes_written, bytes_expected))
802     return write_progress
803
804 def exit_signal_handler(sigcode, frame):
805     sys.exit(-sigcode)
806
807 def desired_project_uuid(api_client, project_uuid, num_retries):
808     if not project_uuid:
809         query = api_client.users().current()
810     elif arvados.util.user_uuid_pattern.match(project_uuid):
811         query = api_client.users().get(uuid=project_uuid)
812     elif arvados.util.group_uuid_pattern.match(project_uuid):
813         query = api_client.groups().get(uuid=project_uuid)
814     else:
815         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
816     return query.execute(num_retries=num_retries)['uuid']
817
818 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
819     global api_client
820
821     logger = logging.getLogger('arvados.arv_put')
822     args = parse_arguments(arguments)
823     status = 0
824     if api_client is None:
825         api_client = arvados.api('v1')
826
827     # Determine the name to use
828     if args.name:
829         if args.stream or args.raw:
830             logger.error("Cannot use --name with --stream or --raw")
831             sys.exit(1)
832         elif args.update_collection:
833             logger.error("Cannot use --name with --update-collection")
834             sys.exit(1)
835         collection_name = args.name
836     else:
837         collection_name = "Saved at {} by {}@{}".format(
838             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
839             pwd.getpwuid(os.getuid()).pw_name,
840             socket.gethostname())
841
842     if args.project_uuid and (args.stream or args.raw):
843         logger.error("Cannot use --project-uuid with --stream or --raw")
844         sys.exit(1)
845
846     # Determine the parent project
847     try:
848         project_uuid = desired_project_uuid(api_client, args.project_uuid,
849                                             args.retries)
850     except (apiclient_errors.Error, ValueError) as error:
851         logger.error(error)
852         sys.exit(1)
853
854     if args.progress:
855         reporter = progress_writer(human_progress)
856     elif args.batch_progress:
857         reporter = progress_writer(machine_progress)
858     else:
859         reporter = None
860
861     bytes_expected = expected_bytes_for(args.paths)
862
863     try:
864         writer = ArvPutUploadJob(paths = args.paths,
865                                  resume = args.resume,
866                                  use_cache = args.use_cache,
867                                  filename = args.filename,
868                                  reporter = reporter,
869                                  bytes_expected = bytes_expected,
870                                  num_retries = args.retries,
871                                  replication_desired = args.replication,
872                                  put_threads = args.threads,
873                                  name = collection_name,
874                                  owner_uuid = project_uuid,
875                                  ensure_unique_name = True,
876                                  update_collection = args.update_collection,
877                                  logger=logger,
878                                  dry_run=args.dry_run)
879     except ResumeCacheConflict:
880         logger.error("\n".join([
881             "arv-put: Another process is already uploading this data.",
882             "         Use --no-cache if this is really what you want."]))
883         sys.exit(1)
884     except CollectionUpdateError as error:
885         logger.error("\n".join([
886             "arv-put: %s" % str(error)]))
887         sys.exit(1)
888     except ArvPutUploadIsPending:
889         # Dry run check successful, return proper exit code.
890         sys.exit(2)
891     except ArvPutUploadNotPending:
892         # No files pending for upload
893         sys.exit(0)
894
895     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
896     # the originals.
897     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
898                             for sigcode in CAUGHT_SIGNALS}
899
900     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
901         logger.warning("\n".join([
902             "arv-put: Resuming previous upload from last checkpoint.",
903             "         Use the --no-resume option to start over."]))
904
905     if not args.dry_run:
906         writer.report_progress()
907     output = None
908     try:
909         writer.start(save_collection=not(args.stream or args.raw))
910     except arvados.errors.ApiError as error:
911         logger.error("\n".join([
912             "arv-put: %s" % str(error)]))
913         sys.exit(1)
914     except ArvPutUploadIsPending:
915         # Dry run check successful, return proper exit code.
916         sys.exit(2)
917     except ArvPutUploadNotPending:
918         # No files pending for upload
919         sys.exit(0)
920
921     if args.progress:  # Print newline to split stderr from stdout for humans.
922         logger.info("\n")
923
924     if args.stream:
925         if args.normalize:
926             output = writer.manifest_text(normalize=True)
927         else:
928             output = writer.manifest_text()
929     elif args.raw:
930         output = ','.join(writer.data_locators())
931     else:
932         try:
933             if args.update_collection:
934                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
935             else:
936                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
937             if args.portable_data_hash:
938                 output = writer.portable_data_hash()
939             else:
940                 output = writer.manifest_locator()
941         except apiclient_errors.Error as error:
942             logger.error(
943                 "arv-put: Error creating Collection on project: {}.".format(
944                     error))
945             status = 1
946
947     # Print the locator (uuid) of the new collection.
948     if output is None:
949         status = status or 1
950     else:
951         stdout.write(output)
952         if not output.endswith('\n'):
953             stdout.write('\n')
954
955     for sigcode, orig_handler in orig_signal_handlers.items():
956         signal.signal(sigcode, orig_handler)
957
958     if status != 0:
959         sys.exit(status)
960
961     # Success!
962     return output
963
964
965 if __name__ == '__main__':
966     main()