9463: Use a constant as a template for initializing the empty cache
[arvados.git] sdk/python/arvados/commands/put.py
#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import time
import signal
import socket
import sys
import tempfile
import threading
import copy
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

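# Signals for which main() temporarily installs exit_signal_handler, so that an
# interrupted upload exits via sys.exit() and can later be resumed from its cache.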
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

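# Illustrative invocations, based on the options defined above (file names are
# examples only):
#   arv-put file1.dat dir1/      upload paths, save a Collection, print its locator
#   arv-put --stream dir1/       print the manifest on stdout without saving a Collection
#   arv-put --raw < data.bin     print the comma-separated data block locators only
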
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

class ResumeCacheConflict(Exception):
    pass


class ResumeCache(object):
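    """Persistent, flock-protected resume state for arv-put.

    The cache is a JSON file under CACHE_DIR in the user's home configuration
    directory, named after an md5 digest of the API host and the input paths
    (see make_path()), so two arv-put processes never share the same cache.
    """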
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

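    # check_cache() probes Keep for one block locator recorded in the cached
    # state; if the HEAD request fails, the cached blocks may no longer be
    # retrievable, so restart() discards the cache and starts over.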
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

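    # save() writes the new state to a temporary file in the same directory and
    # then rename()s it over the old cache, so readers never see a partially
    # written JSON file. Unlike ArvPutUploadJob._save_state() below, this path
    # does not flush/fsync before renaming.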
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
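    """Upload a set of local paths into a single Arvados collection.

    Files are written through arvados.collection.Collection while a background
    checkpointer thread periodically saves a JSON resume cache (the manifest
    checkpoint plus per-file size/mtime) so interrupted uploads can resume.
    """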
    CACHE_DIR = '.cache/arvados/arv-put'
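    # Template for a fresh cache state; _setup_state() deep-copies this constant
    # whenever it needs to initialize or reset the cached state.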
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, write_copies=None, replication=None,
                 filename=None, update_time=60.0):
        self.paths = paths
        self.resume = resume
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.write_copies = write_copies
        self.replication = replication
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection = None
        self._collection_lock = threading.Lock()
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds to wait between update runs
        # Load cached data, if any and if resume is enabled
        self._setup_state()

    def start(self):
        """
        Start the checkpointer thread and upload the given paths.
        """
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
        # Successful upload, one last _update()
        self._update()
        if self.resume:
            self._cache_file.close()
            # Correct the final written bytes count
            self.bytes_written -= self.bytes_skipped

    def save_collection(self):
        with self._collection_lock:
            self._my_collection().save_new(
                                name=self.name, owner_uuid=self.owner_uuid,
                                ensure_unique_name=self.ensure_unique_name,
                                num_retries=self.num_retries,
                                replication_desired=self.replication)

    def destroy_cache(self):
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._my_collection())
            # Update cache, if resume enabled
            if self.resume:
                with self._state_lock:
                    self._state['manifest'] = self._my_collection().manifest_text()
        if self.resume:
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_directory_tree(self, path, stream_name="."):
        # TODO: Check what happens when multiple directories are passed as
        # arguments.
        # If the code below is uncommented, the integration test
        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
        # fails, presumably because manifest_uuid changes when the directory
        # name is prepended to stream_name.

        # if stream_name == '.':
        #     stream_name = os.path.join('.', os.path.basename(path))
        for item in os.listdir(path):
            if os.path.isdir(os.path.join(path, item)):
                self._write_directory_tree(os.path.join(path, item),
                                os.path.join(stream_name, item))
            else:
                self._write_file(os.path.join(path, item),
                                os.path.join(stream_name, item))

    def _write_stdin(self, filename):
        with self._collection_lock:
            output = self._my_collection().open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _write_file(self, source, filename):
        resume_offset = 0
        resume_upload = False
        if self.resume:
            # Check if file was already uploaded (at least partially)
            with self._collection_lock:
                try:
                    file_in_collection = self._my_collection().find(filename)
                except IOError:
                    # Not found
                    file_in_collection = None
            # If there is no cached data for this file yet, record its size and
            # mtime so a later resumed run can recognize it.
            if source not in self._state['files'].keys():
                with self._state_lock:
                    self._state['files'][source] = {
                        'mtime' : os.path.getmtime(source),
                        'size' : os.path.getsize(source)
                    }
            cached_file_data = self._state['files'][source]
            # See if this file was already uploaded at least partially
            if file_in_collection:
                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                    if os.path.getsize(source) == file_in_collection.size():
                        # File already there, skip it.
                        self.bytes_skipped += os.path.getsize(source)
                        return
                    elif os.path.getsize(source) > file_in_collection.size():
                        # File partially uploaded, resume!
                        resume_upload = True
                        resume_offset = file_in_collection.size()
                    else:
                        # Inconsistent cache, re-upload the file
                        pass
                else:
                    # Local file differs from cached data, re-upload it
                    pass
        with open(source, 'r') as source_fd:
            if self.resume and resume_upload:
                with self._collection_lock:
                    # Open for appending
                    output = self._my_collection().open(filename, 'a')
                source_fd.seek(resume_offset)
                self.bytes_skipped += resume_offset
            else:
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'w')
            self._write(source_fd, output)
            output.close()

    def _write(self, source_fd, output):
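        """
        Copy data from source_fd to the collection file handle in
        KEEP_BLOCK_SIZE chunks until EOF.
        """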
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        """
        Create a new collection if none cached. Load it from cache otherwise.
        """
        if self._collection is None:
            with self._state_lock:
                manifest = self._state['manifest']
            if self.resume and manifest is not None:
                # Create collection from saved state
                self._collection = arvados.collection.Collection(
                                        manifest,
                                        num_write_copies=self.write_copies)
            else:
                # Create new collection
                self._collection = arvados.collection.Collection(
                                        num_write_copies=self.write_copies)
        return self._collection

    def _setup_state(self):
        """
        Create a new cache file or load a previously existing one.
        """
        if self.resume:
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)
            with self._state_lock:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cached state is incomplete; start from a fresh cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load how many bytes were uploaded on previous run
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._my_collection())
        # No resume required
        else:
            with self._state_lock:
                self._state = copy.deepcopy(self.EMPTY_STATE)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
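        # Write the state to a temporary file in the cache directory, flush and
        # fsync it, then rename() it over the old cache so a crash can never
        # leave a partially written state file behind.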
        try:
            with self._state_lock:
                state = self._state
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        with self._collection_lock:
            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
        return name

    def manifest_locator(self):
        with self._collection_lock:
            locator = self._my_collection().manifest_locator()
        return locator

    def portable_data_hash(self):
        with self._collection_lock:
            datahash = self._my_collection().portable_data_hash()
        return datahash

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        with self._collection_lock:
            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
        return manifest

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            locators = []
            for segment in item.segments():
                loc = segment.locator
                locators.append(loc)
            return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
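# For example, with argv[0] "arv-put" and pid 1234, machine_progress(1048576,
# 2097152) returns "arv-put 1234: 1048576 written 2097152 total\n".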
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

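# Exit with a status derived from the received signal. main() installs this
# handler for each signal in CAUGHT_SIGNALS and restores the original handlers
# before returning.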
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                resume = args.resume,
                                reporter = reporter,
                                bytes_expected = bytes_expected,
                                num_retries = args.retries,
                                write_copies = write_copies,
                                replication = args.replication,
                                name = collection_name,
                                owner_uuid = project_uuid,
                                ensure_unique_name = True)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    writer.start()
    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            writer.save_collection()
            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)
    else:
        writer.destroy_cache()

    return output


if __name__ == '__main__':
    main()