sdk/python/arvados/commands/put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import time
18 import signal
19 import socket
20 import sys
21 import tempfile
22 import threading
23 import copy
24 import logging
25 from apiclient import errors as apiclient_errors
26
27 import arvados.commands._util as arv_cmd
28
29 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
30 api_client = None
31
32 upload_opts = argparse.ArgumentParser(add_help=False)
33
34 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
35                          help="""
36 Local file or directory. Default: read from standard input.
37 """)
38
39 _group = upload_opts.add_mutually_exclusive_group()
40
41 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
42                     default=-1, help="""
43 Maximum depth of directory tree to represent in the manifest
44 structure. A directory structure deeper than this will be represented
45 as a single stream in the manifest. If N=0, the manifest will contain
46 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
47 stream per filesystem directory that contains files.
48 """)
49
50 _group.add_argument('--normalize', action='store_true',
51                     help="""
52 Normalize the manifest by re-ordering files and streams after writing
53 data.
54 """)
55
56 _group = upload_opts.add_mutually_exclusive_group()
57
58 _group.add_argument('--as-stream', action='store_true', dest='stream',
59                     help="""
60 Synonym for --stream.
61 """)
62
63 _group.add_argument('--stream', action='store_true',
64                     help="""
65 Store the file content and display the resulting manifest on
66 stdout. Do not write the manifest to Keep or save a Collection object
67 in Arvados.
68 """)
69
70 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
71                     help="""
72 Synonym for --manifest.
73 """)
74
75 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
76                     help="""
77 Synonym for --manifest.
78 """)
79
80 _group.add_argument('--manifest', action='store_true',
81                     help="""
82 Store the file data and resulting manifest in Keep, save a Collection
83 object in Arvados, and display the manifest locator (Collection uuid)
84 on stdout. This is the default behavior.
85 """)
86
87 _group.add_argument('--as-raw', action='store_true', dest='raw',
88                     help="""
89 Synonym for --raw.
90 """)
91
92 _group.add_argument('--raw', action='store_true',
93                     help="""
94 Store the file content and display the data block locators on stdout,
95 separated by commas, with a trailing newline. Do not store a
96 manifest.
97 """)
98
99 upload_opts.add_argument('--use-filename', type=str, default=None,
100                          dest='filename', help="""
101 Synonym for --filename.
102 """)
103
104 upload_opts.add_argument('--filename', type=str, default=None,
105                          help="""
106 Use the given filename in the manifest, instead of the name of the
107 local file. This is useful when "-" or "/dev/stdin" is given as an
108 input file. It can be used only if there is exactly one path given and
109 it is not a directory. Implies --manifest.
110 """)
111
112 upload_opts.add_argument('--portable-data-hash', action='store_true',
113                          help="""
114 Print the portable data hash instead of the Arvados UUID for the collection
115 created by the upload.
116 """)
117
118 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
119                          help="""
120 Set the replication level for the new collection: how many different
121 physical storage devices (e.g., disks) should have a copy of each data
122 block. Default is to use the server-provided default (if any) or 2.
123 """)
124
125 run_opts = argparse.ArgumentParser(add_help=False)
126
127 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
128 Store the collection in the specified project, instead of your Home
129 project.
130 """)
131
132 run_opts.add_argument('--name', help="""
133 Save the collection with the specified name.
134 """)
135
136 _group = run_opts.add_mutually_exclusive_group()
137 _group.add_argument('--progress', action='store_true',
138                     help="""
139 Display human-readable progress on stderr (bytes and, if possible,
140 percentage of total data size). This is the default behavior when
141 stderr is a tty.
142 """)
143
144 _group.add_argument('--no-progress', action='store_true',
145                     help="""
146 Do not display human-readable progress on stderr, even if stderr is a
147 tty.
148 """)
149
150 _group.add_argument('--batch-progress', action='store_true',
151                     help="""
152 Display machine-readable progress on stderr (bytes and, if known,
153 total data size).
154 """)
155
156 _group = run_opts.add_mutually_exclusive_group()
157 _group.add_argument('--resume', action='store_true', default=True,
158                     help="""
159 Continue interrupted uploads from cached state (default).
160 """)
161 _group.add_argument('--no-resume', action='store_false', dest='resume',
162                     help="""
163 Do not continue interrupted uploads from cached state.
164 """)
165
166 arg_parser = argparse.ArgumentParser(
167     description='Copy data from the local filesystem to Keep.',
168     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
169
170 def parse_arguments(arguments):
171     args = arg_parser.parse_args(arguments)
172
173     if len(args.paths) == 0:
174         args.paths = ['-']
175
176     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
177
178     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
179         if args.filename:
180             arg_parser.error("""
181     --filename argument cannot be used when storing a directory or
182     multiple files.
183     """)
184
185     # Turn on --progress by default if stderr is a tty.
186     if (not (args.batch_progress or args.no_progress)
187         and os.isatty(sys.stderr.fileno())):
188         args.progress = True
189
190     if args.paths == ['-']:
191         args.resume = False
192         if not args.filename:
193             args.filename = 'stdin'
194
195     return args
196
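# Illustrative sketch, not part of the original tool: how the parsed defaults
# interact. With no paths, arv-put reads from stdin, which disables --resume
# and picks a default --filename; --progress is implied when stderr is a tty.
def _example_parse_arguments_defaults():  # hypothetical helper, never called
    args = parse_arguments([])
    assert args.paths == ['-']        # no paths given: read from standard input
    assert args.resume is False       # stdin uploads cannot be resumed
    assert args.filename == 'stdin'   # default name used in the manifest
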
197 class ResumeCacheConflict(Exception):
198     pass
199
200
201 class ResumeCache(object):
202     CACHE_DIR = '.cache/arvados/arv-put'
203
204     def __init__(self, file_spec):
205         self.cache_file = open(file_spec, 'a+')
206         self._lock_file(self.cache_file)
207         self.filename = self.cache_file.name
208
209     @classmethod
210     def make_path(cls, args):
211         md5 = hashlib.md5()
212         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
213         realpaths = sorted(os.path.realpath(path) for path in args.paths)
214         md5.update('\0'.join(realpaths))
215         if any(os.path.isdir(path) for path in realpaths):
216             md5.update(str(max(args.max_manifest_depth, -1)))
217         elif args.filename:
218             md5.update(args.filename)
219         return os.path.join(
220             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
221             md5.hexdigest())
222
223     def _lock_file(self, fileobj):
224         try:
225             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
226         except IOError:
227             raise ResumeCacheConflict("{} locked".format(fileobj.name))
228
229     def load(self):
230         self.cache_file.seek(0)
231         return json.load(self.cache_file)
232
233     def check_cache(self, api_client=None, num_retries=0):
234         try:
235             state = self.load()
236             locator = None
237             try:
238                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
239                     locator = state["_finished_streams"][0][1][0]
240                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
241                     locator = state["_current_stream_locators"][0]
242                 if locator is not None:
243                     kc = arvados.keep.KeepClient(api_client=api_client)
244                     kc.head(locator, num_retries=num_retries)
245             except Exception:
246                 self.restart()
247         except ValueError:
248             pass
249
250     def save(self, data):
251         try:
252             new_cache_fd, new_cache_name = tempfile.mkstemp(
253                 dir=os.path.dirname(self.filename))
254             self._lock_file(new_cache_fd)
255             new_cache = os.fdopen(new_cache_fd, 'r+')
256             json.dump(data, new_cache)
257             os.rename(new_cache_name, self.filename)
258         except (IOError, OSError, ResumeCacheConflict) as error:
259             try:
260                 os.unlink(new_cache_name)
261             except NameError:  # mkstemp failed.
262                 pass
263         else:
264             self.cache_file.close()
265             self.cache_file = new_cache
266
267     def close(self):
268         self.cache_file.close()
269
270     def destroy(self):
271         try:
272             os.unlink(self.filename)
273         except OSError as error:
274             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
275                 raise
276         self.close()
277
278     def restart(self):
279         self.destroy()
280         self.__init__(self.filename)
281
282
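# Hedged usage sketch (illustration only; not exercised by this command): the
# intended ResumeCache life cycle given parsed arv-put args -- derive the
# per-invocation cache path, take the lock, and fall back to an empty state
# when the cache file is new or unparseable.
def _example_resume_cache_roundtrip(args):  # hypothetical helper, never called
    cache = ResumeCache(ResumeCache.make_path(args))
    try:
        state = cache.load()   # json.load() raises ValueError on an empty cache
    except ValueError:
        state = {}
    cache.save(state)          # atomic: write a temp file, then rename it over the cache
    cache.close()
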
283 class ArvPutUploadJob(object):
284     CACHE_DIR = '.cache/arvados/arv-put'
285     EMPTY_STATE = {
286         'manifest' : None, # Last saved manifest checkpoint
287         'files' : {} # Previous run file list: {path : {size, mtime}}
288     }
289
290     def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
291                  name=None, owner_uuid=None, ensure_unique_name=False,
292                  num_retries=None, replication_desired=None,
293                  filename=None, update_time=1.0):
294         self.paths = paths
295         self.resume = resume
296         self.reporter = reporter
297         self.bytes_expected = bytes_expected
298         self.bytes_written = 0
299         self.bytes_skipped = 0
300         self.name = name
301         self.owner_uuid = owner_uuid
302         self.ensure_unique_name = ensure_unique_name
303         self.num_retries = num_retries
304         self.replication_desired = replication_desired
305         self.filename = filename
306         self._state_lock = threading.Lock()
307         self._state = None # Previous run state (file list & manifest)
308         self._current_files = [] # Current run file list
309         self._cache_file = None
310         self._collection = None
311         self._collection_lock = threading.Lock()
312         self._stop_checkpointer = threading.Event()
313         self._checkpointer = threading.Thread(target=self._update_task)
314         self._update_task_time = update_time  # How many seconds to wait between update runs
315         self.logger = logging.getLogger('arvados.arv_put')
316         # Load cached data, if present and if resuming is enabled
317         self._setup_state()
318
319     def start(self):
320         """
321         Start supporting thread & file uploading
322         """
323         self._checkpointer.daemon = True
324         self._checkpointer.start()
325         try:
326             for path in self.paths:
327                 # Test for stdin first, in case a file named '-' exists
328                 if path == '-':
329                     self._write_stdin(self.filename or 'stdin')
330                 elif os.path.isdir(path):
331                     self._write_directory_tree(path)
332                 else:
333                     self._write_file(path, self.filename or os.path.basename(path))
334         finally:
335             # Stop the thread before doing anything else
336             self._stop_checkpointer.set()
337             self._checkpointer.join()
338             # Commit all pending data & run one last _update()
339             self.manifest_text()
340             self._update()
341             if self.resume:
342                 self._cache_file.close()
343                 # Correct the final written bytes count
344                 self.bytes_written -= self.bytes_skipped
345
346     def save_collection(self):
347         with self._collection_lock:
348             self._my_collection().save_new(
349                 name=self.name, owner_uuid=self.owner_uuid,
350                 ensure_unique_name=self.ensure_unique_name,
351                 num_retries=self.num_retries)
352
353     def destroy_cache(self):
354         if self.resume:
355             try:
356                 os.unlink(self._cache_filename)
357             except OSError as error:
358                 # ENOENT means the file is already gone, which is what we wanted.
359                 if error.errno != errno.ENOENT:
360                     raise
361             self._cache_file.close()
362
363     def _collection_size(self, collection):
364         """
365         Recursively get the total size of the collection
366         """
367         size = 0
368         for item in collection.values():
369             if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
370                 size += self._collection_size(item)
371             else:
372                 size += item.size()
373         return size
374
375     def _update_task(self):
376         """
377         Periodically called support task. File uploading is
378         asynchronous so we poll status from the collection.
379         """
380         while not self._stop_checkpointer.wait(self._update_task_time):
381             self._update()
382
383     def _update(self):
384         """
385         Update cached manifest text and report progress.
386         """
387         with self._collection_lock:
388             self.bytes_written = self._collection_size(self._my_collection())
389             # Update cache, if resume enabled
390             if self.resume:
391                 with self._state_lock:
392                     # Get the manifest text without committing pending blocks
393                     self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
394                 self._save_state()
395         # Call the reporter, if any
396         self.report_progress()
397
398     def report_progress(self):
399         if self.reporter is not None:
400             self.reporter(self.bytes_written, self.bytes_expected)
401
402     def _write_directory_tree(self, path, stream_name="."):
403         # TODO: Check what happens when multiple directories are passed as
404         # arguments.
405         # If the code below is uncommented, the integration test
406         # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
407         # fails, presumably because the manifest_uuid changes when the
408         # directory name is prepended to stream_name.
409
410         # if stream_name == '.':
411         #     stream_name = os.path.join('.', os.path.basename(path))
412         for item in os.listdir(path):
413             if os.path.isdir(os.path.join(path, item)):
414                 self._write_directory_tree(os.path.join(path, item),
415                                 os.path.join(stream_name, item))
416             else:
417                 self._write_file(os.path.join(path, item),
418                                 os.path.join(stream_name, item))
419
420     def _write_stdin(self, filename):
421         with self._collection_lock:
422             output = self._my_collection().open(filename, 'w')
423         self._write(sys.stdin, output)
424         output.close()
425
426     def _write_file(self, source, filename):
427         resume_offset = 0
428         if self.resume:
429             # Check if file was already uploaded (at least partially)
430             with self._collection_lock:
431                 try:
432                     file_in_collection = self._my_collection().find(filename)
433                 except IOError:
434                     # Not found
435                     file_in_collection = None
436             # If there's no previously cached data for this file, store it
437             # for a possible repeated run.
438             if source not in self._state['files']:
439                 with self._state_lock:
440                     self._state['files'][source] = {
441                         'mtime': os.path.getmtime(source),
442                         'size' : os.path.getsize(source)
443                     }
444             with self._state_lock:
445                 cached_file_data = self._state['files'][source]
446             # See if this file was already uploaded at least partially
447             if file_in_collection:
448                 if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
449                     if cached_file_data['size'] == file_in_collection.size():
450                         # File already there, skip it.
451                         self.bytes_skipped += cached_file_data['size']
452                         return
453                     elif cached_file_data['size'] > file_in_collection.size():
454                         # File partially uploaded, resume!
455                         resume_offset = file_in_collection.size()
456                     else:
457                         # Inconsistent cache, re-upload the file
458                         self.logger.warning("Uploaded version of file '{}' is bigger than the local version; re-uploading it from scratch.".format(source))
459                 else:
460                     # Local file differs from cached data, re-upload it
461                     pass
462         with open(source, 'r') as source_fd:
463             if resume_offset > 0:
464                 # Start upload where we left off
465                 with self._collection_lock:
466                     output = self._my_collection().open(filename, 'a')
467                 source_fd.seek(resume_offset)
468                 self.bytes_skipped += resume_offset
469             else:
470                 # Start from scratch
471                 with self._collection_lock:
472                     output = self._my_collection().open(filename, 'w')
473             self._write(source_fd, output)
474             output.close(flush=False)
475
476     def _write(self, source_fd, output):
477         first_read = True
478         while True:
479             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
480             # Allow an empty file to be written
481             if not data and not first_read:
482                 break
483             if first_read:
484                 first_read = False
485             output.write(data)
486
487     def _my_collection(self):
488         """
489         Load the collection from cached state when resuming; otherwise create a new one.
490         """
491         if self._collection is None:
492             with self._state_lock:
493                 manifest = self._state['manifest']
494             if self.resume and manifest is not None:
495                 # Create collection from saved state
496                 self._collection = arvados.collection.Collection(
497                     manifest,
498                     replication_desired=self.replication_desired)
499             else:
500                 # Create new collection
501                 self._collection = arvados.collection.Collection(
502                     replication_desired=self.replication_desired)
503         return self._collection
504
505     def _setup_state(self):
506         """
507         Create a new cache file or load a previously existing one.
508         """
509         if self.resume:
510             md5 = hashlib.md5()
511             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
512             realpaths = sorted(os.path.realpath(path) for path in self.paths)
513             md5.update('\0'.join(realpaths))
514             if self.filename:
515                 md5.update(self.filename)
516             cache_filename = md5.hexdigest()
517             self._cache_file = open(os.path.join(
518                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
519                 cache_filename), 'a+')
520             self._cache_filename = self._cache_file.name
521             self._lock_file(self._cache_file)
522             self._cache_file.seek(0)
523             with self._state_lock:
524                 try:
525                     self._state = json.load(self._cache_file)
526                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
527                         # Cache is missing required keys; set up a new cache
528                         self._state = copy.deepcopy(self.EMPTY_STATE)
529                 except ValueError:
530                     # Cache file empty, set up new cache
531                     self._state = copy.deepcopy(self.EMPTY_STATE)
532             # Load how many bytes were uploaded on previous run
533             with self._collection_lock:
534                 self.bytes_written = self._collection_size(self._my_collection())
535         # No resume required
536         else:
537             with self._state_lock:
538                 self._state = copy.deepcopy(self.EMPTY_STATE)
539
540     def _lock_file(self, fileobj):
541         try:
542             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
543         except IOError:
544             raise ResumeCacheConflict("{} locked".format(fileobj.name))
545
546     def _save_state(self):
547         """
548         Atomically save current state into cache.
549         """
550         try:
551             with self._state_lock:
552                 state = self._state
553             new_cache_fd, new_cache_name = tempfile.mkstemp(
554                 dir=os.path.dirname(self._cache_filename))
555             self._lock_file(new_cache_fd)
556             new_cache = os.fdopen(new_cache_fd, 'r+')
557             json.dump(state, new_cache)
558             new_cache.flush()
559             os.fsync(new_cache)
560             os.rename(new_cache_name, self._cache_filename)
561         except (IOError, OSError, ResumeCacheConflict) as error:
562             self.logger.error("There was a problem while saving the cache file: {}".format(error))
563             try:
564                 os.unlink(new_cache_name)
565             except NameError:  # mkstemp failed.
566                 pass
567         else:
568             self._cache_file.close()
569             self._cache_file = new_cache
570
571     def collection_name(self):
572         with self._collection_lock:
573             name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
574         return name
575
576     def manifest_locator(self):
577         with self._collection_lock:
578             locator = self._my_collection().manifest_locator()
579         return locator
580
581     def portable_data_hash(self):
582         with self._collection_lock:
583             datahash = self._my_collection().portable_data_hash()
584         return datahash
585
586     def manifest_text(self, stream_name=".", strip=False, normalize=False):
587         with self._collection_lock:
588             manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
589         return manifest
590
591     def _datablocks_on_item(self, item):
592         """
593         Return a list of datablock locators, recursively navigating
594         through subcollections
595         """
596         if isinstance(item, arvados.arvfile.ArvadosFile):
597             if item.size() == 0:
598                 # Empty file locator
599                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
600             else:
601                 locators = []
602                 for segment in item.segments():
603                     loc = segment.locator
604                     locators.append(loc)
605                 return locators
606         elif isinstance(item, arvados.collection.Collection):
607             l = [self._datablocks_on_item(x) for x in item.values()]
608             # Fast list flattener method taken from:
609             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
610             return [loc for sublist in l for loc in sublist]
611         else:
612             return None
613
614     def data_locators(self):
615         with self._collection_lock:
616             # Make sure all datablocks are flushed before getting the locators
617             self._my_collection().manifest_text()
618             datablocks = self._datablocks_on_item(self._my_collection())
619         return datablocks
620
621
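# Minimal usage sketch (illustration only; main() below is the real driver):
# upload a list of paths, save the resulting Collection, then drop the cache.
def _example_upload(paths):  # hypothetical helper, never called
    job = ArvPutUploadJob(paths=paths,
                          bytes_expected=expected_bytes_for(paths),
                          name='example upload')
    job.start()                       # uploads files, checkpointing state as it goes
    job.save_collection()             # creates the Collection record in Arvados
    locator = job.manifest_locator()  # UUID of the saved Collection
    job.destroy_cache()
    return locator
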
622 def expected_bytes_for(pathlist):
623     # Walk the given directory trees and stat files, adding up file sizes,
624     # so we can display progress as percent
625     bytesum = 0
626     for path in pathlist:
627         if os.path.isdir(path):
628             for filename in arvados.util.listdir_recursive(path):
629                 bytesum += os.path.getsize(os.path.join(path, filename))
630         elif not os.path.isfile(path):
631             return None
632         else:
633             bytesum += os.path.getsize(path)
634     return bytesum
635
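# Illustrative note (assumption: no local file is literally named '-'): when a
# path is neither a file nor a directory, expected_bytes_for() returns None and
# the reporters below fall back to plain byte counts without a percentage.
def _example_expected_bytes():  # hypothetical helper, never called
    assert expected_bytes_for([]) == 0        # nothing to upload
    assert expected_bytes_for(['-']) is None  # stdin: total size unknown
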
636 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
637                                                             os.getpid())
638 def machine_progress(bytes_written, bytes_expected):
639     return _machine_format.format(
640         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
641
642 def human_progress(bytes_written, bytes_expected):
643     if bytes_expected:
644         return "\r{}M / {}M {:.1%} ".format(
645             bytes_written >> 20, bytes_expected >> 20,
646             float(bytes_written) / bytes_expected)
647     else:
648         return "\r{} ".format(bytes_written)
649
650 def progress_writer(progress_func, outfile=sys.stderr):
651     def write_progress(bytes_written, bytes_expected):
652         outfile.write(progress_func(bytes_written, bytes_expected))
653     return write_progress
654
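# Illustrative sketch (assumption about intended use): the reporter handed to
# ArvPutUploadJob is any callable taking (bytes_written, bytes_expected); here
# one is built by wrapping human_progress with progress_writer().
def _example_reporter(total_bytes):  # hypothetical helper, never called
    report = progress_writer(human_progress)
    report(0, total_bytes)            # e.g. "\r0M / 10M 0.0% " on stderr
    report(total_bytes, total_bytes)  # e.g. "\r10M / 10M 100.0% "
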
655 def exit_signal_handler(sigcode, frame):
656     sys.exit(-sigcode)
657
658 def desired_project_uuid(api_client, project_uuid, num_retries):
659     if not project_uuid:
660         query = api_client.users().current()
661     elif arvados.util.user_uuid_pattern.match(project_uuid):
662         query = api_client.users().get(uuid=project_uuid)
663     elif arvados.util.group_uuid_pattern.match(project_uuid):
664         query = api_client.groups().get(uuid=project_uuid)
665     else:
666         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
667     return query.execute(num_retries=num_retries)['uuid']
668
669 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
670     global api_client
671
672     args = parse_arguments(arguments)
673     status = 0
674     if api_client is None:
675         api_client = arvados.api('v1')
676
677     # Determine the name to use
678     if args.name:
679         if args.stream or args.raw:
680             print >>stderr, "Cannot use --name with --stream or --raw"
681             sys.exit(1)
682         collection_name = args.name
683     else:
684         collection_name = "Saved at {} by {}@{}".format(
685             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
686             pwd.getpwuid(os.getuid()).pw_name,
687             socket.gethostname())
688
689     if args.project_uuid and (args.stream or args.raw):
690         print >>stderr, "Cannot use --project-uuid with --stream or --raw"
691         sys.exit(1)
692
693     # Determine the parent project
694     try:
695         project_uuid = desired_project_uuid(api_client, args.project_uuid,
696                                             args.retries)
697     except (apiclient_errors.Error, ValueError) as error:
698         print >>stderr, error
699         sys.exit(1)
700
701     if args.progress:
702         reporter = progress_writer(human_progress)
703     elif args.batch_progress:
704         reporter = progress_writer(machine_progress)
705     else:
706         reporter = None
707
708     bytes_expected = expected_bytes_for(args.paths)
709     try:
710         writer = ArvPutUploadJob(paths = args.paths,
711                                  resume = args.resume,
712                                  filename = args.filename,
713                                  reporter = reporter,
714                                  bytes_expected = bytes_expected,
715                                  num_retries = args.retries,
716                                  replication_desired = args.replication,
717                                  name = collection_name,
718                                  owner_uuid = project_uuid,
719                                  ensure_unique_name = True)
720     except ResumeCacheConflict:
721         print >>stderr, "\n".join([
722             "arv-put: Another process is already uploading this data.",
723             "         Use --no-resume if this is really what you want."])
724         sys.exit(1)
725
726     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
727     # the originals.
728     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
729                             for sigcode in CAUGHT_SIGNALS}
730
731     if args.resume and writer.bytes_written > 0:
732         print >>stderr, "\n".join([
733                 "arv-put: Resuming previous upload from last checkpoint.",
734                 "         Use the --no-resume option to start over."])
735
736     writer.report_progress()
737     output = None
738     writer.start()
739     if args.progress:  # Print newline to split stderr from stdout for humans.
740         print >>stderr
741
742     if args.stream:
743         if args.normalize:
744             output = writer.manifest_text(normalize=True)
745         else:
746             output = writer.manifest_text()
747     elif args.raw:
748         output = ','.join(writer.data_locators())
749     else:
750         try:
751             writer.save_collection()
752             print >>stderr, "Collection saved as '%s'" % writer.collection_name()
753             if args.portable_data_hash:
754                 output = writer.portable_data_hash()
755             else:
756                 output = writer.manifest_locator()
757         except apiclient_errors.Error as error:
758             print >>stderr, (
759                 "arv-put: Error creating Collection on project: {}.".format(
760                     error))
761             status = 1
762
763     # Print the locator (uuid) of the new collection.
764     if output is None:
765         status = status or 1
766     else:
767         stdout.write(output)
768         if not output.endswith('\n'):
769             stdout.write('\n')
770
771     for sigcode, orig_handler in orig_signal_handlers.items():
772         signal.signal(sigcode, orig_handler)
773
774     if status != 0:
775         sys.exit(status)
776
777     # Success!
778     writer.destroy_cache()
779     return output
780
781
782 if __name__ == '__main__':
783     main()