10315: Allow small block packing passing flush=False.
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import time
18 import signal
19 import socket
20 import sys
21 import tempfile
22 import threading
23 import copy
24 import logging
25 from apiclient import errors as apiclient_errors
26
27 import arvados.commands._util as arv_cmd
28
# Signals for which main() installs exit_signal_handler while an upload runs.
29 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-wide API client; main() fills it in with arvados.api('v1') when it
# is still None.
30 api_client = None
31
# Parser fragment with the options that describe what is uploaded and what
# is printed as the result. It is built with add_help=False because it is
# only used as a parent of arg_parser.
32 upload_opts = argparse.ArgumentParser(add_help=False)
33
34 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
35                          help="""
36 Local file or directory. Default: read from standard input.
37 """)
38
# --max-manifest-depth and --normalize are mutually exclusive.
39 _group = upload_opts.add_mutually_exclusive_group()
40
41 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
42                     default=-1, help="""
43 Maximum depth of directory tree to represent in the manifest
44 structure. A directory structure deeper than this will be represented
45 as a single stream in the manifest. If N=0, the manifest will contain
46 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
47 stream per filesystem directory that contains files.
48 """)
49
50 _group.add_argument('--normalize', action='store_true',
51                     help="""
52 Normalize the manifest by re-ordering files and streams after writing
53 data.
54 """)
55
# Output mode: the stream / manifest / raw flags and their synonyms all live
# in one mutually exclusive group. Note that _group is rebound here.
56 _group = upload_opts.add_mutually_exclusive_group()
57
58 _group.add_argument('--as-stream', action='store_true', dest='stream',
59                     help="""
60 Synonym for --stream.
61 """)
62
63 _group.add_argument('--stream', action='store_true',
64                     help="""
65 Store the file content and display the resulting manifest on
66 stdout. Do not write the manifest to Keep or save a Collection object
67 in Arvados.
68 """)
69
70 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
71                     help="""
72 Synonym for --manifest.
73 """)
74
75 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
76                     help="""
77 Synonym for --manifest.
78 """)
79
80 _group.add_argument('--manifest', action='store_true',
81                     help="""
82 Store the file data and resulting manifest in Keep, save a Collection
83 object in Arvados, and display the manifest locator (Collection uuid)
84 on stdout. This is the default behavior.
85 """)
86
87 _group.add_argument('--as-raw', action='store_true', dest='raw',
88                     help="""
89 Synonym for --raw.
90 """)
91
92 _group.add_argument('--raw', action='store_true',
93                     help="""
94 Store the file content and display the data block locators on stdout,
95 separated by commas, with a trailing newline. Do not store a
96 manifest.
97 """)
98
# --use-filename and --filename both store into args.filename.
99 upload_opts.add_argument('--use-filename', type=str, default=None,
100                          dest='filename', help="""
101 Synonym for --filename.
102 """)
103
104 upload_opts.add_argument('--filename', type=str, default=None,
105                          help="""
106 Use the given filename in the manifest, instead of the name of the
107 local file. This is useful when "-" or "/dev/stdin" is given as an
108 input file. It can be used only if there is exactly one path given and
109 it is not a directory. Implies --manifest.
110 """)
111
112 upload_opts.add_argument('--portable-data-hash', action='store_true',
113                          help="""
114 Print the portable data hash instead of the Arvados UUID for the collection
115 created by the upload.
116 """)
117
118 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
119                          help="""
120 Set the replication level for the new collection: how many different
121 physical storage devices (e.g., disks) should have a copy of each data
122 block. Default is to use the server-provided default (if any) or 2.
123 """)
124
# Parser fragment with the options that control how the upload is run:
# destination project, collection name, progress reporting and resuming.
125 run_opts = argparse.ArgumentParser(add_help=False)
126
127 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
128 Store the collection in the specified project, instead of your Home
129 project.
130 """)
131
132 run_opts.add_argument('--name', help="""
133 Save the collection with the specified name.
134 """)
135
# At most one progress style may be requested. parse_arguments() turns on
# --progress by default when stderr is a tty.
136 _group = run_opts.add_mutually_exclusive_group()
137 _group.add_argument('--progress', action='store_true',
138                     help="""
139 Display human-readable progress on stderr (bytes and, if possible,
140 percentage of total data size). This is the default behavior when
141 stderr is a tty.
142 """)
143
144 _group.add_argument('--no-progress', action='store_true',
145                     help="""
146 Do not display human-readable progress on stderr, even if stderr is a
147 tty.
148 """)
149
150 _group.add_argument('--batch-progress', action='store_true',
151                     help="""
152 Display machine-readable progress on stderr (bytes and, if known,
153 total data size).
154 """)
155
# --resume / --no-resume both write args.resume, which defaults to True.
156 _group = run_opts.add_mutually_exclusive_group()
157 _group.add_argument('--resume', action='store_true', default=True,
158                     help="""
159 Continue interrupted uploads from cached state (default).
160 """)
161 _group.add_argument('--no-resume', action='store_false', dest='resume',
162                     help="""
163 Do not continue interrupted uploads from cached state.
164 """)
165
# The complete command-line parser: upload options, run options and the
# retry option provided by arvados.commands._util.
166 arg_parser = argparse.ArgumentParser(
167     description='Copy data from the local filesystem to Keep.',
168     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
169
def parse_arguments(arguments):
    """Parse the arv-put command line and apply defaults.

    arguments is passed straight to arg_parser.parse_args(). After parsing:

    * an empty path list becomes ['-'] and "/dev/stdin" is rewritten to "-";
    * --filename is rejected (via arg_parser.error) when more than one path
      or a directory is given;
    * progress is enabled when stderr is a tty, unless --batch-progress or
      --no-progress was given;
    * when the only path is "-", resuming is disabled and the filename
      defaults to 'stdin'.

    Returns the resulting argparse namespace.
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']
    args.paths = ["-" if path == "/dev/stdin" else path
                  for path in args.paths]

    if args.filename and (len(args.paths) != 1 or
                          os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    progress_suppressed = args.batch_progress or args.no_progress
    if not progress_suppressed and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Standard input cannot be re-read, so it is never resumed.
    if args.paths == ['-']:
        args.resume = False
        args.filename = args.filename or 'stdin'

    return args
196
class ResumeCacheConflict(Exception):
    """Raised when an exclusive lock on a resume cache file cannot be taken."""
199
200
class ResumeCache(object):
    """JSON state file, protected by an exclusive flock(), used to resume uploads.

    The file is opened in 'a+' mode (so it is created when missing) and
    locked without blocking. The lock is held for as long as the file
    object stays open.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open and lock the cache file at path file_spec.

        Raises ResumeCacheConflict when the lock cannot be acquired; in that
        case the file is closed again instead of being leaked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path to use for the parsed arguments args.

        The file name is the MD5 hex digest of the API host, the sorted real
        paths of args.paths and either the manifest depth (when any path is
        a directory) or args.filename (when set). The directory comes from
        arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise').
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive flock() on fileobj.

        fileobj may be a file object or a raw integer descriptor. Raises
        ResumeCacheConflict if flock() fails with IOError.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # An integer descriptor has no .name; fall back to the value
            # itself so that the conflict is still reported properly.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON document stored in the cache file.

        json.load() raises ValueError when the file is empty or invalid.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Validate the cached state against Keep.

        Takes the first block locator found in the saved state (if any) and
        issues a HEAD request for it. Any exception raised while looking up
        or checking the locator restarts the cache. A ValueError (e.g. an
        unparseable cache file) is silently ignored.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache file content with data (as JSON).

        The data is written to a locked temporary file in the same
        directory, flushed and fsync'ed, and then renamed over the cache
        file, so the cache on disk is always a complete document. I/O
        errors and lock conflicts are swallowed (the previous cache stays
        in place), and the temporary file is closed and removed.
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Make sure the bytes are on disk before the rename publishes
            # the new file under the cache name.
            new_cache.flush()
            os.fsync(new_cache_fd)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Best effort: release the temporary file and keep the old cache.
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            if new_cache_name is not None:  # None means mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file, which also releases its lock."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw away the current cache and start over with an empty file."""
        self.destroy()
        self.__init__(self.filename)
281
282
class ArvPutUploadJob(object):
    """Upload local paths (or standard input) into a new collection.

    A background "checkpointer" thread periodically recomputes the number
    of bytes written, reports progress and, when resume is enabled, saves
    the committed manifest plus the list of files seen so far into a locked
    cache file so that an interrupted run can be continued later.
    """
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, replication_desired=None,
                 filename=None, update_time=60.0):
        """Prepare an upload job; nothing is uploaded until start().

        paths -- list of local paths; '-' means standard input.
        resume -- whether to load and maintain the resume cache.
        reporter -- optional callable(bytes_written, bytes_expected).
        bytes_expected -- total size passed through to the reporter.
        name, owner_uuid, ensure_unique_name, num_retries -- passed to
            save_new() by save_collection().
        replication_desired -- passed to the Collection constructor.
        filename -- name to use inside the collection instead of the local
            file name.
        update_time -- seconds between two checkpoint/progress updates.

        Raises ResumeCacheConflict if resume is enabled and the cache file
        is locked.
        """
        self.paths = paths
        self.resume = resume
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection = None
        self._collection_lock = threading.Lock()
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds wait between update runs
        self.logger = logging.getLogger('arvados.arv_put')
        # Load cached data if any and if needed
        self._setup_state()

    def start(self):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all & one last _update()
            self.manifest_text()
            self._update()
            if self.resume:
                self._cache_file.close()
                # Correct the final written bytes count
                self.bytes_written -= self.bytes_skipped

    def save_collection(self):
        """Save the uploaded data as a new collection record."""
        with self._collection_lock:
            self._my_collection().save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        """Remove the resume cache file (only when resume is enabled)."""
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            collection = self._my_collection()
            self.bytes_written = self._collection_size(collection)
            # Update cache, if resume enabled
            if self.resume:
                with self._state_lock:
                    # Get the manifest text without comitting pending blocks
                    self._state['manifest'] = collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
        if self.resume:
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        """Call the reporter, if any, with the written and expected bytes."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_directory_tree(self, path, stream_name="."):
        """Upload every entry below directory path under stream_name."""
        # TODO: Check what happens when multiple directories are passed as
        # arguments.
        # If the code below is uncommented, integration test
        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
        # fails, I suppose it is because the manifest_uuid changes because
        # of the dir addition to stream_name.

        # if stream_name == '.':
        #     stream_name = os.path.join('.', os.path.basename(path))
        for item in os.listdir(path):
            if os.path.isdir(os.path.join(path, item)):
                self._write_directory_tree(os.path.join(path, item),
                                os.path.join(stream_name, item))
            else:
                self._write_file(os.path.join(path, item),
                                os.path.join(stream_name, item))

    def _write_stdin(self, filename):
        """Copy standard input into the collection file named filename."""
        with self._collection_lock:
            output = self._my_collection().open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _write_file(self, source, filename):
        """Upload local file source as filename, resuming when possible.

        With resume enabled, the file's size and mtime are remembered in
        the state on first sight. If the collection already holds the file
        and the local file still matches the remembered size and mtime, a
        complete copy is skipped and a shorter copy is continued from its
        end. In every other case the file is uploaded from the beginning.
        """
        resume_offset = 0
        if self.resume:
            # Check if file was already uploaded (at least partially)
            with self._collection_lock:
                try:
                    file_in_collection = self._my_collection().find(filename)
                except IOError:
                    # Not found
                    file_in_collection = None
            # The checkpointer thread serializes the state concurrently, so
            # both the membership test and the insertion happen under the
            # lock.
            with self._state_lock:
                # If no previous cached data on this file, store it for an
                # eventual repeated run.
                if source not in self._state['files']:
                    self._state['files'][source] = {
                        'mtime': os.path.getmtime(source),
                        'size' : os.path.getsize(source)
                    }
                cached_file_data = self._state['files'][source]
            # See if this file was already uploaded at least partially
            if file_in_collection:
                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                    if cached_file_data['size'] == file_in_collection.size():
                        # File already there, skip it.
                        self.bytes_skipped += cached_file_data['size']
                        return
                    elif cached_file_data['size'] > file_in_collection.size():
                        # File partially uploaded, resume!
                        resume_offset = file_in_collection.size()
                    else:
                        # Inconsistent cache, re-upload the file
                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
                else:
                    # Local file differs from cached data, re-upload it
                    pass
        # The content is copied byte for byte, so read it in binary mode.
        with open(source, 'rb') as source_fd:
            if resume_offset > 0:
                # Start upload where we left off
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'a')
                source_fd.seek(resume_offset)
                self.bytes_skipped += resume_offset
            else:
                # Start from scratch
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'w')
            self._write(source_fd, output)
            # flush=False is intended to allow small block packing.
            output.close(flush=False)

    def _write(self, source_fd, output):
        """Copy source_fd to output in reads of up to KEEP_BLOCK_SIZE.

        The first read is always written, even when it is empty, so that an
        empty source still produces an (empty) output file.
        """
        first_read = True
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            # Allow an empty file to be written
            if not data and not first_read:
                break
            if first_read:
                first_read = False
            output.write(data)

    def _my_collection(self):
        """
        Create a new collection if none cached. Load it from cache otherwise.
        """
        if self._collection is None:
            with self._state_lock:
                manifest = self._state['manifest']
            if self.resume and manifest is not None:
                # Create collection from saved state
                self._collection = arvados.collection.Collection(
                    manifest,
                    replication_desired=self.replication_desired)
            else:
                # Create new collection
                self._collection = arvados.collection.Collection(
                    replication_desired=self.replication_desired)
        return self._collection

    def _setup_state(self):
        """
        Create a new cache file or load a previously existing one.

        Raises ResumeCacheConflict when the cache file cannot be locked.
        """
        if self.resume:
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            try:
                self._lock_file(self._cache_file)
            except ResumeCacheConflict:
                # Do not keep a handle on a cache file we could not lock.
                self._cache_file.close()
                raise
            self._cache_file.seek(0)
            with self._state_lock:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load how many bytes were uploaded on previous run
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._my_collection())
        # No resume required
        else:
            with self._state_lock:
                self._state = copy.deepcopy(self.EMPTY_STATE)

    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive flock() on fileobj.

        fileobj may be a file object or a raw integer descriptor. Raises
        ResumeCacheConflict if flock() fails with IOError.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # An integer descriptor has no .name attribute.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def _discard_temp_cache(self, fileobj, fd, name):
        """Close and remove a temporary cache file that was not installed."""
        try:
            if fileobj is not None:
                fileobj.close()
            elif fd is not None:
                os.close(fd)
        except (IOError, OSError):
            pass
        if name is not None:  # None means mkstemp failed.
            try:
                os.unlink(name)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise

    def _save_state(self):
        """
        Atomically save current state into cache.

        The state is serialized while holding the state lock, because the
        uploading thread adds entries to it concurrently. The result is
        written to a locked temporary file, synced and renamed over the
        cache file. I/O errors and lock conflicts are logged, and the
        previous cache file is kept.
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            with self._state_lock:
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            self._discard_temp_cache(new_cache, new_cache_fd, new_cache_name)
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        """Return the 'name' from the collection's API response, or None."""
        with self._collection_lock:
            response = self._my_collection().api_response()
            name = response['name'] if response else None
        return name

    def manifest_locator(self):
        """Return the collection's manifest locator."""
        with self._collection_lock:
            locator = self._my_collection().manifest_locator()
        return locator

    def portable_data_hash(self):
        """Return the collection's portable data hash."""
        with self._collection_lock:
            datahash = self._my_collection().portable_data_hash()
        return datahash

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Return the collection's manifest text.

        start() and data_locators() also call this to get pending data
        committed first.
        """
        with self._collection_lock:
            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
        return manifest

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections

        Both Collection and Subcollection objects are treated as
        containers, as in _collection_size(). Returns None for any other
        kind of item.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            return [segment.locator for segment in item.segments()]
        elif isinstance(item, (arvados.collection.Collection,
                               arvados.collection.Subcollection)):
            nested = [self._datablocks_on_item(x) for x in item.values()]
            # Flatten the list of per-item lists.
            return [loc for sublist in nested for loc in sublist]
        else:
            return None

    def data_locators(self):
        """Return the locators of all data blocks in the collection."""
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
621
622
def expected_bytes_for(pathlist):
    """Return the total size in bytes of everything named in pathlist.

    Directories are walked with arvados.util.listdir_recursive() and the
    sizes of the listed entries are added up; regular files contribute
    their own size. The result is used to display progress as a percentage.
    Returns None as soon as a path is neither a directory nor a regular
    file, since the total is then unknown.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, entry))
                for entry in arvados.util.listdir_recursive(path))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
