6e216ee3741573b535033e14fe1eed0c2039e0ff
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import os
16 import pwd
17 import time
18 import signal
19 import socket
20 import sys
21 import tempfile
22 from apiclient import errors as apiclient_errors
23
24 import arvados.commands._util as arv_cmd
25
26 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
27 api_client = None
28
29 upload_opts = argparse.ArgumentParser(add_help=False)
30
31 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
32                          help="""
33 Local file or directory. Default: read from standard input.
34 """)
35
36 _group = upload_opts.add_mutually_exclusive_group()
37
38 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
39                     default=-1, help="""
40 Maximum depth of directory tree to represent in the manifest
41 structure. A directory structure deeper than this will be represented
42 as a single stream in the manifest. If N=0, the manifest will contain
43 a single stream. Default: -1 (unlimited), i.e., exactly one manifest
44 stream per filesystem directory that contains files.
45 """)
46
47 _group.add_argument('--normalize', action='store_true',
48                     help="""
49 Normalize the manifest by re-ordering files and streams after writing
50 data.
51 """)
52
53 _group = upload_opts.add_mutually_exclusive_group()
54
55 _group.add_argument('--as-stream', action='store_true', dest='stream',
56                     help="""
57 Synonym for --stream.
58 """)
59
60 _group.add_argument('--stream', action='store_true',
61                     help="""
62 Store the file content and display the resulting manifest on
63 stdout. Do not write the manifest to Keep or save a Collection object
64 in Arvados.
65 """)
66
67 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
68                     help="""
69 Synonym for --manifest.
70 """)
71
72 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
73                     help="""
74 Synonym for --manifest.
75 """)
76
77 _group.add_argument('--manifest', action='store_true',
78                     help="""
79 Store the file data and resulting manifest in Keep, save a Collection
80 object in Arvados, and display the manifest locator (Collection uuid)
81 on stdout. This is the default behavior.
82 """)
83
84 _group.add_argument('--as-raw', action='store_true', dest='raw',
85                     help="""
86 Synonym for --raw.
87 """)
88
89 _group.add_argument('--raw', action='store_true',
90                     help="""
91 Store the file content and display the data block locators on stdout,
92 separated by commas, with a trailing newline. Do not store a
93 manifest.
94 """)
95
96 upload_opts.add_argument('--use-filename', type=str, default=None,
97                          dest='filename', help="""
98 Synonym for --filename.
99 """)
100
101 upload_opts.add_argument('--filename', type=str, default=None,
102                          help="""
103 Use the given filename in the manifest, instead of the name of the
104 local file. This is useful when "-" or "/dev/stdin" is given as an
105 input file. It can be used only if there is exactly one path given and
106 it is not a directory. Implies --manifest.
107 """)
108
109 upload_opts.add_argument('--portable-data-hash', action='store_true',
110                          help="""
111 Print the portable data hash instead of the Arvados UUID for the collection
112 created by the upload.
113 """)
114
115 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
116                          help="""
117 Set the replication level for the new collection: how many different
118 physical storage devices (e.g., disks) should have a copy of each data
119 block. Default is to use the server-provided default (if any) or 2.
120 """)
121
122 run_opts = argparse.ArgumentParser(add_help=False)
123
124 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
125 Store the collection in the specified project, instead of your Home
126 project.
127 """)
128
129 run_opts.add_argument('--name', help="""
130 Save the collection with the specified name.
131 """)
132
133 _group = run_opts.add_mutually_exclusive_group()
134 _group.add_argument('--progress', action='store_true',
135                     help="""
136 Display human-readable progress on stderr (bytes and, if possible,
137 percentage of total data size). This is the default behavior when
138 stderr is a tty.
139 """)
140
141 _group.add_argument('--no-progress', action='store_true',
142                     help="""
143 Do not display human-readable progress on stderr, even if stderr is a
144 tty.
145 """)
146
147 _group.add_argument('--batch-progress', action='store_true',
148                     help="""
149 Display machine-readable progress on stderr (bytes and, if known,
150 total data size).
151 """)
152
153 _group = run_opts.add_mutually_exclusive_group()
154 _group.add_argument('--resume', action='store_true', default=True,
155                     help="""
156 Continue interrupted uploads from cached state (default).
157 """)
158 _group.add_argument('--no-resume', action='store_false', dest='resume',
159                     help="""
160 Do not continue interrupted uploads from cached state.
161 """)
162
163 arg_parser = argparse.ArgumentParser(
164     description='Copy data from the local filesystem to Keep.',
165     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
166
167 def parse_arguments(arguments):
168     args = arg_parser.parse_args(arguments)
169
170     if len(args.paths) == 0:
171         args.paths = ['-']
172
173     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
174
175     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
176         if args.filename:
177             arg_parser.error("""
178     --filename argument cannot be used when storing a directory or
179     multiple files.
180     """)
181
182     # Turn on --progress by default if stderr is a tty.
183     if (not (args.batch_progress or args.no_progress)
184         and os.isatty(sys.stderr.fileno())):
185         args.progress = True
186
187     if args.paths == ['-']:
188         args.resume = False
189         if not args.filename:
190             args.filename = 'stdin'
191
192     return args
193
194 class ResumeCacheConflict(Exception):
195     pass
196
197
198 class ResumeCache(object):
199     CACHE_DIR = '.cache/arvados/arv-put'
200
201     def __init__(self, file_spec):
202         self.cache_file = open(file_spec, 'a+')
203         self._lock_file(self.cache_file)
204         self.filename = self.cache_file.name
205
206     @classmethod
207     def make_path(cls, args):
208         md5 = hashlib.md5()
209         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
210         realpaths = sorted(os.path.realpath(path) for path in args.paths)
211         md5.update('\0'.join(realpaths))
212         if any(os.path.isdir(path) for path in realpaths):
213             md5.update(str(max(args.max_manifest_depth, -1)))
214         elif args.filename:
215             md5.update(args.filename)
216         return os.path.join(
217             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
218             md5.hexdigest())
219
220     def _lock_file(self, fileobj):
221         try:
222             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
223         except IOError:
224             raise ResumeCacheConflict("{} locked".format(fileobj.name))
225
226     def load(self):
227         self.cache_file.seek(0)
228         return json.load(self.cache_file)
229
230     def check_cache(self, api_client=None, num_retries=0):
231         try:
232             state = self.load()
233             locator = None
234             try:
235                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
236                     locator = state["_finished_streams"][0][1][0]
237                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
238                     locator = state["_current_stream_locators"][0]
239                 if locator is not None:
240                     kc = arvados.keep.KeepClient(api_client=api_client)
241                     kc.head(locator, num_retries=num_retries)
242             except Exception as e:
243                 self.restart()
244         except (ValueError):
245             pass
246
247     def save(self, data):
248         try:
249             new_cache_fd, new_cache_name = tempfile.mkstemp(
250                 dir=os.path.dirname(self.filename))
251             self._lock_file(new_cache_fd)
252             new_cache = os.fdopen(new_cache_fd, 'r+')
253             json.dump(data, new_cache)
254             os.rename(new_cache_name, self.filename)
255         except (IOError, OSError, ResumeCacheConflict) as error:
256             try:
257                 os.unlink(new_cache_name)
258             except NameError:  # mkstemp failed.
259                 pass
260         else:
261             self.cache_file.close()
262             self.cache_file = new_cache
263
264     def close(self):
265         self.cache_file.close()
266
267     def destroy(self):
268         try:
269             os.unlink(self.filename)
270         except OSError as error:
271             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
272                 raise
273         self.close()
274
275     def restart(self):
276         self.destroy()
277         self.__init__(self.filename)
278
279
280 class ArvPutCollectionCache(object):
281     def __init__(self, paths):
282         md5 = hashlib.md5()
283         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
284         realpaths = sorted(os.path.realpath(path) for path in paths)
285         self.files = {}
286         for path in realpaths:
287             self._get_file_data(path)
288         # Only hash args paths
289         md5.update('\0'.join(realpaths))
290         self.cache_hash = md5.hexdigest()
291         
292         self.cache_file = open(os.path.join(
293             arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'), 
294             self.cache_hash), 'a+')
295         self._lock_file(self.cache_file)
296         self.filename = self.cache_file.name
297         self.data = self._load()
298     
299     def _load(self):
300         try:
301             self.cache_file.seek(0)
302             ret = json.load(self.cache_file)
303         except ValueError:
304             # File empty, set up new cache
305             ret = {
306                 'col_locator' : None, # Collection 
307                 'uploaded' : {}, # Uploaded file list: {path : {size, mtime}}
308             }
309         return ret
310     
311     def _save(self):
312         """
313         Atomically save (create temp file & rename() it)
314         """
315         # TODO: Should be a good idea to avoid _save() spamming? when writing 
316         # lots of small files.
317         print "SAVE START"
318         try:
319             new_cache_fd, new_cache_name = tempfile.mkstemp(
320                 dir=os.path.dirname(self.filename))
321             self._lock_file(new_cache_fd)
322             new_cache = os.fdopen(new_cache_fd, 'r+')
323             json.dump(self.data, new_cache)
324             os.rename(new_cache_name, self.filename)
325         except (IOError, OSError, ResumeCacheConflict) as error:
326             print "SAVE ERROR: %s" % error
327             try:
328                 os.unlink(new_cache_name)
329             except NameError:  # mkstemp failed.
330                 pass
331         else:
332             print "SAVE DONE!! %s" % self.filename
333             self.cache_file.close()
334             self.cache_file = new_cache
335     
336     def file_uploaded(self, path):
337         if path in self.files.keys():
338             self.data['uploaded'][path] = self.files[path]
339             self._save()
340     
341     def set_collection(self, uuid):
342         self.data['col_locator'] = uuid
343     
344     def collection(self):
345         return self.data['col_locator']
346     
347     def is_dirty(self, path):
348         if not path in self.data['uploaded'].keys():
349             # Cannot be dirty is it wasn't even uploaded
350             return False
351             
352         if (self.files[path]['mtime'] != self.data['uploaded'][path]['mtime']) or (self.files[path]['size'] != self.data['uploaded'][path]['size']):
353             return True
354         else:
355             return False
356     
357     def dirty_files(self):
358         """
359         Files that were previously uploaded but changed locally between 
360         upload runs. These files should be re-uploaded.
361         """
362         dirty = []
363         for f in self.data['uploaded'].keys():
364             if self.is_dirty(f):
365                 dirty.append(f)
366         return dirty
367     
368     def uploaded_files(self):
369         """
370         Files that were uploaded and have not changed locally between 
371         upload runs. These files should be checked for partial uploads
372         """
373         uploaded = []
374         for f in self.data['uploaded'].keys():
375             if not self.is_dirty(f):
376                 uploaded.append(f)
377         return uploaded
378     
379     def pending_files(self):
380         """
381         Files that should be uploaded, because of being dirty or that
382         never had the chance to be uploaded yet.
383         """
384         pending = []
385         uploaded = self.uploaded_files()
386         for f in self.files.keys():
387             if f not in uploaded:
388                 pending.append(f)
389         return pending
390     
391     def _get_file_data(self, path):
392         if os.path.isfile(path):
393             self.files[path] = {'mtime': os.path.getmtime(path),
394                                 'size': os.path.getsize(path)}
395         elif os.path.isdir(path):
396             for item in os.listdir(path):
397                 self._get_file_data(os.path.join(path, item))
398
399     def _lock_file(self, fileobj):
400         try:
401             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
402         except IOError:
403             raise ResumeCacheConflict("{} locked".format(fileobj.name))
404
405     def close(self):
406         self.cache_file.close()
407
408     def destroy(self):
409         # try:
410         #     os.unlink(self.filename)
411         # except OSError as error:
412         #     if error.errno != errno.ENOENT:  # That's what we wanted anyway.
413         #         raise
414         self.close()
415
416 class ArvPutUploader(object):
417     def __init__(self, paths):
418         self.cache = ArvPutCollectionCache(paths)
419         if self.cache.collection() is not None:
420             self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
421         else:
422             self.collection = ArvPutCollection(cache=self.cache)
423             self.cache.set_collection(self.collection.manifest_locator())
424         for p in paths:
425             if os.path.isdir(p):
426                 self.collection.write_directory_tree(p)
427             elif os.path.isfile(p):
428                 self.collection.write_file(p)
429         self.cache.destroy()
430     
431     def manifest(self):
432         return self.collection.manifest()
433     
434     def bytes_written(self):
435         return self.collection.bytes_written
436
437
438 class ArvPutCollection(object):
439     def __init__(self, locator=None, cache=None, reporter=None, 
440                     bytes_expected=None, **kwargs):
441         self.collection_flush_time = 60
442         self.bytes_written = 0
443         self._seen_inputs = []
444         self.cache = cache
445         self.reporter = reporter
446         self.bytes_expected = bytes_expected
447         
448         if locator is None:
449             self.collection = arvados.collection.Collection()
450             self.collection.save_new()
451         else:
452             self.collection = arvados.collection.Collection(locator)
453     
454     def manifest_locator(self):
455         return self.collection.manifest_locator()
456             
457     def write_file(self, source, filename):
458         if self.cache and source in self.cache.dirty_files():
459             print "DIRTY: Removing file %s from collection to be uploaded again" % source
460             self.collection.remove(filename)
461         
462         resume_offset = 0
463         resume_upload = False
464
465         print "FIND file %s" % filename
466         if self.collection.find(filename):
467             print "File %s already in the collection, checking!" % source
468             if os.path.getsize(source) == self.collection.find(filename).size():
469                 print "WARNING: file %s already uploaded, skipping!" % source
470                 # File already there, skip it.
471                 return
472             elif os.path.getsize(source) > self.collection.find(filename).size():
473                 print "WARNING: RESUMING file %s" % source
474                 # File partially uploaded, resume!
475                 resume_upload = True
476                 resume_offset = self.collection.find(filename).size()
477             else:
478                 # Source file smaller than uploaded file, what happened here?
479                 # TODO: Raise exception of some kind?
480                 pass
481
482         with open(source, 'r') as source_fd:
483             with self.collection as c:
484                 if resume_upload:
485                     print "Resuming file, source: %s, filename: %s" % (source, filename)
486                     output = c.open(filename, 'a')
487                     source_fd.seek(resume_offset)
488                     first_block = False
489                 else:
490                     print "Writing file, source: %s, filename: %s" % (source, filename)
491                     output = c.open(filename, 'w')
492                     first_block = True
493                     
494                 start_time = time.time()
495                 while True:
496                     data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
497                     if not data:
498                         break
499                     output.write(data)
500                     output.flush() # Commit block to Keep
501                     self.bytes_written += len(data)
502                     # Is it time to update the collection?
503                     if (time.time() - start_time) > self.collection_flush_time:
504                         self.collection.save()
505                         start_time = time.time()
506                     # Once a block is written on each file, mark it as uploaded on the cache
507                     if first_block:
508                         if self.cache:
509                             self.cache.file_uploaded(source)
510                         first_block = False
511                 # File write finished
512                 output.close()
513                 self.collection.save() # One last save...
514
515     def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
516         if os.path.isdir(path):
517             for item in os.listdir(path):
518                 print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
519                 if os.path.isdir(os.path.join(path, item)):
520                     self.write_directory_tree(os.path.join(path, item), 
521                                     os.path.join(stream_name, item))
522                 else:
523                     self.write_file(os.path.join(path, item), 
524                                     os.path.join(stream_name, item))
525
526     def manifest(self):
527         print "BLOCK SIZE: %d" % arvados.config.KEEP_BLOCK_SIZE
528         print "MANIFEST Locator:\n%s\nMANIFEST TEXT:\n%s" % (self.manifest_locator(), self.collection.manifest_text())
529         return True
530     
531     def report_progress(self):
532         if self.reporter is not None:
533             self.reporter(self.bytes_written, self.bytes_expected)
534
535
536 class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
537     STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
538                    ['bytes_written', '_seen_inputs'])
539
540     def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
541         self.bytes_written = 0
542         self._seen_inputs = []
543         self.cache = cache
544         self.reporter = reporter
545         self.bytes_expected = bytes_expected
546         super(ArvPutCollectionWriter, self).__init__(**kwargs)
547
548     @classmethod
549     def from_cache(cls, cache, reporter=None, bytes_expected=None,
550                    num_retries=0, replication=0):
551         try:
552             state = cache.load()
553             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
554             writer = cls.from_state(state, cache, reporter, bytes_expected,
555                                     num_retries=num_retries,
556                                     replication=replication)
557         except (TypeError, ValueError,
558                 arvados.errors.StaleWriterStateError) as error:
559             return cls(cache, reporter, bytes_expected,
560                        num_retries=num_retries,
561                        replication=replication)
562         else:
563             return writer
564
565     def cache_state(self):
566         if self.cache is None:
567             return
568         state = self.dump_state()
569         # Transform attributes for serialization.
570         for attr, value in state.items():
571             if attr == '_data_buffer':
572                 state[attr] = base64.encodestring(''.join(value))
573             elif hasattr(value, 'popleft'):
574                 state[attr] = list(value)
575         self.cache.save(state)
576
577     def report_progress(self):
578         if self.reporter is not None:
579             self.reporter(self.bytes_written, self.bytes_expected)
580
581     def flush_data(self):
582         start_buffer_len = self._data_buffer_len
583         start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
584         super(ArvPutCollectionWriter, self).flush_data()
585         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
586             self.bytes_written += (start_buffer_len - self._data_buffer_len)
587             self.report_progress()
588             if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
589                 self.cache_state()
590
591     def _record_new_input(self, input_type, source_name, dest_name):
592         # The key needs to be a list because that's what we'll get back
593         # from JSON deserialization.
594         key = [input_type, source_name, dest_name]
595         if key in self._seen_inputs:
596             return False
597         self._seen_inputs.append(key)
598         return True
599
600     def write_file(self, source, filename=None):
601         if self._record_new_input('file', source, filename):
602             super(ArvPutCollectionWriter, self).write_file(source, filename)
603
604     def write_directory_tree(self,
605                              path, stream_name='.', max_manifest_depth=-1):
606         if self._record_new_input('directory', path, stream_name):
607             super(ArvPutCollectionWriter, self).write_directory_tree(
608                 path, stream_name, max_manifest_depth)
609
610
611 def expected_bytes_for(pathlist):
612     # Walk the given directory trees and stat files, adding up file sizes,
613     # so we can display progress as percent
614     bytesum = 0
615     for path in pathlist:
616         if os.path.isdir(path):
617             for filename in arvados.util.listdir_recursive(path):
618                 bytesum += os.path.getsize(os.path.join(path, filename))
619         elif not os.path.isfile(path):
620             return None
621         else:
622             bytesum += os.path.getsize(path)
623     return bytesum
624
625 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
626                                                             os.getpid())
627 def machine_progress(bytes_written, bytes_expected):
628     return _machine_format.format(
629         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
630
631 def human_progress(bytes_written, bytes_expected):
632     if bytes_expected:
633         return "\r{}M / {}M {:.1%} ".format(
634             bytes_written >> 20, bytes_expected >> 20,
635             float(bytes_written) / bytes_expected)
636     else:
637         return "\r{} ".format(bytes_written)
638
639 def progress_writer(progress_func, outfile=sys.stderr):
640     def write_progress(bytes_written, bytes_expected):
641         outfile.write(progress_func(bytes_written, bytes_expected))
642     return write_progress
643
644 def exit_signal_handler(sigcode, frame):
645     sys.exit(-sigcode)
646
647 def desired_project_uuid(api_client, project_uuid, num_retries):
648     if not project_uuid:
649         query = api_client.users().current()
650     elif arvados.util.user_uuid_pattern.match(project_uuid):
651         query = api_client.users().get(uuid=project_uuid)
652     elif arvados.util.group_uuid_pattern.match(project_uuid):
653         query = api_client.groups().get(uuid=project_uuid)
654     else:
655         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
656     return query.execute(num_retries=num_retries)['uuid']
657
658 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
659     global api_client
660
661     args = parse_arguments(arguments)
662     status = 0
663     if api_client is None:
664         api_client = arvados.api('v1')
665
666     # Determine the name to use
667     if args.name:
668         if args.stream or args.raw:
669             print >>stderr, "Cannot use --name with --stream or --raw"
670             sys.exit(1)
671         collection_name = args.name
672     else:
673         collection_name = "Saved at {} by {}@{}".format(
674             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
675             pwd.getpwuid(os.getuid()).pw_name,
676             socket.gethostname())
677
678     if args.project_uuid and (args.stream or args.raw):
679         print >>stderr, "Cannot use --project-uuid with --stream or --raw"
680         sys.exit(1)
681
682     # Determine the parent project
683     try:
684         project_uuid = desired_project_uuid(api_client, args.project_uuid,
685                                             args.retries)
686     except (apiclient_errors.Error, ValueError) as error:
687         print >>stderr, error
688         sys.exit(1)
689
690     # write_copies diverges from args.replication here.
691     # args.replication is how many copies we will instruct Arvados to
692     # maintain (by passing it in collections().create()) after all
693     # data is written -- and if None was given, we'll use None there.
694     # Meanwhile, write_copies is how many copies of each data block we
695     # write to Keep, which has to be a number.
696     #
697     # If we simply changed args.replication from None to a default
698     # here, we'd end up erroneously passing the default replication
699     # level (instead of None) to collections().create().
700     write_copies = (args.replication or
701                     api_client._rootDesc.get('defaultCollectionReplication', 2))
702
703     if args.progress:
704         reporter = progress_writer(human_progress)
705     elif args.batch_progress:
706         reporter = progress_writer(machine_progress)
707     else:
708         reporter = None
709     bytes_expected = expected_bytes_for(args.paths)
710
711     resume_cache = None
712     if args.resume:
713         try:
714             resume_cache = ResumeCache(ResumeCache.make_path(args))
715             resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
716         except (IOError, OSError, ValueError):
717             pass  # Couldn't open cache directory/file.  Continue without it.
718         except ResumeCacheConflict:
719             print >>stderr, "\n".join([
720                 "arv-put: Another process is already uploading this data.",
721                 "         Use --no-resume if this is really what you want."])
722             sys.exit(1)
723
724     if resume_cache is None:
725         writer = ArvPutCollectionWriter(
726             resume_cache, reporter, bytes_expected,
727             num_retries=args.retries,
728             replication=write_copies)
729     else:
730         writer = ArvPutCollectionWriter.from_cache(
731             resume_cache, reporter, bytes_expected,
732             num_retries=args.retries,
733             replication=write_copies)
734
735     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
736     # the originals.
737     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
738                             for sigcode in CAUGHT_SIGNALS}
739
740     if writer.bytes_written > 0:  # We're resuming a previous upload.
741         print >>stderr, "\n".join([
742                 "arv-put: Resuming previous upload from last checkpoint.",
743                 "         Use the --no-resume option to start over."])
744
745     writer.report_progress()
746     writer.do_queued_work()  # Do work resumed from cache.
747     for path in args.paths:  # Copy file data to Keep.
748         if path == '-':
749             writer.start_new_stream()
750             writer.start_new_file(args.filename)
751             r = sys.stdin.read(64*1024)
752             while r:
753                 # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
754                 # CollectionWriter.write().
755                 super(arvados.collection.ResumableCollectionWriter, writer).write(r)
756                 r = sys.stdin.read(64*1024)
757         elif os.path.isdir(path):
758             writer.write_directory_tree(
759                 path, max_manifest_depth=args.max_manifest_depth)
760         else:
761             writer.start_new_stream()
762             writer.write_file(path, args.filename or os.path.basename(path))
763     writer.finish_current_stream()
764
765     if args.progress:  # Print newline to split stderr from stdout for humans.
766         print >>stderr
767
768     output = None
769     if args.stream:
770         output = writer.manifest_text()
771         if args.normalize:
772             output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
773     elif args.raw:
774         output = ','.join(writer.data_locators())
775     else:
776         try:
777             manifest_text = writer.manifest_text()
778             if args.normalize:
779                 manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
780             replication_attr = 'replication_desired'
781             if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
782                 # API called it 'redundancy' before #3410.
783                 replication_attr = 'redundancy'
784             # Register the resulting collection in Arvados.
785             collection = api_client.collections().create(
786                 body={
787                     'owner_uuid': project_uuid,
788                     'name': collection_name,
789                     'manifest_text': manifest_text,
790                     replication_attr: args.replication,
791                     },
792                 ensure_unique_name=True
793                 ).execute(num_retries=args.retries)
794
795             print >>stderr, "Collection saved as '%s'" % collection['name']
796
797             if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
798                 output = collection['portable_data_hash']
799             else:
800                 output = collection['uuid']
801
802         except apiclient_errors.Error as error:
803             print >>stderr, (
804                 "arv-put: Error creating Collection on project: {}.".format(
805                     error))
806             status = 1
807
808     # Print the locator (uuid) of the new collection.
809     if output is None:
810         status = status or 1
811     else:
812         stdout.write(output)
813         if not output.endswith('\n'):
814             stdout.write('\n')
815
816     for sigcode, orig_handler in orig_signal_handlers.items():
817         signal.signal(sigcode, orig_handler)
818
819     if status != 0:
820         sys.exit(status)
821
822     if resume_cache is not None:
823         resume_cache.destroy()
824
825     return output
826
827 if __name__ == '__main__':
828     main()