#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

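# Typical invocations (illustrative only; see the option definitions below):
#   arv-put file1 file2                      upload files and print the new Collection UUID
#   arv-put --stream dir/                    print the manifest on stdout without saving a Collection
#   arv-put --update-collection UUID dir/    add or update files in an existing collection
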
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

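# Command-line options are split across two add_help=False parsers so they can
# be composed: upload_opts covers how the data is stored, run_opts covers the
# behavior of a standalone run (progress, naming, resume/cache). Both are
# combined into arg_parser below, and the split lets other tools reuse
# upload_opts as a parent parser of their own.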
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass

class ArvPutArgumentConflict(Exception):
    pass

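# Note: ResumeCache implements the cache format used by earlier versions of
# arv-put. It is not referenced by ArvPutUploadJob or main() below; it appears
# to be kept for compatibility with existing cache files and tests.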
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

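    # check_cache HEADs one block locator recorded in the cached state; if that
    # block is no longer readable from Keep (e.g. its permission signature has
    # expired), the cache is discarded via restart() so the upload starts over.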
    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None, replication_desired=None,
                 filename=None, update_time=20.0, update_collection=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = []
        self.logger = logging.getLogger('arvados.arv_put')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Load previously cached data, if any and if caching is enabled.
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start the checkpointer support thread and upload the files.
        """
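        # The checkpointer thread periodically snapshots the manifest of the
        # in-progress collection (see _update_task) so an interrupted upload
        # can be resumed from the cached state.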
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths in the cache index so the CWD doesn't
                    # interfere with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s directory traversal order deterministic.
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # Update bytes_written from the current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_files()
        finally:
            # Stop the checkpointer thread before doing anything else.
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all pending blocks & run one last _update().
            self._local_collection.manifest_text()
            self._update(final=True)
            if self.use_cache:
                self._cache_file.close()
            if save_collection:
                self.save_collection()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.resume:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            if self.use_cache:
                # Update the cache file.
                with self._state_lock:
                    if final:
                        self._state['manifest'] = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks.
                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                self._save_state()
        # Call the reporter, if any.
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

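    # Upload decision logic: a file is (re-)uploaded when resume is off, when
    # it is new to the cache, when its size or mtime changed, or when the
    # cached copy is expired or inconsistent. An unchanged file that was only
    # partially uploaded is resumed from the last uploaded byte.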
    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting.
        self._file_paths.append(filename)

        with self._state_lock:
            # If there is no previously cached data for this file, store it
            # for a possible repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely.
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change
                # whenever we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file.
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

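    # A resume_offset > 0 means earlier bytes of the file are already present
    # in the local collection from a previous run, so the upload appends
    # starting at that offset instead of rewriting the whole file.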
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

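    # The cache file name is an md5 digest of the API host plus the real paths
    # being uploaded (and --filename, if given), so different upload jobs get
    # independent resume state.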
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename), 'a+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

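    # _save_state writes the state to a temporary file in the same directory
    # and then renames it over the cache file, so a crash mid-write cannot
    # leave a truncated cache behind.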
    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


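# Returns None when any path is not a regular file or directory (e.g. stdin),
# in which case the progress reporters fall back to plain byte counts without
# a percentage.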
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
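# machine_progress produces lines like "arv-put 1234: 65536 written 1048576 total"
# (program name, PID, bytes written, expected bytes or -1 when unknown).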
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."])
        sys.exit(1)
    except CollectionUpdateError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.update_collection and args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        print >>stderr, "\n".join([
            "arv-put: %s" % str(error)])
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                print >>stderr, "Collection updated: '{}'".format(writer.collection_name())
            else:
                print >>stderr, "Collection saved as '{}'".format(writer.collection_name())
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()