636
# Template for machine-readable progress lines. The program name and pid are
# filled in once, at import time; the two remaining "{}" fields are filled in
# by machine_progress(). Braces in the program name are doubled so that they
# survive that second format() call as literal characters.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0].replace('{', '{{').replace('}', '}}'),
    os.getpid())
def machine_progress(bytes_written, bytes_expected):
    """Return one machine-readable progress line.

    The line reads "<program> <pid>: <written> written <total> total"
    followed by a newline, where <total> is -1 when bytes_expected is None.
    """
    if bytes_expected is None:
        bytes_expected = -1
    return _machine_format.format(bytes_written, bytes_expected)
642
def human_progress(bytes_written, bytes_expected):
    """Return a human-readable progress string starting with "\\r".

    When bytes_expected is known and non-zero, the string shows written and
    expected sizes in whole MiB plus a percentage. Otherwise it shows only
    the raw byte count.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction)
650
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter that formats progress and writes it to outfile.

    progress_func(bytes_written, bytes_expected) must return the text to
    write. The returned callable takes the same two arguments.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
655
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with the negated signal number as status."""
    raise SystemExit(-sigcode)
658
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Return the uuid of the owner the new collection should get.

    Without a project_uuid, the current user is looked up. Otherwise the
    uuid is fetched as a user or as a group, depending on which
    arvados.util pattern it matches. Raises ValueError when it matches
    neither. The lookup is executed with the given num_retries.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            request = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            request = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        request = api_client.users().current()
    response = request.execute(num_retries=num_retries)
    return response['uuid']
