41aa41e91f222a777682cc0c5f0f3b4f51cffe0e
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 from apiclient import errors as apiclient_errors
27 from arvados._version import __version__
28
29 import arvados.commands._util as arv_cmd
30
# Termination signals this command is interested in.
# NOTE(review): presumably main() routes these to exit_signal_handler -- confirm.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Shared API client; main() creates one on first use while this is still None.
api_client = None

# Parent parser holding the options that describe WHAT gets uploaded and how
# the result is reported.  add_help=False so it can be used as a parent.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    '--version',
    action='version',
    version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
upload_opts.add_argument(
    'paths',
    metavar='path',
    type=str,
    nargs='*',
    help="""
Local file or directory. Default: read from standard input.
""")

# --max-manifest-depth (hidden from --help), --normalize and --dry-run are
# mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    type=int,
    metavar='N',
    default=-1,
    help=argparse.SUPPRESS)

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument(
    '--dry-run',
    action='store_true',
    default=False,
    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream / manifest / raw (and their synonyms) exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    action='store_true',
    dest='stream',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    action='store_true',
    dest='raw',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# Options that are independent of the output mode.
upload_opts.add_argument(
    '--update-collection',
    type=str,
    default=None,
    dest='update_collection',
    metavar="UUID",
    help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument(
    '--use-filename',
    type=str,
    default=None,
    dest='filename',
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str,
    default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
135
# Parent parser holding the options that describe HOW the upload runs:
# destination, naming, progress reporting and resume/cache behaviour.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

# Progress reporting: at most one of these may be given.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write to args.resume (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--resume',
    action='store_true',
    default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument(
    '--no-resume',
    action='store_false',
    dest='resume',
    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache / --no-cache both write to args.use_cache (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--cache',
    action='store_true',
    dest='use_cache',
    default=True,
    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument(
    '--no-cache',
    action='store_false',
    dest='use_cache',
    help="""
Do not save upload state in a cache file for resuming.
""")
186
# Full command-line parser: upload options + run options + the shared
# retry option provided by arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
    description='Copy data from the local filesystem to Keep.')
190
def parse_arguments(arguments):
    """Parse the command line and normalize interdependent options.

    arguments: list of argument strings handed to arg_parser.parse_args().

    Returns the argparse namespace after these adjustments:
      * no paths given -> paths becomes ['-'] (standard input);
      * "/dev/stdin" is rewritten to "-";
      * --progress is switched on when stderr is a tty and neither
        --batch-progress nor --no-progress was given;
      * --no-cache turns resume off;
      * reading from stdin turns resume and the cache off and defaults the
        filename to 'stdin'.

    Invalid combinations are reported through arg_parser.error().
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']

    # Build a real list (not a lazy map object): the code below needs len(),
    # indexing and list equality on args.paths.
    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Standard input cannot be re-read, so there is nothing to resume
        # from and no point in caching state.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
226
227
class CollectionUpdateError(Exception):
    """The collection given for updating has an unknown locator format or
    could not be read."""
230
231
class ResumeCacheConflict(Exception):
    """A cache file could not be locked (the non-blocking flock failed)."""
234
235
class ArvPutArgumentConflict(Exception):
    """Incompatible options were combined (e.g. resume without a cache)."""
238
239
class ArvPutUploadIsPending(Exception):
    """Dry-run result: at least one file would have to be uploaded."""
242
243
class ArvPutUploadNotPending(Exception):
    """Dry-run result: no file needs to be uploaded."""
246
247
class FileUploadList(list):
    """List of files queued for upload.

    When created with dry_run=True, the first attempt to append() an entry
    raises ArvPutUploadIsPending instead of storing it.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        # In dry-run mode, having something to queue IS the answer.
        if self.dry_run:
            raise ArvPutUploadIsPending()
        list.append(self, other)
257
258
class ResumeCache(object):
    """JSON state file, protected by an exclusive flock, used to resume uploads.

    The file is opened and locked on construction.  save() replaces it
    by writing a temporary file in the same directory and renaming it over
    the cache file.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open (creating if needed) and lock the cache file at file_spec.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leave the handle open when we could not get the lock.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the given parsed arguments.

        The file name is the MD5 hex digest of the API host, the sorted real
        paths of args.paths and either "-1" (when any path is a directory)
        or args.filename (when set).  The cache directory is obtained from
        arv_cmd.make_home_conf_dir().
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive, non-blocking flock on fileobj.

        fileobj may be a file object or an integer descriptor.  Raises
        ResumeCacheConflict if the lock cannot be acquired.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Integer descriptors have no .name; fall back to the value itself
            # so the conflict is reported instead of an AttributeError.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON-decoded content of the cache file.

        Raises ValueError when the file does not contain valid JSON
        (e.g. it is empty).
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Discard the cache if the first block it references fails a HEAD request.

        A locator is taken from the cached "_finished_streams" or
        "_current_stream_locators" entries, when present.  Any exception
        while picking or checking the locator restarts the cache.  A cache
        that is not valid JSON is left alone.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: whatever went wrong, the cached state
                # cannot be trusted, so start over with an empty cache.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Replace the cache content with the JSON encoding of data.

        The data is written to a locked temporary file next to the cache
        file, flushed, and then renamed over it.  On IOError/OSError or a
        lock conflict the temporary file is removed and the previous cache
        is kept; the error is not propagated (saving is best-effort).
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Push buffered data to the file before it becomes the cache;
            # otherwise an interrupted process leaves an empty cache behind.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary descriptor so it doesn't leak.
            if new_cache is not None:
                try:
                    new_cache.close()
                except (IOError, OSError):
                    # close() may retry the failed write; nothing more to do.
                    pass
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # None means mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (which also drops its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw away the current cache and reopen a fresh one at the same path."""
        self.destroy()
        self.__init__(self.filename)
339
340
class ArvPutUploadJob(object):
    """Upload a set of local paths into a collection, with resumable state.

    The job keeps a "local collection" built from the manifest saved by a
    previous run (if any), decides per file whether it must be uploaded,
    resumed or skipped, and periodically checkpoints the manifest plus the
    file list (size/mtime) into a locked cache file.
    """
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None, replication_desired=None,
                 filename=None, update_time=20.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        """Set up the job and load any cached state.

        paths: local paths to upload; '-' means standard input.
        resume / use_cache: whether to continue from / maintain a cache file.
        reporter: optional callable(bytes_written, bytes_expected).
        name, owner_uuid, ensure_unique_name, num_retries: passed on when
            the collection is saved.
        replication_desired: passed to the local collection.
        filename: name to store a single file (or stdin) under.
        update_time: seconds to wait between periodic checkpoints.
        update_collection: UUID of an existing collection to update.
        dry_run: only find out whether anything would be uploaded.

        Raises ArvPutArgumentConflict when resume is requested without a
        cache, and ArvPutUploadIsPending for a dry run that cannot use
        cached state.  _setup_state() may raise CollectionUpdateError or
        ResumeCacheConflict.
        """
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self.logger = logger
        self.dry_run = dry_run

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading

        save_collection: when true, save_collection() is called once the
        final checkpoint has been written.

        In dry-run mode nothing is uploaded, checkpointed or saved: the call
        ends with ArvPutUploadIsPending (something would be uploaded) or
        ArvPutUploadNotPending (nothing to do).
        """
        if not self.dry_run:
            # A dry run must not modify anything, so it gets no checkpointer.
            self._checkpointer.daemon = True
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and got up to this point, then we should notify that
            # there aren't any file to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_files()
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
            if self.use_cache:
                self._cache_file.close()
            # NOTE(review): this also runs when the upload raised, so a
            # partially uploaded collection may get saved -- confirm intent.
            if save_collection and not self.dry_run:
                self.save_collection()

    def save_collection(self):
        """Persist the result.

        In update mode, copy every uploaded path that is missing from (or
        differs in) the remote collection and save it; otherwise save the
        local collection as a new one.
        """
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File don't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exist on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exist on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        """Remove the cache file from disk and close it (no-op without cache)."""
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.

        final: when true the full manifest is stored; otherwise only the
        manifest of already committed blocks is stored.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            if self.use_cache:
                # Update cache
                with self._state_lock:
                    if final:
                        self._state['manifest'] = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without comitting pending blocks
                        self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
                self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        """Invoke the reporter callback (if any) with written/expected byte counts."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        """Copy standard input into the local collection under filename."""
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded

        source: absolute local path; filename: its path inside the collection.
        Queues (source, resume_offset, filename) when an upload is needed and
        adds skipped/resumed bytes to bytes_skipped.
        """
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.append(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            # In dry-run mode this append raises ArvPutUploadIsPending.
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        """Upload every queued file, resuming at its recorded offset if any."""
        for source, resume_offset, filename in self._files_to_upload:
            # Binary mode: the data must be copied byte for byte.
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        """Copy source_fd into output in KEEP_BLOCK_SIZE reads until EOF."""
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        """Return the remote collection in update mode, else the local one."""
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        update_collection: optional UUID of the collection to update.
        Raises CollectionUpdateError for an unreadable or malformed UUID and
        ResumeCacheConflict when the cache file is locked.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            # Always open without truncating: the file must not be emptied
            # before we know we hold the lock on it.
            self._cache_file = open(cache_filepath, 'a+')
            self._cache_filename = self._cache_file.name
            try:
                self._lock_file(self._cache_file)
            except ResumeCacheConflict:
                self._cache_file.close()
                raise
            if not self.resume:
                # --no-resume means start with a empty cache file.
                self._cache_file.truncate(0)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)

    def _lock_file(self, fileobj):
        """Take an exclusive, non-blocking flock on fileobj.

        fileobj may be a file object or an integer descriptor.  Raises
        ResumeCacheConflict if the lock cannot be acquired.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Integer descriptors have no .name; report the value itself.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def _save_state(self):
        """
        Atomically save current state into cache.

        A snapshot of the state is written to a locked temporary file in the
        cache directory, synced, and renamed over the cache file.  Failures
        are logged and the previous cache file is kept.
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            with self._state_lock:
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            # Release the temporary descriptor so it doesn't leak.
            if new_cache is not None:
                try:
                    new_cache.close()
                except (IOError, OSError):
                    # close() may retry the failed write; nothing more to do.
                    pass
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # None means mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        """Return the 'name' from the collection's API response, or None without one."""
        response = self._my_collection().api_response()
        return response['name'] if response else None

    def manifest_locator(self):
        """Return the manifest locator of the resulting collection."""
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        """Return the portable data hash of the resulting collection."""
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Return the manifest text of the resulting collection."""
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections

        Returns None when item is neither a file nor a (sub)collection.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            return [segment.locator for segment in item.segments()]
        elif isinstance(item, (arvados.collection.Collection,
                               arvados.collection.Subcollection)):
            # Subcollections are walked too, matching _collection_size().
            locators = []
            for child in item.values():
                # Children of an unknown kind yield None; skip them instead
                # of failing while flattening.
                locators.extend(self._datablocks_on_item(child) or [])
            return locators
        else:
            return None

    def data_locators(self):
        """Return the locators of all data blocks in the resulting collection."""
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
732
733
def expected_bytes_for(pathlist):
    """Return the total size in bytes of everything under pathlist.

    Directories are expanded with arvados.util.listdir_recursive() and the
    sizes of the listed entries are added up; regular files contribute their
    own size.  Returns None as soon as a path is neither a directory nor a
    regular file, since the total cannot be known then.  Used to display
    progress as a percentage.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, entry))
                for entry in arvados.util.listdir_recursive(path))
            continue
        if not os.path.isfile(path):
            return None
        total += os.path.getsize(path)
    return total
747
# Template for machine-readable progress lines; program name and pid are
# filled in once here, the two byte counts on every call.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())

def machine_progress(bytes_written, bytes_expected):
    """Return one machine-readable progress line.

    An unknown total (bytes_expected is None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
753
def human_progress(bytes_written, bytes_expected):
    """Return a human-readable progress string starting with a carriage return.

    With a known, non-zero total: whole mebibytes written / expected plus a
    percentage.  Otherwise just the raw number of bytes written.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction)
