#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import time
import signal
import socket
import sys
import tempfile
import threading
import copy
import logging
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

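# Signals intercepted during an upload; main() installs exit_signal_handler
# for each of these and restores the original handlers before returning.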
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
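# Example invocations handled by the options above (a sketch, not exhaustive):
#   arv-put file1 file2                  # upload files, print the new collection UUID
#   arv-put --portable-data-hash mydir/  # upload a directory, print its portable data hash
#   tar cz src | arv-put --filename src.tar.gz   # read from stdin, name the file in the manifest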

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
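        """
        Verify that the first block locator recorded in the cached state is
        still readable in Keep (via a HEAD request); if it is not, restart
        the cache.
        """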
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
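        """
        Atomically replace the cache file: write the new state to a locked
        temporary file in the same directory, then rename it over the old one.
        """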
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
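    """
    Upload a set of local files and/or directories into a single Keep
    collection. A background checkpointer thread periodically updates the
    byte count and, when resuming is enabled, saves the partial manifest to
    a local cache file so an interrupted upload can be continued later.
    """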
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, replication_desired=None,
                 filename=None, update_time=1.0):
        self.paths = paths
        self.resume = resume
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection = None
        self._collection_lock = threading.Lock()
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self.logger = logging.getLogger('arvados.arv_put')
        # Load cached data, if any and if resuming was requested
        self._setup_state()

    def start(self):
        """
        Start the checkpointer support thread and upload the files.
        """
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all pending blocks & do one last _update()
            self.manifest_text()
            self._update()
            if self.resume:
                self._cache_file.close()
                # Correct the final written bytes count
                self.bytes_written -= self.bytes_skipped

    def save_collection(self):
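        """
        Save the uploaded data as a new collection record in Arvados, using
        the configured name and owner project.
        """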
        with self._collection_lock:
            self._my_collection().save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._my_collection())
            # Update cache, if resume enabled
            if self.resume:
                with self._state_lock:
                    # Get the manifest text without committing pending blocks
                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_directory_tree(self, path, stream_name="."):
        # TODO: Check what happens when multiple directories are passed as
        # arguments.
        # If the code below is uncommented, integration test
        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
        # fails; I suppose this is because the manifest_uuid changes due to
        # the directory being added to stream_name.

        # if stream_name == '.':
        #     stream_name = os.path.join('.', os.path.basename(path))
        for item in os.listdir(path):
            if os.path.isdir(os.path.join(path, item)):
                self._write_directory_tree(os.path.join(path, item),
                                os.path.join(stream_name, item))
            else:
                self._write_file(os.path.join(path, item),
                                os.path.join(stream_name, item))

    def _write_stdin(self, filename):
        with self._collection_lock:
            output = self._my_collection().open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _write_file(self, source, filename):
        resume_offset = 0
        if self.resume:
            # Check if the file was already uploaded (at least partially)
            with self._collection_lock:
                try:
                    file_in_collection = self._my_collection().find(filename)
                except IOError:
                    # Not found
                    file_in_collection = None
            # If there's no cached data for this file yet, store it for a
            # possible repeated run.
            with self._state_lock:
                if source not in self._state['files']:
                    self._state['files'][source] = {
                        'mtime': os.path.getmtime(source),
                        'size' : os.path.getsize(source)
                    }
                cached_file_data = self._state['files'][source]
            # See if this file was already uploaded at least partially
            if file_in_collection:
                if (cached_file_data['mtime'] == os.path.getmtime(source) and
                        cached_file_data['size'] == os.path.getsize(source)):
                    if cached_file_data['size'] == file_in_collection.size():
                        # File already there, skip it.
                        self.bytes_skipped += cached_file_data['size']
                        return
                    elif cached_file_data['size'] > file_in_collection.size():
                        # File partially uploaded, resume!
                        resume_offset = file_in_collection.size()
                    else:
                        # Inconsistent cache, re-upload the file
                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
                else:
                    # Local file differs from cached data, re-upload it
                    pass
        with open(source, 'r') as source_fd:
            if resume_offset > 0:
                # Start upload where we left off
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'a')
                source_fd.seek(resume_offset)
                self.bytes_skipped += resume_offset
            else:
                # Start from scratch
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'w')
            self._write(source_fd, output)
            output.close(flush=False)

    def _write(self, source_fd, output):
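        """
        Copy data from source_fd into the collection file object in
        KEEP_BLOCK_SIZE chunks. The first read is written even if it is
        empty, so zero-length files are still created.
        """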
        first_read = True
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            # Allow an empty file to be written
            if not data and not first_read:
                break
            if first_read:
                first_read = False
            output.write(data)

    def _my_collection(self):
        """
        Lazily build the Collection object: restore it from the cached
        manifest when resuming, otherwise create a new empty collection.
        """
        if self._collection is None:
            with self._state_lock:
                manifest = self._state['manifest']
            if self.resume and manifest is not None:
                # Create collection from saved state
                self._collection = arvados.collection.Collection(
                    manifest,
                    replication_desired=self.replication_desired)
            else:
                # Create new collection
                self._collection = arvados.collection.Collection(
                    replication_desired=self.replication_desired)
        return self._collection

    def _setup_state(self):
        """
        Create a new cache file or load a previously existing one.
        """
        if self.resume:
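            # The cache file name is the md5 of the API host plus the sorted
            # real paths (and --filename, if given), so re-running the same
            # invocation finds the same cache file.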
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)
            with self._state_lock:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete; set up a new one
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load how many bytes were uploaded on a previous run
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._my_collection())
        # No resume required
        else:
            with self._state_lock:
                self._state = copy.deepcopy(self.EMPTY_STATE)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = self._state
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        with self._collection_lock:
            response = self._my_collection().api_response()
            name = response['name'] if response else None
        return name

    def manifest_locator(self):
        with self._collection_lock:
            locator = self._my_collection().manifest_locator()
        return locator

    def portable_data_hash(self):
        with self._collection_lock:
            datahash = self._my_collection().portable_data_hash()
        return datahash

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        with self._collection_lock:
            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
        return manifest

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
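    # A path that is neither a directory nor a regular file (for example,
    # "-" for stdin) makes the total unknown, so None is returned.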
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

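# --batch-progress lines look like "<program> <pid>: <bytes written> written <total> total",
# with -1 as the total when the expected size is unknown.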
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

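# Turn a caught signal into a SystemExit so cleanup (e.g. "finally" blocks)
# still runs and the process exits with a nonzero status derived from the
# signal number.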
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
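    """
    Return the UUID of the project that will own the new collection: the
    given user or group UUID, or the current user's UUID when no project
    was specified.
    """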
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    writer.start()
    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            writer.save_collection()
            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the requested output: collection UUID, portable data hash,
    # manifest text, or raw block locators.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    writer.destroy_cache()
    return output


if __name__ == '__main__':
    main()