#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

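# Example invocations (illustrative only; the file names, directory and
# collection name below are made-up placeholders -- every flag shown is
# defined in upload_opts/run_opts further down):
#
#   arv-put my_file.fastq.gz                   # upload one file, print the collection UUID
#   arv-put --name "Run 42 output" data_dir/   # upload a directory under a given name
#   arv-put --stream data_dir/                 # print the manifest, don't save a collection
#   arv-put --raw my_file.fastq.gz             # print comma-separated block locators
#   arv-put --no-resume my_file.fastq.gz       # ignore any cached upload state
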
import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import time
import signal
import socket
import sys
import tempfile
import threading
import copy
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

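# Signals that main() traps and converts to a normal sys.exit(), so that an
# interrupted upload still leaves its resume cache behind for the next run.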
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Flush to disk before the rename so a crash can't leave a
            # truncated cache file in place (mirrors _save_state below).
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
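
# Rough sketch of how ResumeCache is intended to be used (the `args` value is
# whatever parse_arguments() returned; main() below does not use this class
# directly, since ArvPutUploadJob manages its own cache):
#
#     cache = ResumeCache(ResumeCache.make_path(args))
#     state = cache.load()      # raises ValueError if the cache file is empty
#     ...                       # upload, periodically calling cache.save(state)
#     cache.destroy()           # remove the cache once the upload succeeds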


class ArvPutUploadJob(object):
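    """
    Upload a set of local paths into a single Arvados collection, keeping a
    resumable checkpoint of progress in a cache file derived from the
    requested paths (see _setup_state).

    Minimal programmatic usage, mirroring what main() does below (the path
    and reporter values are illustrative placeholders):

        writer = ArvPutUploadJob(['/tmp/data'], reporter=None)
        writer.start()             # upload files, checkpointing periodically
        writer.save_collection()   # create the Collection record in Arvados
        locator = writer.manifest_locator()
        writer.destroy_cache()
    """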
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, write_copies=None, replication=None,
                 filename=None, update_time=60.0):
        self.paths = paths
        self.resume = resume
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.write_copies = write_copies
        self.replication = replication
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection = None
        self._collection_lock = threading.Lock()
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds to wait between update runs
        # Load cached data, if any exists and resume is enabled
        self._setup_state()

    def start(self):
        """
        Start the checkpointer thread and upload the files.
        """
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case a file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
        # Successful upload, one last _update()
        self._update()
        if self.resume:
            self._cache_file.close()
            # Correct the final written bytes count
            self.bytes_written -= self.bytes_skipped

    def save_collection(self):
        with self._collection_lock:
            self._my_collection().save_new(
                                name=self.name, owner_uuid=self.owner_uuid,
                                ensure_unique_name=self.ensure_unique_name,
                                num_retries=self.num_retries,
                                replication_desired=self.replication)

    def destroy_cache(self):
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._my_collection())
            # Update cache, if resume enabled
            if self.resume:
                with self._state_lock:
                    self._state['manifest'] = self._my_collection().manifest_text()
        if self.resume:
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_directory_tree(self, path, stream_name="."):
        # TODO: Check what happens when multiple directories are passed as
        # arguments.
        # If the code below is uncommented, the integration test
        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
        # fails, presumably because the manifest_uuid changes when the
        # directory name is added to stream_name.

        # if stream_name == '.':
        #     stream_name = os.path.join('.', os.path.basename(path))
        for item in os.listdir(path):
            if os.path.isdir(os.path.join(path, item)):
                self._write_directory_tree(os.path.join(path, item),
                                os.path.join(stream_name, item))
            else:
                self._write_file(os.path.join(path, item),
                                os.path.join(stream_name, item))

    def _write_stdin(self, filename):
        with self._collection_lock:
            output = self._my_collection().open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _write_file(self, source, filename):
        resume_offset = 0
        if self.resume:
            # Check if file was already uploaded (at least partially)
            with self._collection_lock:
                try:
                    file_in_collection = self._my_collection().find(filename)
                except IOError:
                    # Not found
                    file_in_collection = None
            # If there is no cached data for this file, store its current
            # size and mtime so a future run can resume or skip it.
            with self._state_lock:
                if source not in self._state['files']:
                    self._state['files'][source] = {
                        'mtime': os.path.getmtime(source),
                        'size' : os.path.getsize(source)
                    }
                cached_file_data = self._state['files'][source]
            # See if this file was already uploaded at least partially
            if file_in_collection:
                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                    if cached_file_data['size'] == file_in_collection.size():
                        # File already there, skip it.
                        self.bytes_skipped += cached_file_data['size']
                        return
                    elif cached_file_data['size'] > file_in_collection.size():
                        # File partially uploaded, resume!
                        resume_offset = file_in_collection.size()
                    else:
                        # Inconsistent cache, re-upload the file
                        pass # TODO: log warning message
                else:
                    # Local file differs from cached data, re-upload it
                    pass
        with open(source, 'r') as source_fd:
            if resume_offset > 0:
                # Start upload where we left off
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'a')
                source_fd.seek(resume_offset)
                self.bytes_skipped += resume_offset
            else:
                # Start from scratch
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'w')
            self._write(source_fd, output)
            output.close()

    def _write(self, source_fd, output):
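        # Copy in KEEP_BLOCK_SIZE chunks, so this loop never holds more than
        # one Keep block's worth of data in memory at a time.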
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        """
        Create a new collection if none cached. Load it from cache otherwise.
        """
        if self._collection is None:
            with self._state_lock:
                manifest = self._state['manifest']
            if self.resume and manifest is not None:
                # Create collection from saved state
                self._collection = arvados.collection.Collection(
                                        manifest,
                                        num_write_copies=self.write_copies)
            else:
                # Create new collection
                self._collection = arvados.collection.Collection(
                                        num_write_copies=self.write_copies)
        return self._collection

    def _setup_state(self):
        """
        Create a new cache file or load a previously existing one.
        """
        if self.resume:
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)
            with self._state_lock:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load how many bytes were uploaded on previous run
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._my_collection())
        # No resume required
        else:
            with self._state_lock:
                self._state = copy.deepcopy(self.EMPTY_STATE)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = self._state
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        with self._collection_lock:
            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
        return name

    def manifest_locator(self):
        with self._collection_lock:
            locator = self._my_collection().manifest_locator()
        return locator

    def portable_data_hash(self):
        with self._collection_lock:
            datahash = self._my_collection().portable_data_hash()
        return datahash

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        with self._collection_lock:
            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
        return manifest

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            locators = []
            for segment in item.segments():
                loc = segment.locator
                locators.append(loc)
            return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
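    # Returns None when any path is neither a file nor a directory (e.g. '-'
    # for stdin), in which case progress is reported without a total.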
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
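# Example: with sys.argv[0] == 'arv-put' and pid 1234, machine_progress(65536,
# 1048576) yields "arv-put 1234: 65536 written 1048576 total\n" (values are
# illustrative).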

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
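# Example: human_progress(50 << 20, 100 << 20) yields "\r50M / 100M 50.0% ";
# when bytes_expected is None it falls back to the raw byte count.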

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)
    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 reporter=reporter,
                                 bytes_expected=bytes_expected,
                                 num_retries=args.retries,
                                 write_copies=write_copies,
                                 replication=args.replication,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    writer.start()
    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            writer.save_collection()
            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)
    else:
        writer.destroy_cache()

    return output


if __name__ == '__main__':
    main()