#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass

class ArvPutArgumentConflict(Exception):
    pass

class ResumeCache(object):
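    """Resume-state cache backed by a locked JSON file.

    The cache file lives under ~/.cache/arvados/arv-put and is named after
    an md5 digest of the API host, the input paths and --filename (see
    make_path()).  An exclusive flock on the file keeps two arv-put
    processes from sharing the same cache at once.
    """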
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
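        """Sanity-check the cached state against Keep.

        Picks one block locator recorded in the cached state and issues a
        HEAD request for it; if the block cannot be confirmed (for example
        because its permission signature expired), the cache is discarded
        and recreated via restart().
        """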
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
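    """Upload local files and directories into an Arvados collection.

    The job walks the given paths, decides per file whether to upload it
    completely, resume a partial upload, or skip it (based on a cache file
    and the manifest checkpointed on the previous run), and periodically
    saves progress from a background checkpointer thread.  With
    --update-collection, changed files are copied into the existing remote
    collection instead of creating a new one.

    Illustrative sketch of how main() drives this class:

        job = ArvPutUploadJob(paths=['./data'], reporter=None)
        job.start(save_collection=True)
        print job.manifest_locator()
    """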
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None, replication_desired=None,
                 filename=None, update_time=20.0, update_collection=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = []
        self.logger = logging.getLogger('arvados.arv_put')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start the supporting thread and upload the files.
        """
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_files()
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all pending blocks & one last _update()
            self._local_collection.manifest_text()
            self._update(final=True)
            if self.use_cache:
                self._cache_file.close()
            if save_collection:
                self.save_collection()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            if self.use_cache:
                # Update cache
                with self._state_lock:
                    if final:
                        self._state['manifest'] = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.append(filename)

        with self._state_lock:
            # If there is no cached data for this file, store it for a
            # possible repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file, not seen in the last run: upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since the last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
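        """Copy data from source_fd to the collection file in KEEP_BLOCK_SIZE chunks."""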
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is missing required fields, set up a new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
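    """Return the UUID of the project in which to save the collection.

    project_uuid may name a user or a group (project); an empty value
    selects the current user (the Home project).  Anything else raises
    ValueError.
    """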
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."])
        sys.exit(1)
    except CollectionUpdateError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.update_collection and args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
            else:
                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()