669
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Run arv-put: upload the given paths and print the result.

    arguments -- command line arguments (None lets argparse use sys.argv).
    stdout -- stream that receives the resulting manifest, block locators,
        collection uuid or portable data hash.
    stderr -- stream that receives error and status messages.

    Returns the text written to stdout. Exits through sys.exit() on invalid
    option combinations, on a resume cache conflict and when no output
    could be produced. The resume cache is removed only after a successful
    run.
    """
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    # The original handlers are put back in the finally clause below, so
    # they are restored even when the upload raises or a caught signal
    # makes exit_signal_handler call sys.exit().
    try:
        if args.resume and writer.bytes_written > 0:
            print >>stderr, "\n".join([
                    "arv-put: Resuming previous upload from last checkpoint.",
                    "         Use the --no-resume option to start over."])

        writer.report_progress()
        output = None
        writer.start()
        if args.progress:  # Print newline to split stderr from stdout for humans.
            print >>stderr

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                writer.save_collection()
                print >>stderr, "Collection saved as '%s'" % writer.collection_name()
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                print >>stderr, (
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        for sigcode, orig_handler in orig_signal_handlers.items():
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    writer.destroy_cache()
    return output
781
782
# Run the uploader with the process command line when executed as a script.
783 if __name__ == '__main__':
784     main()