9463: Setting up the checkpointer thread as a daemon so that it finishes when the...
arvados.git: sdk/python/arvados/commands/put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import time
18 import signal
19 import socket
20 import sys
21 import tempfile
22 import threading
23 import copy
24 import logging
25 from apiclient import errors as apiclient_errors
26
27 import arvados.commands._util as arv_cmd
28
29 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
30 api_client = None
31
32 upload_opts = argparse.ArgumentParser(add_help=False)
33
34 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
35                          help="""
36 Local file or directory. Default: read from standard input.
37 """)
38
39 _group = upload_opts.add_mutually_exclusive_group()
40
41 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
42                     default=-1, help="""
43 Maximum depth of directory tree to represent in the manifest
44 structure. A directory structure deeper than this will be represented
45 as a single stream in the manifest. If N=0, the manifest will contain
46 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
47 stream per filesystem directory that contains files.
48 """)
49
50 _group.add_argument('--normalize', action='store_true',
51                     help="""
52 Normalize the manifest by re-ordering files and streams after writing
53 data.
54 """)
55
56 _group = upload_opts.add_mutually_exclusive_group()
57
58 _group.add_argument('--as-stream', action='store_true', dest='stream',
59                     help="""
60 Synonym for --stream.
61 """)
62
63 _group.add_argument('--stream', action='store_true',
64                     help="""
65 Store the file content and display the resulting manifest on
66 stdout. Do not write the manifest to Keep or save a Collection object
67 in Arvados.
68 """)
69
70 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
71                     help="""
72 Synonym for --manifest.
73 """)
74
75 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
76                     help="""
77 Synonym for --manifest.
78 """)
79
80 _group.add_argument('--manifest', action='store_true',
81                     help="""
82 Store the file data and resulting manifest in Keep, save a Collection
83 object in Arvados, and display the manifest locator (Collection uuid)
84 on stdout. This is the default behavior.
85 """)
86
87 _group.add_argument('--as-raw', action='store_true', dest='raw',
88                     help="""
89 Synonym for --raw.
90 """)
91
92 _group.add_argument('--raw', action='store_true',
93                     help="""
94 Store the file content and display the data block locators on stdout,
95 separated by commas, with a trailing newline. Do not store a
96 manifest.
97 """)
98
99 upload_opts.add_argument('--use-filename', type=str, default=None,
100                          dest='filename', help="""
101 Synonym for --filename.
102 """)
103
104 upload_opts.add_argument('--filename', type=str, default=None,
105                          help="""
106 Use the given filename in the manifest, instead of the name of the
107 local file. This is useful when "-" or "/dev/stdin" is given as an
108 input file. It can be used only if there is exactly one path given and
109 it is not a directory. Implies --manifest.
110 """)
111
112 upload_opts.add_argument('--portable-data-hash', action='store_true',
113                          help="""
114 Print the portable data hash instead of the Arvados UUID for the collection
115 created by the upload.
116 """)
117
118 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
119                          help="""
120 Set the replication level for the new collection: how many different
121 physical storage devices (e.g., disks) should have a copy of each data
122 block. Default is to use the server-provided default (if any) or 2.
123 """)
124
125 run_opts = argparse.ArgumentParser(add_help=False)
126
127 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
128 Store the collection in the specified project, instead of your Home
129 project.
130 """)
131
132 run_opts.add_argument('--name', help="""
133 Save the collection with the specified name.
134 """)
135
136 _group = run_opts.add_mutually_exclusive_group()
137 _group.add_argument('--progress', action='store_true',
138                     help="""
139 Display human-readable progress on stderr (bytes and, if possible,
140 percentage of total data size). This is the default behavior when
141 stderr is a tty.
142 """)
143
144 _group.add_argument('--no-progress', action='store_true',
145                     help="""
146 Do not display human-readable progress on stderr, even if stderr is a
147 tty.
148 """)
149
150 _group.add_argument('--batch-progress', action='store_true',
151                     help="""
152 Display machine-readable progress on stderr (bytes and, if known,
153 total data size).
154 """)
155
156 _group = run_opts.add_mutually_exclusive_group()
157 _group.add_argument('--resume', action='store_true', default=True,
158                     help="""
159 Continue interrupted uploads from cached state (default).
160 """)
161 _group.add_argument('--no-resume', action='store_false', dest='resume',
162                     help="""
163 Do not continue interrupted uploads from cached state.
164 """)
165
166 arg_parser = argparse.ArgumentParser(
167     description='Copy data from the local filesystem to Keep.',
168     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
169
170 def parse_arguments(arguments):
171     args = arg_parser.parse_args(arguments)
172
173     if len(args.paths) == 0:
174         args.paths = ['-']
175
176     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
177
178     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
179         if args.filename:
180             arg_parser.error("""
181     --filename argument cannot be used when storing a directory or
182     multiple files.
183     """)
184
185     # Turn on --progress by default if stderr is a tty.
186     if (not (args.batch_progress or args.no_progress)
187         and os.isatty(sys.stderr.fileno())):
188         args.progress = True
189
190     if args.paths == ['-']:
191         args.resume = False
192         if not args.filename:
193             args.filename = 'stdin'
194
195     return args
196
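# Illustrative command lines (hypothetical data names), based on the options
# defined above:
#
#   arv-put file1.dat dir2/           # upload and save a Collection (default --manifest)
#   arv-put --stream big.tar          # print the manifest on stdout, don't save a Collection
#   arv-put --raw big.tar             # print only the comma-separated data block locators
#   cat access.log | arv-put --filename access.log
#   arv-put --no-resume --replication 3 dir/
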
197 class ResumeCacheConflict(Exception):
198     pass
199
200
201 class ResumeCache(object):
202     CACHE_DIR = '.cache/arvados/arv-put'
203
204     def __init__(self, file_spec):
205         self.cache_file = open(file_spec, 'a+')
206         self._lock_file(self.cache_file)
207         self.filename = self.cache_file.name
208
209     @classmethod
210     def make_path(cls, args):
211         md5 = hashlib.md5()
212         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
213         realpaths = sorted(os.path.realpath(path) for path in args.paths)
214         md5.update('\0'.join(realpaths))
215         if any(os.path.isdir(path) for path in realpaths):
216             md5.update(str(max(args.max_manifest_depth, -1)))
217         elif args.filename:
218             md5.update(args.filename)
219         return os.path.join(
220             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
221             md5.hexdigest())
222
223     def _lock_file(self, fileobj):
224         try:
225             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
226         except IOError:
227             raise ResumeCacheConflict("{} locked".format(fileobj.name))
228
229     def load(self):
230         self.cache_file.seek(0)
231         return json.load(self.cache_file)
232
233     def check_cache(self, api_client=None, num_retries=0):
234         try:
235             state = self.load()
236             locator = None
237             try:
238                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
239                     locator = state["_finished_streams"][0][1][0]
240                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
241                     locator = state["_current_stream_locators"][0]
242                 if locator is not None:
243                     kc = arvados.keep.KeepClient(api_client=api_client)
244                     kc.head(locator, num_retries=num_retries)
245             except Exception:
246                 self.restart()  # locator unreadable in Keep: cached state is stale
247         except ValueError:
248             pass  # cache file is empty or not valid JSON; nothing to check
249
250     def save(self, data):
251         try:
252             new_cache_fd, new_cache_name = tempfile.mkstemp(
253                 dir=os.path.dirname(self.filename))
254             self._lock_file(new_cache_fd)
255             new_cache = os.fdopen(new_cache_fd, 'r+')
256             json.dump(data, new_cache)
257             os.rename(new_cache_name, self.filename)
258         except (IOError, OSError, ResumeCacheConflict) as error:
259             try:
260                 os.unlink(new_cache_name)
261             except NameError:  # mkstemp failed.
262                 pass
263         else:
264             self.cache_file.close()
265             self.cache_file = new_cache
266
267     def close(self):
268         self.cache_file.close()
269
270     def destroy(self):
271         try:
272             os.unlink(self.filename)
273         except OSError as error:
274             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
275                 raise
276         self.close()
277
278     def restart(self):
279         self.destroy()
280         self.__init__(self.filename)
281
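# Sketch of how the ResumeCache class below can be driven (hypothetical caller;
# in this file the newer ArvPutUploadJob class manages its own cache file
# directly in _setup_state()):
#
#   cache = ResumeCache(ResumeCache.make_path(args))   # may raise ResumeCacheConflict
#   cache.check_cache(api_client=api_client, num_retries=args.retries)
#   state = cache.load()       # previously saved JSON state (ValueError if empty)
#   ...
#   cache.save(state)          # atomic write: tempfile + os.rename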
282
283 class ArvPutUploadJob(object):
284     CACHE_DIR = '.cache/arvados/arv-put'
285     EMPTY_STATE = {
286         'manifest' : None, # Last saved manifest checkpoint
287         'files' : {} # Previous run file list: {path : {size, mtime}}
288     }
289
290     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
291                  name=None, owner_uuid=None, ensure_unique_name=False,
292                  num_retries=None, replication_desired=None,
293                  filename=None, update_time=60.0):
294         self.paths = paths
295         self.resume = resume
296         self.reporter = reporter
297         self.bytes_expected = bytes_expected
298         self.bytes_written = 0
299         self.bytes_skipped = 0
300         self.name = name
301         self.owner_uuid = owner_uuid
302         self.ensure_unique_name = ensure_unique_name
303         self.num_retries = num_retries
304         self.replication_desired = replication_desired
305         self.filename = filename
306         self._state_lock = threading.Lock()
307         self._state = None # Previous run state (file list & manifest)
308         self._current_files = [] # Current run file list
309         self._cache_file = None
310         self._collection = None
311         self._collection_lock = threading.Lock()
312         self._stop_checkpointer = threading.Event()
313         self._checkpointer = threading.Thread(target=self._update_task)
314         self._update_task_time = update_time  # How many seconds to wait between update runs
315         self.logger = logging.getLogger('arvados.arv_put')
316         # Load cached data if any and if needed
317         self._setup_state()
318
319     def start(self):
320         """
321         Start the supporting checkpointer thread and upload the files.
322         """
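        # Make the checkpointer a daemon thread so it cannot keep the
        # interpreter alive on its own; it is still stopped and joined
        # explicitly in the finally block below.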
323         self._checkpointer.daemon = True
324         self._checkpointer.start()
325         try:
326             for path in self.paths:
327                 # Test for stdin first, in case a file named '-' exists
328                 if path == '-':
329                     self._write_stdin(self.filename or 'stdin')
330                 elif os.path.isdir(path):
331                     self._write_directory_tree(path)
332                 else:
333                     self._write_file(path, self.filename or os.path.basename(path))
334         finally:
335             # Stop the thread before doing anything else
336             self._stop_checkpointer.set()
337             self._checkpointer.join()
338             # Commit all pending blocks & run one last _update()
339             self.manifest_text()
340             self._update()
341             if self.resume:
342                 self._cache_file.close()
343                 # Correct the final written bytes count
344                 self.bytes_written -= self.bytes_skipped
345
346     def save_collection(self):
347         with self._collection_lock:
348             self._my_collection().save_new(
349                 name=self.name, owner_uuid=self.owner_uuid,
350                 ensure_unique_name=self.ensure_unique_name,
351                 num_retries=self.num_retries)
352
353     def destroy_cache(self):
354         if self.resume:
355             try:
356                 os.unlink(self._cache_filename)
357             except OSError as error:
358                 # ENOENT means the cache file was already gone, which is fine.
359                 if error.errno != errno.ENOENT:
360                     raise
361             self._cache_file.close()
362
363     def _collection_size(self, collection):
364         """
365         Recursively get the total size of the collection
366         """
367         size = 0
368         for item in collection.values():
369             if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
370                 size += self._collection_size(item)
371             else:
372                 size += item.size()
373         return size
374
375     def _update_task(self):
376         """
377         Periodically called support task. File uploading is
378         asynchronous so we poll status from the collection.
379         """
380         while not self._stop_checkpointer.wait(self._update_task_time):
381             self._update()
382
383     def _update(self):
384         """
385         Update cached manifest text and report progress.
386         """
387         with self._collection_lock:
388             self.bytes_written = self._collection_size(self._my_collection())
389             # Update cache, if resume enabled
390             if self.resume:
391                 with self._state_lock:
392                     # Get the manifest text without committing pending blocks
393                     self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
394         if self.resume:
395             self._save_state()
396         # Call the reporter, if any
397         self.report_progress()
398
399     def report_progress(self):
400         if self.reporter is not None:
401             self.reporter(self.bytes_written, self.bytes_expected)
402
403     def _write_directory_tree(self, path, stream_name="."):
404         # TODO: Check what happens when multiple directories are passed as
405         # arguments.
406         # If the code below is uncommented, the integration test
407         # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
408         # fails, presumably because the manifest_uuid changes when the
409         # directory name is prepended to stream_name.
410
411         # if stream_name == '.':
412         #     stream_name = os.path.join('.', os.path.basename(path))
413         for item in os.listdir(path):
414             if os.path.isdir(os.path.join(path, item)):
415                 self._write_directory_tree(os.path.join(path, item),
416                                 os.path.join(stream_name, item))
417             else:
418                 self._write_file(os.path.join(path, item),
419                                 os.path.join(stream_name, item))
420
421     def _write_stdin(self, filename):
422         with self._collection_lock:
423             output = self._my_collection().open(filename, 'w')
424         self._write(sys.stdin, output)
425         output.close()
426
427     def _write_file(self, source, filename):
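        # Resume decision (only when self.resume is enabled):
        #   * local mtime/size match the cache and the collection copy is complete -> skip the file
        #   * local mtime/size match the cache but the collection copy is shorter  -> append,
        #     starting at the collection copy's current size (resume_offset)
        #   * anything else (collection copy larger, or mtime/size changed)        -> upload from scratch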
428         resume_offset = 0
429         if self.resume:
430             # Check if file was already uploaded (at least partially)
431             with self._collection_lock:
432                 try:
433                     file_in_collection = self._my_collection().find(filename)
434                 except IOError:
435                     # Not found
436                     file_in_collection = None
437             # If there is no cached data for this file yet, record its mtime
438             # and size so a future resumed run can detect changes.
439             if source not in self._state['files']:
440                 with self._state_lock:
441                     self._state['files'][source] = {
442                         'mtime': os.path.getmtime(source),
443                         'size' : os.path.getsize(source)
444                     }
445             with self._state_lock:
446                 cached_file_data = self._state['files'][source]
447             # See if this file was already uploaded at least partially
448             if file_in_collection:
449                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
450                     if cached_file_data['size'] == file_in_collection.size():
451                         # File already there, skip it.
452                         self.bytes_skipped += cached_file_data['size']
453                         return
454                     elif cached_file_data['size'] > file_in_collection.size():
455                         # File partially uploaded, resume!
456                         resume_offset = file_in_collection.size()
457                     else:
458                         # Inconsistent cache, re-upload the file
459                         self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
460                 else:
461                     # Local file differs from cached data, re-upload it
462                     pass
463         with open(source, 'r') as source_fd:
464             if resume_offset > 0:
465                 # Start upload where we left off
466                 with self._collection_lock:
467                     output = self._my_collection().open(filename, 'a')
468                 source_fd.seek(resume_offset)
469                 self.bytes_skipped += resume_offset
470             else:
471                 # Start from scratch
472                 with self._collection_lock:
473                     output = self._my_collection().open(filename, 'w')
474             self._write(source_fd, output)
475             output.close()
476
477     def _write(self, source_fd, output):
478         while True:
479             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
480             if not data:
481                 break
482             output.write(data)
483
484     def _my_collection(self):
485         """
486         Lazily create the Collection object, restoring it from the cached manifest when resuming.
487         """
488         if self._collection is None:
489             with self._state_lock:
490                 manifest = self._state['manifest']
491             if self.resume and manifest is not None:
492                 # Create collection from saved state
493                 self._collection = arvados.collection.Collection(
494                     manifest,
495                     replication_desired=self.replication_desired)
496             else:
497                 # Create new collection
498                 self._collection = arvados.collection.Collection(
499                     replication_desired=self.replication_desired)
500         return self._collection
501
502     def _setup_state(self):
503         """
504         Create a new cache file or load a previously existing one.
505         """
506         if self.resume:
507             md5 = hashlib.md5()
508             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
509             realpaths = sorted(os.path.realpath(path) for path in self.paths)
510             md5.update('\0'.join(realpaths))
511             if self.filename:
512                 md5.update(self.filename)
513             cache_filename = md5.hexdigest()
514             self._cache_file = open(os.path.join(
515                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
516                 cache_filename), 'a+')
517             self._cache_filename = self._cache_file.name
518             self._lock_file(self._cache_file)
519             self._cache_file.seek(0)
520             with self._state_lock:
521                 try:
522                     self._state = json.load(self._cache_file)
523                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
524                         # Cache is missing required keys; start with a fresh state
525                         self._state = copy.deepcopy(self.EMPTY_STATE)
526                 except ValueError:
527                     # Cache file empty, set up new cache
528                     self._state = copy.deepcopy(self.EMPTY_STATE)
529             # Load how many bytes were uploaded on previous run
530             with self._collection_lock:
531                 self.bytes_written = self._collection_size(self._my_collection())
532         # No resume required
533         else:
534             with self._state_lock:
535                 self._state = copy.deepcopy(self.EMPTY_STATE)
536
537     def _lock_file(self, fileobj):
538         try:
539             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
540         except IOError:
541             raise ResumeCacheConflict("{} locked".format(fileobj.name))
542
543     def _save_state(self):
544         """
545         Atomically save current state into cache.
546         """
547         try:
548             with self._state_lock:
549                 state = self._state
550             new_cache_fd, new_cache_name = tempfile.mkstemp(
551                 dir=os.path.dirname(self._cache_filename))
552             self._lock_file(new_cache_fd)
553             new_cache = os.fdopen(new_cache_fd, 'r+')
554             json.dump(state, new_cache)
555             new_cache.flush()
556             os.fsync(new_cache)
557             os.rename(new_cache_name, self._cache_filename)
558         except (IOError, OSError, ResumeCacheConflict) as error:
559             self.logger.error("There was a problem while saving the cache file: {}".format(error))
560             try:
561                 os.unlink(new_cache_name)
562             except NameError:  # mkstemp failed.
563                 pass
564         else:
565             self._cache_file.close()
566             self._cache_file = new_cache
567
568     def collection_name(self):
569         with self._collection_lock:
570             name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
571         return name
572
573     def manifest_locator(self):
574         with self._collection_lock:
575             locator = self._my_collection().manifest_locator()
576         return locator
577
578     def portable_data_hash(self):
579         with self._collection_lock:
580             datahash = self._my_collection().portable_data_hash()
581         return datahash
582
583     def manifest_text(self, stream_name=".", strip=False, normalize=False):
584         with self._collection_lock:
585             manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
586         return manifest
587
588     def _datablocks_on_item(self, item):
589         """
590         Return a list of datablock locators, recursively navigating
591         through subcollections
592         """
593         if isinstance(item, arvados.arvfile.ArvadosFile):
594             locators = []
595             for segment in item.segments():
596                 loc = segment.locator
597                 locators.append(loc)
598             return locators
599         elif isinstance(item, arvados.collection.Collection):
600             l = [self._datablocks_on_item(x) for x in item.values()]
601             # Fast list flattener method taken from:
602             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
603             return [loc for sublist in l for loc in sublist]
604         else:
605             return None
606
607     def data_locators(self):
608         with self._collection_lock:
609             # Make sure all datablocks are flushed before getting the locators
610             self._my_collection().manifest_text()
611             datablocks = self._datablocks_on_item(self._my_collection())
612         return datablocks
613
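# Minimal programmatic sketch of ArvPutUploadJob, mirroring what main() below
# does (argument values are illustrative):
#
#   writer = ArvPutUploadJob(paths=['data/'], reporter=None,
#                            bytes_expected=expected_bytes_for(['data/']),
#                            name='my collection', replication_desired=2)
#   writer.start()                # uploads files, checkpointing periodically
#   writer.save_collection()      # saves the Collection object in Arvados
#   print writer.manifest_locator()
#   writer.destroy_cache()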
614
615 def expected_bytes_for(pathlist):
616     # Walk the given directory trees and stat files, adding up file sizes,
617     # so we can display progress as percent
618     bytesum = 0
619     for path in pathlist:
620         if os.path.isdir(path):
621             for filename in arvados.util.listdir_recursive(path):
622                 bytesum += os.path.getsize(os.path.join(path, filename))
623         elif not os.path.isfile(path):
624             return None
625         else:
626             bytesum += os.path.getsize(path)
627     return bytesum
628
629 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
630                                                             os.getpid())
631 def machine_progress(bytes_written, bytes_expected):
632     return _machine_format.format(
633         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
634
635 def human_progress(bytes_written, bytes_expected):
636     if bytes_expected:
637         return "\r{}M / {}M {:.1%} ".format(
638             bytes_written >> 20, bytes_expected >> 20,
639             float(bytes_written) / bytes_expected)
640     else:
641         return "\r{} ".format(bytes_written)
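
# Example formatter outputs (byte counts are illustrative):
#   machine_progress(1048576, 4194304) -> "<argv[0]> <pid>: 1048576 written 4194304 total\n"
#   human_progress(1048576, 4194304)   -> "\r1M / 4M 25.0% "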
642
643 def progress_writer(progress_func, outfile=sys.stderr):
644     def write_progress(bytes_written, bytes_expected):
645         outfile.write(progress_func(bytes_written, bytes_expected))
646     return write_progress
647
648 def exit_signal_handler(sigcode, frame):
649     sys.exit(-sigcode)
650
651 def desired_project_uuid(api_client, project_uuid, num_retries):
652     if not project_uuid:
653         query = api_client.users().current()
654     elif arvados.util.user_uuid_pattern.match(project_uuid):
655         query = api_client.users().get(uuid=project_uuid)
656     elif arvados.util.group_uuid_pattern.match(project_uuid):
657         query = api_client.groups().get(uuid=project_uuid)
658     else:
659         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
660     return query.execute(num_retries=num_retries)['uuid']
661
662 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
663     global api_client
664
665     args = parse_arguments(arguments)
666     status = 0
667     if api_client is None:
668         api_client = arvados.api('v1')
669
670     # Determine the name to use
671     if args.name:
672         if args.stream or args.raw:
673             print >>stderr, "Cannot use --name with --stream or --raw"
674             sys.exit(1)
675         collection_name = args.name
676     else:
677         collection_name = "Saved at {} by {}@{}".format(
678             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
679             pwd.getpwuid(os.getuid()).pw_name,
680             socket.gethostname())
681
682     if args.project_uuid and (args.stream or args.raw):
683         print >>stderr, "Cannot use --project-uuid with --stream or --raw"
684         sys.exit(1)
685
686     # Determine the parent project
687     try:
688         project_uuid = desired_project_uuid(api_client, args.project_uuid,
689                                             args.retries)
690     except (apiclient_errors.Error, ValueError) as error:
691         print >>stderr, error
692         sys.exit(1)
693
694     if args.progress:
695         reporter = progress_writer(human_progress)
696     elif args.batch_progress:
697         reporter = progress_writer(machine_progress)
698     else:
699         reporter = None
700
701     bytes_expected = expected_bytes_for(args.paths)
702     try:
703         writer = ArvPutUploadJob(paths=args.paths,
704                                 resume=args.resume,
705                                 reporter=reporter,
706                                 bytes_expected=bytes_expected,
707                                 num_retries=args.retries,
708                                 replication_desired=args.replication,
709                                 name=collection_name,
710                                 owner_uuid=project_uuid,
711                                 ensure_unique_name=True)
712     except ResumeCacheConflict:
713         print >>stderr, "\n".join([
714             "arv-put: Another process is already uploading this data.",
715             "         Use --no-resume if this is really what you want."])
716         sys.exit(1)
717
718     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
719     # the originals.
720     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
721                             for sigcode in CAUGHT_SIGNALS}
722
723     if args.resume and writer.bytes_written > 0:
724         print >>stderr, "\n".join([
725                 "arv-put: Resuming previous upload from last checkpoint.",
726                 "         Use the --no-resume option to start over."])
727
728     writer.report_progress()
729     output = None
730     writer.start()
731     if args.progress:  # Print newline to split stderr from stdout for humans.
732         print >>stderr
733
734     if args.stream:
735         if args.normalize:
736             output = writer.manifest_text(normalize=True)
737         else:
738             output = writer.manifest_text()
739     elif args.raw:
740         output = ','.join(writer.data_locators())
741     else:
742         try:
743             writer.save_collection()
744             print >>stderr, "Collection saved as '%s'" % writer.collection_name()
745             if args.portable_data_hash:
746                 output = writer.portable_data_hash()
747             else:
748                 output = writer.manifest_locator()
749         except apiclient_errors.Error as error:
750             print >>stderr, (
751                 "arv-put: Error creating Collection on project: {}.".format(
752                     error))
753             status = 1
754
755     # Write the output (collection UUID/PDH, manifest text, or block locators) to stdout.
756     if output is None:
757         status = status or 1
758     else:
759         stdout.write(output)
760         if not output.endswith('\n'):
761             stdout.write('\n')
762
763     for sigcode, orig_handler in orig_signal_handlers.items():
764         signal.signal(sigcode, orig_handler)
765
766     if status != 0:
767         sys.exit(status)
768
769     # Success!
770     writer.destroy_cache()
771     return output
772
773
774 if __name__ == '__main__':
775     main()