761
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter that formats progress and writes it to outfile.

    progress_func: callable(bytes_written, bytes_expected) returning a string.
    outfile: object with a write() method.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
766
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with the negated signal number as status.

    frame is accepted because the signal module passes it to handlers; it is
    not used.
    """
    exit_status = -sigcode
    sys.exit(exit_status)
769
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve the UUID of the owner to use for the new collection.

    With a falsy project_uuid the current user is looked up.  Otherwise
    project_uuid must match arvados.util.user_uuid_pattern or
    arvados.util.group_uuid_pattern, and the corresponding user or group
    record is fetched.  The 'uuid' field of the API response is returned.

    Raises ValueError, before any request is executed, when project_uuid
    matches neither pattern.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            request = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            request = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError(
                "Not a valid project UUID: {}".format(project_uuid))
    else:
        request = api_client.users().current()
    response = request.execute(num_retries=num_retries)
    return response['uuid']
780
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Command-line entry point: upload the given paths and print the result.

    arguments is handed to parse_arguments().  An ArvPutUploadJob is built
    from the parsed options and started.  Depending on the options, the text
    written to stdout is the manifest text (--stream), a comma-separated list
    of data locators (--raw), the portable data hash, or the manifest locator
    of the saved collection.  A trailing newline is added when missing.

    stderr is accepted but not referenced in this function body.

    Returns the string written to stdout on success.  Calls sys.exit() with
    status 1 on argument conflicts, API errors or when there is no output,
    with status 2 when ArvPutUploadIsPending is raised (dry run found pending
    files), and with status 0 when ArvPutUploadNotPending is raised.

    The handlers for CAUGHT_SIGNALS are replaced by exit_signal_handler while
    the upload runs, and the previous handlers are put back in a finally
    clause so they are restored on every exit path, including sys.exit().
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("arv-put: %s" % str(error))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    output = None

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}
    try:
        if (not args.update_collection and args.resume
                and writer.bytes_written > 0):
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        writer.report_progress()
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("arv-put: %s" % str(error))
            sys.exit(1)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(
                        writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(
                        writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        # Always put the caller's handlers back, even when leaving through
        # sys.exit() or an unexpected exception, so that a process calling
        # main() as a library does not keep exit_signal_handler installed.
        for sigcode, orig_handler in orig_signal_handlers.items():
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
914
915
# Run the uploader only when this file is executed directly as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()