1 #!/usr/bin/env python
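# arv-put: copy data from the local filesystem into Keep and, by default,
# save the result as an Arvados Collection.  See the argparse help texts
# below for the full set of command line options.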
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 from apiclient import errors as apiclient_errors
27
28 import arvados.commands._util as arv_cmd
29
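# Signals for which main() installs a handler so an interrupted upload exits
# cleanly (the original handlers are restored before main() returns).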
30 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
31 api_client = None
32
33 upload_opts = argparse.ArgumentParser(add_help=False)
34
35 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
36                          help="""
37 Local file or directory. Default: read from standard input.
38 """)
39
40 _group = upload_opts.add_mutually_exclusive_group()
41
42 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
43                     default=-1, help="""
44 Maximum depth of directory tree to represent in the manifest
45 structure. A directory structure deeper than this will be represented
46 as a single stream in the manifest. If N=0, the manifest will contain
47 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
48 stream per filesystem directory that contains files.
49 """)
50
51 _group.add_argument('--normalize', action='store_true',
52                     help="""
53 Normalize the manifest by re-ordering files and streams after writing
54 data.
55 """)
56
57 _group = upload_opts.add_mutually_exclusive_group()
58
59 _group.add_argument('--as-stream', action='store_true', dest='stream',
60                     help="""
61 Synonym for --stream.
62 """)
63
64 _group.add_argument('--stream', action='store_true',
65                     help="""
66 Store the file content and display the resulting manifest on
67 stdout. Do not write the manifest to Keep or save a Collection object
68 in Arvados.
69 """)
70
71 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
72                     help="""
73 Synonym for --manifest.
74 """)
75
76 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
77                     help="""
78 Synonym for --manifest.
79 """)
80
81 _group.add_argument('--manifest', action='store_true',
82                     help="""
83 Store the file data and resulting manifest in Keep, save a Collection
84 object in Arvados, and display the manifest locator (Collection uuid)
85 on stdout. This is the default behavior.
86 """)
87
88 _group.add_argument('--as-raw', action='store_true', dest='raw',
89                     help="""
90 Synonym for --raw.
91 """)
92
93 _group.add_argument('--raw', action='store_true',
94                     help="""
95 Store the file content and display the data block locators on stdout,
96 separated by commas, with a trailing newline. Do not store a
97 manifest.
98 """)
99
100 upload_opts.add_argument('--update-collection', type=str, default=None,
101                          dest='update_collection', metavar="UUID", help="""
102 Update an existing collection identified by the given Arvados collection
103 UUID. All new local files will be uploaded.
104 """)
105
106 upload_opts.add_argument('--use-filename', type=str, default=None,
107                          dest='filename', help="""
108 Synonym for --filename.
109 """)
110
111 upload_opts.add_argument('--filename', type=str, default=None,
112                          help="""
113 Use the given filename in the manifest, instead of the name of the
114 local file. This is useful when "-" or "/dev/stdin" is given as an
115 input file. It can be used only if there is exactly one path given and
116 it is not a directory. Implies --manifest.
117 """)
118
119 upload_opts.add_argument('--portable-data-hash', action='store_true',
120                          help="""
121 Print the portable data hash instead of the Arvados UUID for the collection
122 created by the upload.
123 """)
124
125 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
126                          help="""
127 Set the replication level for the new collection: how many different
128 physical storage devices (e.g., disks) should have a copy of each data
129 block. Default is to use the server-provided default (if any) or 2.
130 """)
131
132 run_opts = argparse.ArgumentParser(add_help=False)
133
134 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
135 Store the collection in the specified project, instead of your Home
136 project.
137 """)
138
139 run_opts.add_argument('--name', help="""
140 Save the collection with the specified name.
141 """)
142
143 _group = run_opts.add_mutually_exclusive_group()
144 _group.add_argument('--progress', action='store_true',
145                     help="""
146 Display human-readable progress on stderr (bytes and, if possible,
147 percentage of total data size). This is the default behavior when
148 stderr is a tty.
149 """)
150
151 _group.add_argument('--no-progress', action='store_true',
152                     help="""
153 Do not display human-readable progress on stderr, even if stderr is a
154 tty.
155 """)
156
157 _group.add_argument('--batch-progress', action='store_true',
158                     help="""
159 Display machine-readable progress on stderr (bytes and, if known,
160 total data size).
161 """)
162
163 _group = run_opts.add_mutually_exclusive_group()
164 _group.add_argument('--resume', action='store_true', default=True,
165                     help="""
166 Continue interrupted uploads from cached state (default).
167 """)
168 _group.add_argument('--no-resume', action='store_false', dest='resume',
169                     help="""
170 Do not continue interrupted uploads from cached state.
171 """)
172
173 arg_parser = argparse.ArgumentParser(
174     description='Copy data from the local filesystem to Keep.',
175     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
176
177 def parse_arguments(arguments):
178     args = arg_parser.parse_args(arguments)
179
180     if len(args.paths) == 0:
181         args.paths = ['-']
182
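    # Treat '/dev/stdin' the same as '-' so both spellings read from standard input.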
183     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
184
185     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
186         if args.filename:
187             arg_parser.error("""
188     --filename argument cannot be used when storing a directory or
189     multiple files.
190     """)
191
192     # Turn on --progress by default if stderr is a tty.
193     if (not (args.batch_progress or args.no_progress)
194         and os.isatty(sys.stderr.fileno())):
195         args.progress = True
196
197     if args.paths == ['-']:
198         if args.update_collection:
199             arg_parser.error("""
200     --update-collection cannot be used when reading from stdin.
201     """)
202         args.resume = False
203         if not args.filename:
204             args.filename = 'stdin'
205
206     return args
207
208
209 class CollectionUpdateError(Exception):
210     pass
211
212
213 class ResumeCacheConflict(Exception):
214     pass
215
216
217 class ResumeCache(object):
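    """Upload state kept in a locked JSON file under ~/.cache/arvados/arv-put.

    The exclusive, non-blocking flock also keeps two arv-put processes from
    reusing the same cache file at once (ResumeCacheConflict is raised instead).
    """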
218     CACHE_DIR = '.cache/arvados/arv-put'
219
220     def __init__(self, file_spec):
221         self.cache_file = open(file_spec, 'a+')
222         self._lock_file(self.cache_file)
223         self.filename = self.cache_file.name
224
225     @classmethod
226     def make_path(cls, args):
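        """Build the cache file path from an md5 of the API host and the input paths."""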
227         md5 = hashlib.md5()
228         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
229         realpaths = sorted(os.path.realpath(path) for path in args.paths)
230         md5.update('\0'.join(realpaths))
231         if any(os.path.isdir(path) for path in realpaths):
232             md5.update(str(max(args.max_manifest_depth, -1)))
233         elif args.filename:
234             md5.update(args.filename)
235         return os.path.join(
236             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
237             md5.hexdigest())
238
239     def _lock_file(self, fileobj):
240         try:
241             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
242         except IOError:
243             raise ResumeCacheConflict("{} locked".format(fileobj.name))
244
245     def load(self):
246         self.cache_file.seek(0)
247         return json.load(self.cache_file)
248
249     def check_cache(self, api_client=None, num_retries=0):
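        """HEAD a known block locator in Keep and restart the cache if the check fails."""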
250         try:
251             state = self.load()
252             locator = None
253             try:
254                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
255                     locator = state["_finished_streams"][0][1][0]
256                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
257                     locator = state["_current_stream_locators"][0]
258                 if locator is not None:
259                     kc = arvados.keep.KeepClient(api_client=api_client)
260                     kc.head(locator, num_retries=num_retries)
261             except Exception:
262                 self.restart()
263         except ValueError:
264             pass
265
266     def save(self, data):
267         try:
268             new_cache_fd, new_cache_name = tempfile.mkstemp(
269                 dir=os.path.dirname(self.filename))
270             self._lock_file(new_cache_fd)
271             new_cache = os.fdopen(new_cache_fd, 'r+')
272             json.dump(data, new_cache)
273             os.rename(new_cache_name, self.filename)
274         except (IOError, OSError, ResumeCacheConflict) as error:
275             try:
276                 os.unlink(new_cache_name)
277             except NameError:  # mkstemp failed.
278                 pass
279         else:
280             self.cache_file.close()
281             self.cache_file = new_cache
282
283     def close(self):
284         self.cache_file.close()
285
286     def destroy(self):
287         try:
288             os.unlink(self.filename)
289         except OSError as error:
290             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
291                 raise
292         self.close()
293
294     def restart(self):
295         self.destroy()
296         self.__init__(self.filename)
297
298
299 class ArvPutUploadJob(object):
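    """Upload local files and/or stdin into a single Collection.

    The job keeps an on-disk resume cache and runs a background checkpointer
    thread, so an interrupted upload can continue on the next run, and it can
    update an existing collection instead of creating a new one.  A simplified
    sketch of how main() below drives it (the argument values are illustrative
    only):

        writer = ArvPutUploadJob(paths=['./data'], reporter=None)
        writer.start(save_collection=True)
        print writer.manifest_locator()
    """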
300     CACHE_DIR = '.cache/arvados/arv-put'
301     EMPTY_STATE = {
302         'manifest' : None, # Last saved manifest checkpoint
303         'files' : {} # Previous run file list: {path : {size, mtime}}
304     }
305
306     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
307                  name=None, owner_uuid=None, ensure_unique_name=False,
308                  num_retries=None, replication_desired=None,
309                  filename=None, update_time=1.0, update_collection=None):
310         self.paths = paths
311         self.resume = resume
312         self.update = False
313         self.reporter = reporter
314         self.bytes_expected = bytes_expected
315         self.bytes_written = 0
316         self.bytes_skipped = 0
317         self.name = name
318         self.owner_uuid = owner_uuid
319         self.ensure_unique_name = ensure_unique_name
320         self.num_retries = num_retries
321         self.replication_desired = replication_desired
322         self.filename = filename
323         self._state_lock = threading.Lock()
324         self._state = None # Previous run state (file list & manifest)
325         self._current_files = [] # Current run file list
326         self._cache_file = None
327         self._collection_lock = threading.Lock()
328         self._remote_collection = None # Collection being updated (if asked)
329         self._local_collection = None # Collection from previous run manifest
330         self._file_paths = [] # Files to be updated in remote collection
331         self._stop_checkpointer = threading.Event()
332         self._checkpointer = threading.Thread(target=self._update_task)
333         self._update_task_time = update_time  # How many seconds to wait between update runs
334         self.logger = logging.getLogger('arvados.arv_put')
335
336         # Load cached data if any and if needed
337         self._setup_state(update_collection)
338
339     def start(self, save_collection):
340         """
341         Start the checkpointer thread and upload the given paths.
342         """
343         self._checkpointer.daemon = True
344         self._checkpointer.start()
345         try:
346             for path in self.paths:
347                 # Test for stdin first, in case a file named '-' exists
348                 if path == '-':
349                     self._write_stdin(self.filename or 'stdin')
350                 elif os.path.isdir(path):
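                    # Compute the prefix to strip so that file paths recorded in
                    # the manifest stay relative to the directory argument.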
351                     if path == '.' or path == './' or os.path.dirname(path) == '':
352                         dirname = ''
353                     else:
354                         dirname = os.path.dirname(path) + '/'
355                     for root, dirs, files in os.walk(path):
356                         # Make os.walk()'s directory traversal order deterministic
357                         dirs.sort()
358                         files.sort()
359                         for f in files:
360                             self._write_file(os.path.join(root, f),
361                                              os.path.join(root[len(dirname):], f))
362                 else:
363                     self._write_file(path, self.filename or os.path.basename(path))
364         finally:
365             # Stop the thread before doing anything else
366             self._stop_checkpointer.set()
367             self._checkpointer.join()
368             # Commit all pending data and do one final _update()
369             self.manifest_text()
370             if save_collection:
371                 self.save_collection()
372             self._update()
373             self._cache_file.close()
374             # Correct the final written bytes count
375             self.bytes_written -= self.bytes_skipped
376
377     def save_collection(self):
378         if self.update:
379             # Check if files should be updated on the remote collection.
380             for fp in self._file_paths:
381                 remote_file = self._remote_collection.find(fp)
382                 if not remote_file:
383                     # File doesn't exist on the remote collection; copy it.
384                     self._remote_collection.copy(fp, fp, self._local_collection)
385                 elif remote_file != self._local_collection.find(fp):
386                     # A different file exists on the remote collection; overwrite it.
387                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
388                 else:
389                     # The file already exists on the remote collection; skip it.
390                     pass
391             self._remote_collection.save(num_retries=self.num_retries)
392         else:
393             self._local_collection.save_new(
394                 name=self.name, owner_uuid=self.owner_uuid,
395                 ensure_unique_name=self.ensure_unique_name,
396                 num_retries=self.num_retries)
397
398     def destroy_cache(self):
399         if self.resume:
400             try:
401                 os.unlink(self._cache_filename)
402             except OSError as error:
403                 # That's what we wanted anyway.
404                 if error.errno != errno.ENOENT:
405                     raise
406             self._cache_file.close()
407
408     def _collection_size(self, collection):
409         """
410         Recursively get the total size of the collection
411         """
412         size = 0
413         for item in collection.values():
414             if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
415                 size += self._collection_size(item)
416             else:
417                 size += item.size()
418         return size
419
420     def _update_task(self):
421         """
422         Periodically called support task. File uploading is
423         asynchronous so we poll status from the collection.
424         """
425         while not self._stop_checkpointer.wait(self._update_task_time):
426             self._update()
427
428     def _update(self):
429         """
430         Update cached manifest text and report progress.
431         """
432         with self._collection_lock:
433             self.bytes_written = self._collection_size(self._local_collection)
434             # Update cache, if resume enabled
435             with self._state_lock:
436                 # Get the manifest text without committing pending blocks
437                 self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
438             self._save_state()
439         # Call the reporter, if any
440         self.report_progress()
441
442     def report_progress(self):
443         if self.reporter is not None:
444             self.reporter(self.bytes_written, self.bytes_expected)
445
446     def _write_stdin(self, filename):
447         output = self._local_collection.open(filename, 'w')
448         self._write(sys.stdin, output)
449         output.close()
450
451     def _write_file(self, source, filename):
452         resume_offset = 0
453         should_upload = False
454         new_file_in_cache = False
455
456         # Record file path for updating the remote collection before exiting
457         self._file_paths.append(filename)
458
459         with self._state_lock:
460             # If there is no cached data for this file, record it so a future
461             # resumed run can detect changes.
462             if source not in self._state['files']:
463                 self._state['files'][source] = {
464                     'mtime': os.path.getmtime(source),
465                     'size' : os.path.getsize(source)
466                 }
467                 new_file_in_cache = True
468             cached_file_data = self._state['files'][source]
469
470         # Check if file was already uploaded (at least partially)
471         file_in_local_collection = self._local_collection.find(filename)
472
473         # If not resuming, upload the full file.
474         if not self.resume:
475             should_upload = True
476         # File is new since the last run; upload it.
477         elif new_file_in_cache:
478             should_upload = True
479         # Local file hasn't changed since the last run.
480         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
481             if not file_in_local_collection:
482                 # File not uploaded yet, upload it completely
483                 should_upload = True
484             elif cached_file_data['size'] == file_in_local_collection.size():
485                 # File already there, skip it.
486                 self.bytes_skipped += cached_file_data['size']
487             elif cached_file_data['size'] > file_in_local_collection.size():
488                 # File partially uploaded, resume!
489                 resume_offset = file_in_local_collection.size()
490                 should_upload = True
491             else:
492                 # Inconsistent cache, re-upload the file
493                 should_upload = True
494                 self.logger.warning("Uploaded version of file '{}' is bigger than the local version; it will be re-uploaded from scratch.".format(source))
495         # Local file differs from cached data, re-upload it.
496         else:
497             should_upload = True
498
499         if should_upload:
500             with open(source, 'r') as source_fd:
501                 with self._state_lock:
502                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
503                     self._state['files'][source]['size'] = os.path.getsize(source)
504                 if resume_offset > 0:
505                     # Start upload where we left off
506                     output = self._local_collection.open(filename, 'a')
507                     source_fd.seek(resume_offset)
508                     self.bytes_skipped += resume_offset
509                 else:
510                     # Start from scratch
511                     output = self._local_collection.open(filename, 'w')
512                 self._write(source_fd, output)
513                 output.close(flush=False)
514
515     def _write(self, source_fd, output):
516         first_read = True
517         while True:
518             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
519             # Allow an empty file to be written
520             if not data and not first_read:
521                 break
522             if first_read:
523                 first_read = False
524             output.write(data)
525
526     def _my_collection(self):
527         return self._remote_collection if self.update else self._local_collection
528
529     def _setup_state(self, update_collection):
530         """
531         Create a new cache file or load a previously existing one.
532         """
533         # Load an already existing collection for update
534         if update_collection and re.match(arvados.util.collection_uuid_pattern,
535                                           update_collection):
536             try:
537                 self._remote_collection = arvados.collection.Collection(update_collection)
538             except arvados.errors.ApiError as error:
539                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
540             else:
541                 self.update = True
542         elif update_collection:
543             # Collection locator provided, but unknown format
544             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
545
546         # Set up cache file name from input paths.
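        # The name is an md5 digest of the API host, the resolved input paths
        # and (when given) --filename, so each invocation gets its own cache.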
547         md5 = hashlib.md5()
548         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
549         realpaths = sorted(os.path.realpath(path) for path in self.paths)
550         md5.update('\0'.join(realpaths))
551         if self.filename:
552             md5.update(self.filename)
553         cache_filename = md5.hexdigest()
554         self._cache_file = open(os.path.join(
555             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
556             cache_filename), 'a+')
557         self._cache_filename = self._cache_file.name
558         self._lock_file(self._cache_file)
559         self._cache_file.seek(0)
560         with self._state_lock:
561             try:
562                 self._state = json.load(self._cache_file)
563                 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
564                     # Cache at least partially incomplete, set up new cache
565                     self._state = copy.deepcopy(self.EMPTY_STATE)
566             except ValueError:
567                 # Cache file empty, set up new cache
568                 self._state = copy.deepcopy(self.EMPTY_STATE)
569
570             # Load the previous manifest so we can check if files were modified remotely.
571             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
572         # Load how many bytes were uploaded on the previous run
573         with self._collection_lock:
574             self.bytes_written = self._collection_size(self._local_collection)
575
576     def _lock_file(self, fileobj):
577         try:
578             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
579         except IOError:
580             raise ResumeCacheConflict("{} locked".format(fileobj.name))
581
582     def _save_state(self):
583         """
584         Atomically save current state into cache.
585         """
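        # Write to a temporary file in the same directory, fsync it, then
        # rename() it over the old cache so readers never see a partial file.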
586         try:
587             with self._state_lock:
588                 state = self._state
589             new_cache_fd, new_cache_name = tempfile.mkstemp(
590                 dir=os.path.dirname(self._cache_filename))
591             self._lock_file(new_cache_fd)
592             new_cache = os.fdopen(new_cache_fd, 'r+')
593             json.dump(state, new_cache)
594             new_cache.flush()
595             os.fsync(new_cache)
596             os.rename(new_cache_name, self._cache_filename)
597         except (IOError, OSError, ResumeCacheConflict) as error:
598             self.logger.error("There was a problem while saving the cache file: {}".format(error))
599             try:
600                 os.unlink(new_cache_name)
601             except NameError:  # mkstemp failed.
602                 pass
603         else:
604             self._cache_file.close()
605             self._cache_file = new_cache
606
607     def collection_name(self):
608         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
609
610     def manifest_locator(self):
611         return self._my_collection().manifest_locator()
612
613     def portable_data_hash(self):
614         return self._my_collection().portable_data_hash()
615
616     def manifest_text(self, stream_name=".", strip=False, normalize=False):
617         return self._my_collection().manifest_text(stream_name, strip, normalize)
618
619     def _datablocks_on_item(self, item):
620         """
621         Return a list of datablock locators, recursively navigating
622         through subcollections
623         """
624         if isinstance(item, arvados.arvfile.ArvadosFile):
625             if item.size() == 0:
626                 # Empty file locator
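                # (the md5 of the empty string, i.e. a zero-length Keep block)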
627                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
628             else:
629                 locators = []
630                 for segment in item.segments():
631                     loc = segment.locator
632                     locators.append(loc)
633                 return locators
634         elif isinstance(item, arvados.collection.Collection):
635             l = [self._datablocks_on_item(x) for x in item.values()]
636             # Fast list flattener method taken from:
637             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
638             return [loc for sublist in l for loc in sublist]
639         else:
640             return None
641
642     def data_locators(self):
643         with self._collection_lock:
644             # Make sure all datablocks are flushed before getting the locators
645             self._my_collection().manifest_text()
646             datablocks = self._datablocks_on_item(self._my_collection())
647         return datablocks
648
649
650 def expected_bytes_for(pathlist):
651     # Walk the given directory trees and stat files, adding up file sizes,
652     # so we can display progress as percent
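    # Returns None when any path is neither a file nor a directory (e.g. '-'
    # for stdin), in which case progress is reported without a total.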
653     bytesum = 0
654     for path in pathlist:
655         if os.path.isdir(path):
656             for filename in arvados.util.listdir_recursive(path):
657                 bytesum += os.path.getsize(os.path.join(path, filename))
658         elif not os.path.isfile(path):
659             return None
660         else:
661             bytesum += os.path.getsize(path)
662     return bytesum
663
664 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
665                                                             os.getpid())
666 def machine_progress(bytes_written, bytes_expected):
667     return _machine_format.format(
668         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
669
670 def human_progress(bytes_written, bytes_expected):
671     if bytes_expected:
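        # >> 20 converts byte counts to whole MiB for display.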
672         return "\r{}M / {}M {:.1%} ".format(
673             bytes_written >> 20, bytes_expected >> 20,
674             float(bytes_written) / bytes_expected)
675     else:
676         return "\r{} ".format(bytes_written)
677
678 def progress_writer(progress_func, outfile=sys.stderr):
679     def write_progress(bytes_written, bytes_expected):
680         outfile.write(progress_func(bytes_written, bytes_expected))
681     return write_progress
682
683 def exit_signal_handler(sigcode, frame):
684     sys.exit(-sigcode)
685
686 def desired_project_uuid(api_client, project_uuid, num_retries):
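    """Resolve the destination project UUID; defaults to the current user's Home project."""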
687     if not project_uuid:
688         query = api_client.users().current()
689     elif arvados.util.user_uuid_pattern.match(project_uuid):
690         query = api_client.users().get(uuid=project_uuid)
691     elif arvados.util.group_uuid_pattern.match(project_uuid):
692         query = api_client.groups().get(uuid=project_uuid)
693     else:
694         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
695     return query.execute(num_retries=num_retries)['uuid']
696
697 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
698     global api_client
699
700     args = parse_arguments(arguments)
701     status = 0
702     if api_client is None:
703         api_client = arvados.api('v1')
704
705     # Determine the name to use
706     if args.name:
707         if args.stream or args.raw:
708             print >>stderr, "Cannot use --name with --stream or --raw"
709             sys.exit(1)
710         collection_name = args.name
711     else:
712         collection_name = "Saved at {} by {}@{}".format(
713             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
714             pwd.getpwuid(os.getuid()).pw_name,
715             socket.gethostname())
716
717     if args.project_uuid and (args.stream or args.raw):
718         print >>stderr, "Cannot use --project-uuid with --stream or --raw"
719         sys.exit(1)
720
721     # Determine the parent project
722     try:
723         project_uuid = desired_project_uuid(api_client, args.project_uuid,
724                                             args.retries)
725     except (apiclient_errors.Error, ValueError) as error:
726         print >>stderr, error
727         sys.exit(1)
728
729     if args.progress:
730         reporter = progress_writer(human_progress)
731     elif args.batch_progress:
732         reporter = progress_writer(machine_progress)
733     else:
734         reporter = None
735
736     bytes_expected = expected_bytes_for(args.paths)
737
738     try:
739         writer = ArvPutUploadJob(paths = args.paths,
740                                  resume = args.resume,
741                                  filename = args.filename,
742                                  reporter = reporter,
743                                  bytes_expected = bytes_expected,
744                                  num_retries = args.retries,
745                                  replication_desired = args.replication,
746                                  name = collection_name,
747                                  owner_uuid = project_uuid,
748                                  ensure_unique_name = True,
749                                  update_collection = args.update_collection)
750     except ResumeCacheConflict:
751         print >>stderr, "\n".join([
752             "arv-put: Another process is already uploading this data.",
753             "         Use --no-resume if this is really what you want."])
754         sys.exit(1)
755     except CollectionUpdateError as error:
756         print >>stderr, "\n".join([
757             "arv-put: %s" % str(error)])
758         sys.exit(1)
759
760     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
761     # the originals.
762     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
763                             for sigcode in CAUGHT_SIGNALS}
764
765     if not args.update_collection and args.resume and writer.bytes_written > 0:
766         print >>stderr, "\n".join([
767                 "arv-put: Resuming previous upload from last checkpoint.",
768                 "         Use the --no-resume option to start over."])
769
770     writer.report_progress()
771     output = None
772     try:
773         writer.start(save_collection=not(args.stream or args.raw))
774     except arvados.errors.ApiError as error:
775         print >>stderr, "\n".join([
776             "arv-put: %s" % str(error)])
777         sys.exit(1)
778
779     if args.progress:  # Print newline to split stderr from stdout for humans.
780         print >>stderr
781
782     if args.stream:
783         if args.normalize:
784             output = writer.manifest_text(normalize=True)
785         else:
786             output = writer.manifest_text()
787     elif args.raw:
788         output = ','.join(writer.data_locators())
789     else:
790         try:
791             if args.update_collection:
792                 print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
793             else:
794                 print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
795             if args.portable_data_hash:
796                 output = writer.portable_data_hash()
797             else:
798                 output = writer.manifest_locator()
799         except apiclient_errors.Error as error:
800             print >>stderr, (
801                 "arv-put: Error creating Collection on project: {}.".format(
802                     error))
803             status = 1
804
805     # Write the output (collection locator, portable data hash, manifest, or raw block locators).
806     if output is None:
807         status = status or 1
808     else:
809         stdout.write(output)
810         if not output.endswith('\n'):
811             stdout.write('\n')
812
813     for sigcode, orig_handler in orig_signal_handlers.items():
814         signal.signal(sigcode, orig_handler)
815
816     if status != 0:
817         sys.exit(status)
818
819     # Success!
820     return output
821
822
823 if __name__ == '__main__':
824     main()