#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

import argparse
import arvados
import arvados.collection
import base64
import datetime
import errno
import fcntl
import hashlib
import json
import os
import pwd
import time
import signal
import socket
import sys
import tempfile
import threading
from apiclient import errors as apiclient_errors

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help="""
Maximum depth of directory tree to represent in the manifest
structure. A directory structure deeper than this will be represented
as a single stream in the manifest. If N=0, the manifest will contain
a single stream. Default: -1 (unlimited), i.e., exactly one manifest
stream per filesystem directory that contains files.
""")

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

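# Example invocations (illustrative; all flags shown are defined above):
#   arv-put file1.dat dir2/                 # default: save a Collection, print its locator
#   arv-put --stream big.tar                # write data to Keep, print the manifest on stdout only
#   arv-put --raw --replication 3 data.bin  # print comma-separated data block locators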
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    if args.paths == ['-']:
        args.resume = False
        if not args.filename:
            args.filename = 'stdin'

    return args

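# For example (illustrative):
#   parse_arguments(['--no-progress', '/dev/stdin'])
# yields args.paths == ['-'], args.resume == False and args.filename == 'stdin'.
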
class ResumeCacheConflict(Exception):
    pass


class FileUploadError(Exception):
    pass


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

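    # make_path() below derives a per-invocation cache file name: an MD5 digest
    # of the API host and the sorted real paths (plus manifest depth or
    # --filename), stored under CACHE_DIR in the user's home directory.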
    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(str(max(args.max_manifest_depth, -1)))
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """
        Make sure the cached state is still usable: pick one data block
        locator from it and verify that Keep still has that block. If the
        check fails, discard the cache and start over.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            # Cache file empty or unparseable; nothing to check.
            pass

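    # save() below writes the new state to a temporary file in the same
    # directory and then rename()s it over the old cache file, so an
    # interrupted write never leaves a truncated cache behind.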
    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
                 name=None, owner_uuid=None, ensure_unique_name=False,
                 num_retries=None, write_copies=None, replication=None,
                 filename=None):
        self.paths = paths
        self.resume = resume
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.write_copies = write_copies
        self.replication = replication
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_hash = None # MD5 digest based on paths & filename
        self._cache_file = None
        self._collection = None
        self._collection_lock = threading.Lock()
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = 60.0 # Seconds to wait between checkpoint runs
        # Load cached state from a previous run, if there is any and it's needed
        self._setup_state()

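    # Typical use, as done in main() below: construct the job, call start()
    # to upload everything, then save_collection() to create the Collection
    # record in Arvados.
    #
    #   writer = ArvPutUploadJob(paths=['data/'], reporter=None)
    #   writer.start()
    #   writer.save_collection()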
    def start(self):
        """
        Start the supporting thread & file uploading
        """
        self._checkpointer.start()
        for path in self.paths:
            try:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else: #if os.path.isfile(path):
                    self._write_file(path, self.filename or os.path.basename(path))
                # else:
                #     raise FileUploadError('Inadequate file type, cannot upload: %s' % path)
            except:
                # Stop the checkpointer thread before re-raising
                self._stop_checkpointer.set()
                self._checkpointer.join()
                raise
        # Work finished, stop the updater task
        self._stop_checkpointer.set()
        self._checkpointer.join()
        # Successful upload, do one last _update()
        self._update()

    def save_collection(self):
        with self._collection_lock:
            self._my_collection().save_new(
                                name=self.name, owner_uuid=self.owner_uuid,
                                ensure_unique_name=self.ensure_unique_name,
                                num_retries=self.num_retries,
                                replication_desired=self.replication)
        if self.resume:
            # Delete cache file upon successful collection saving
            try:
                os.unlink(self._cache_file.name)
            except OSError as error:
                if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                size += item.size()
            elif isinstance(item, (arvados.collection.Collection,
                                   arvados.collection.Subcollection)):
                size += self._collection_size(item)
        return size

    def _update_task(self):
        """
        Periodically call support tasks. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._my_collection())
            # Update cache, if resume enabled
            if self.resume:
                with self._state_lock:
                    self._state['manifest'] = self._my_collection().manifest_text()
        if self.resume:
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_directory_tree(self, path, stream_name="."):
        # TODO: Check what happens when multiple directories are passed as
        # arguments.
        # If the code below is uncommented, integration test
        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
        # fails, I suppose it is because the manifest_uuid changes because
        # of the dir addition to stream_name.

        # if stream_name == '.':
        #     stream_name = os.path.join('.', os.path.basename(path))
        for item in os.listdir(path):
            if os.path.isdir(os.path.join(path, item)):
                self._write_directory_tree(os.path.join(path, item),
                                os.path.join(stream_name, item))
            elif os.path.isfile(os.path.join(path, item)):
                self._write_file(os.path.join(path, item),
                                os.path.join(stream_name, item))
            else:
                raise FileUploadError('Inadequate file type, cannot upload: %s'
                                      % os.path.join(path, item))

    def _write_stdin(self, filename):
        with self._collection_lock:
            output = self._my_collection().open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _write_file(self, source, filename):
        resume_offset = 0
        resume_upload = False
        if self.resume:
            # Check if file was already uploaded (at least partially)
            with self._collection_lock:
                try:
                    file_in_collection = self._my_collection().find(filename)
                except IOError:
                    # Not found
                    file_in_collection = None
            # If there is no cached data for this file yet, record its current
            # size and mtime so a repeated run can decide whether to resume.
            with self._state_lock:
                if source not in self._state['files']:
                    self._state['files'][source] = {
                        'mtime' : os.path.getmtime(source),
                        'size' : os.path.getsize(source)
                    }
                cached_file_data = self._state['files'][source]
            # See if this file was already uploaded at least partially
            if file_in_collection:
                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
                    if os.path.getsize(source) == file_in_collection.size():
                        # File already there, skip it.
                        return
                    elif os.path.getsize(source) > file_in_collection.size():
                        # File partially uploaded, resume!
                        resume_upload = True
                        resume_offset = file_in_collection.size()
                    else:
                        # Inconsistent cache, re-upload the file
                        pass
                else:
                    # Local file differs from cached data, re-upload it
                    pass
        with open(source, 'r') as source_fd:
            if self.resume and resume_upload:
                with self._collection_lock:
                    # Open for appending
                    output = self._my_collection().open(filename, 'a')
                source_fd.seek(resume_offset)
            else:
                with self._collection_lock:
                    output = self._my_collection().open(filename, 'w')
            self._write(source_fd, output)
            output.close()

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        """
        Create a new collection if none cached. Load it from cache otherwise.
        """
        if self._collection is None:
            with self._state_lock:
                manifest = self._state['manifest']
            if self.resume and manifest is not None:
                # Create collection from saved state
                self._collection = arvados.collection.Collection(
                                        manifest,
                                        num_write_copies=self.write_copies)
            else:
                # Create new collection
                self._collection = arvados.collection.Collection(
                                        num_write_copies=self.write_copies)
        return self._collection

    def _setup_state(self):
        """
        Create a new cache file or load a previously existing one.
        """
        if self.resume:
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            self._cache_hash = md5.hexdigest()
            self._cache_file = open(os.path.join(
                arv_cmd.make_home_conf_dir('.cache/arvados/arv-put', 0o700, 'raise'),
                self._cache_hash), 'a+')
            self._cache_file.seek(0)
            with self._state_lock:
                try:
                    self._state = json.load(self._cache_file)
                    if not 'manifest' in self._state.keys():
                        self._state['manifest'] = ""
                    if not 'files' in self._state.keys():
                        self._state['files'] = {}
                except ValueError:
                    # File empty, set up new cache
                    self._state = {
                        'manifest' : None,
                        # Previous run file list: {path : {size, mtime}}
                        'files' : {}
                    }
            # Load how many bytes were uploaded on previous run
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._my_collection())
        # No resume required
        else:
            with self._state_lock:
                self._state = {
                    'manifest' : None,
                    'files' : {} # Previous run file list: {path : {size, mtime}}
                }

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                state = self._state
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_file.name))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            # new_cache.flush()
            # os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_file.name)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        with self._collection_lock:
            name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
        return name

    def manifest_locator(self):
        with self._collection_lock:
            locator = self._my_collection().manifest_locator()
        return locator

    def portable_data_hash(self):
        with self._collection_lock:
            datahash = self._my_collection().portable_data_hash()
        return datahash

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        with self._collection_lock:
            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
        return manifest

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            locators = []
            for segment in item.segments():
                loc = segment.locator
                if loc.startswith("bufferblock"):
                    loc = item._bufferblocks[loc].calculate_locator()
                locators.append(loc)
            return locators
        elif isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
                   ['bytes_written', '_seen_inputs'])

    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
        self.bytes_written = 0
        self._seen_inputs = []
        self.cache = cache
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        super(ArvPutCollectionWriter, self).__init__(**kwargs)

    @classmethod
    def from_cache(cls, cache, reporter=None, bytes_expected=None,
                   num_retries=0, replication=0):
        try:
            state = cache.load()
            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
            writer = cls.from_state(state, cache, reporter, bytes_expected,
                                    num_retries=num_retries,
                                    replication=replication)
        except (TypeError, ValueError,
                arvados.errors.StaleWriterStateError) as error:
            return cls(cache, reporter, bytes_expected,
                       num_retries=num_retries,
                       replication=replication)
        else:
            return writer

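    # cache_state() below mirrors from_cache() above: the '_data_buffer'
    # attribute is stored base64-encoded in the cache, and deque attributes
    # are converted to plain lists so the state stays JSON-serializable.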
    def cache_state(self):
        if self.cache is None:
            return
        state = self.dump_state()
        # Transform attributes for serialization.
        for attr, value in state.items():
            if attr == '_data_buffer':
                state[attr] = base64.encodestring(''.join(value))
            elif hasattr(value, 'popleft'):
                state[attr] = list(value)
        self.cache.save(state)

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

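    # flush_data() below piggybacks on the parent class: when a flush actually
    # PUTs data to Keep, progress is reported, and the resume cache is saved
    # whenever at least one more full KEEP_BLOCK_SIZE block has been written.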
    def flush_data(self):
        start_buffer_len = self._data_buffer_len
        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
        super(ArvPutCollectionWriter, self).flush_data()
        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
            self.bytes_written += (start_buffer_len - self._data_buffer_len)
            self.report_progress()
            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                self.cache_state()

    def _record_new_input(self, input_type, source_name, dest_name):
        # The key needs to be a list because that's what we'll get back
        # from JSON deserialization.
        key = [input_type, source_name, dest_name]
        if key in self._seen_inputs:
            return False
        self._seen_inputs.append(key)
        return True

    def write_file(self, source, filename=None):
        if self._record_new_input('file', source, filename):
            super(ArvPutCollectionWriter, self).write_file(source, filename)

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        if self._record_new_input('directory', path, stream_name):
            super(ArvPutCollectionWriter, self).write_directory_tree(
                path, stream_name, max_manifest_depth)


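# expected_bytes_for() returns None when any path is neither a regular file nor
# a directory (e.g. '-'/stdin); in that case progress is reported without a
# percentage (see human_progress below).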
def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
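
# Sample reporter output (illustrative):
#   human_progress(5 << 20, 20 << 20)   -> "\r5M / 20M 25.0% "
#   machine_progress(5242880, 20971520) -> "<program> <pid>: 5242880 written 20971520 total\n"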

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

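# main() ties everything together: parse arguments, choose a collection name
# and destination project, build an ArvPutUploadJob, upload the data, and then
# print either the manifest, the raw block locators, or the saved collection's
# locator, depending on the options given.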
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            print >>stderr, "Cannot use --name with --stream or --raw"
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        print >>stderr, error
        sys.exit(1)

    # write_copies diverges from args.replication here.
    # args.replication is how many copies we will instruct Arvados to
    # maintain (by passing it in collections().create()) after all
    # data is written -- and if None was given, we'll use None there.
    # Meanwhile, write_copies is how many copies of each data block we
    # write to Keep, which has to be a number.
    #
    # If we simply changed args.replication from None to a default
    # here, we'd end up erroneously passing the default replication
    # level (instead of None) to collections().create().
    write_copies = (args.replication or
                    api_client._rootDesc.get('defaultCollectionReplication', 2))

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                resume = args.resume,
                                reporter = reporter,
                                bytes_expected = bytes_expected,
                                num_retries = args.retries,
                                write_copies = write_copies,
                                replication = args.replication,
                                name = collection_name,
                                owner_uuid = project_uuid,
                                ensure_unique_name = True)
    except ResumeCacheConflict:
        print >>stderr, "\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-resume if this is really what you want."])
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if args.resume and writer.bytes_written > 0:
        print >>stderr, "\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."])

    writer.report_progress()
    output = None
    writer.start()
    if args.progress:  # Print newline to split stderr from stdout for humans.
        print >>stderr

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            writer.save_collection()
            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            print >>stderr, (
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the output: manifest text, raw block locators, or the new
    # collection's locator (UUID or portable data hash).
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    return output

if __name__ == '__main__':
    main()