Merge branch '10694-provenance-graph-consistency'
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 from apiclient import errors as apiclient_errors
27 from arvados._version import __version__
28
29 import arvados.commands._util as arv_cmd
30
# Termination signals this command cares about.
# NOTE(review): presumably these get exit_signal_handler installed on them by
# code outside this view -- confirm against the command entry point.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client handle; nothing is assigned at import time.
# NOTE(review): presumably set by the command entry point -- confirm.
api_client = None
33
# Options describing WHAT gets uploaded and HOW the result is reported.
# Built with add_help=False because it is only used as a parent parser of
# arg_parser, which supplies -h/--help itself.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
# Zero or more positional paths; an empty list is turned into ['-'] (stdin)
# by parse_arguments().
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

# First mutually exclusive group: --max-manifest-depth, --normalize and
# --dry-run cannot be combined with each other.
_group = upload_opts.add_mutually_exclusive_group()

# Still parsed, but hidden from the help output (help=argparse.SUPPRESS).
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Second mutually exclusive group: output mode (stream / manifest / raw).
# Each "--as-*" / "--in-*" synonym stores into the same dest as its canonical
# option.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# The remaining upload options are independent of the groups above.
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# Stores into args.filename, same as --filename below.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

# default=None leaves the choice of replication level to downstream code.
upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

# default=None here; the "2 threads" default quoted in the help text is not
# applied in this block.
# NOTE(review): presumably enforced by the collection/Keep layer -- confirm.
upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
144
# Options describing WHERE the collection is stored and how the run behaves
# (progress display, resume, caching). Like upload_opts, this parser is only
# used as a parent of arg_parser, hence add_help=False.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# Progress reporting modes are mutually exclusive. When none is given,
# parse_arguments() turns --progress on if stderr is a tty.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write args.resume (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache / --no-cache both write args.use_cache (default True).
# parse_arguments() forces args.resume to False when the cache is disabled.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
195
# The complete command line parser: it inherits every option declared on
# upload_opts and run_opts, plus the retry option shared through
# arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
199
def parse_arguments(arguments):
    """Parse the command line and normalize the resulting namespace.

    Arguments:
      arguments -- list of command line strings (without the program name),
                   handed to arg_parser.parse_args().

    Returns the argparse namespace after these adjustments:
      * an empty path list becomes ['-'] (read from standard input);
      * every "/dev/stdin" path is rewritten to "-";
      * args.progress is switched on when neither --batch-progress nor
        --no-progress was given and stderr is a tty;
      * args.resume is switched off when the cache is disabled;
      * when reading only from stdin, resume and cache are disabled and the
        filename defaults to 'stdin'.

    Invalid combinations are reported through arg_parser.error(), which
    prints the message and exits the process.
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']

    # Build a real list rather than using map(): the result is measured with
    # len(), indexed and compared against ['-'] below, none of which work on
    # the lazy iterator that map() returns on Python 3.
    args.paths = ["-" if path == "/dev/stdin" else path for path in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Data read from stdin cannot be re-read, so there is nothing to
        # resume from and nothing worth caching.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
235
236
class CollectionUpdateError(Exception):
    """The collection selected for update is unreadable or its locator is not recognized."""
239
240
class ResumeCacheConflict(Exception):
    """The exclusive, non-blocking lock on a cache file could not be acquired."""
243
244
class ArvPutArgumentConflict(Exception):
    """Incompatible options were requested (e.g. resume without a cache)."""
247
248
class ArvPutUploadIsPending(Exception):
    """Dry-run result: at least one file would have to be uploaded."""
251
252
class ArvPutUploadNotPending(Exception):
    """Dry-run result: no file needs to be uploaded."""
255
256
class FileUploadList(list):
    """List of pending uploads that refuses new entries in dry-run mode.

    With dry_run set, the first attempt to append() raises
    ArvPutUploadIsPending instead of storing the item, which lets the caller
    find out that some upload would be needed without collecting the list.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Store `other`, or raise ArvPutUploadIsPending when in dry-run mode."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
266
267
class ResumeCache(object):
    """JSON state file protected by an exclusive, non-blocking flock().

    The file is held open (and therefore locked) for the lifetime of the
    object. save() replaces it atomically through a temporary file in the
    same directory followed by a rename.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open (creating it if missing) and lock the cache file `file_spec`.

        Raises ResumeCacheConflict when the lock cannot be acquired.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Do not leak the descriptor when somebody else holds the lock.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path matching the parsed arguments `args`.

        The file name is the MD5 hex digest of the configured API host, the
        sorted real paths of args.paths, and either the marker "-1" (when any
        path is a directory) or args.filename (when set). The cache directory
        is obtained from arv_cmd.make_home_conf_dir().
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on `fileobj`.

        `fileobj` may be a file object or a raw integer descriptor (save()
        passes the latter). Raises ResumeCacheConflict if the lock is held
        elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; fall back to the descriptor
            # itself so the conflict is not masked by an AttributeError.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the decoded JSON content of the cache file.

        json.load() raises ValueError when the file is empty or malformed.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Restart the cache if its first recorded block cannot be verified.

        Picks the first locator found in the cached state and issues a HEAD
        request for it through a KeepClient. Any failure while doing so
        resets the cache with restart(). An empty or unparsable cache file
        is left untouched.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: whatever went wrong, the cached state
                # cannot be trusted, so start over.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache content with `data` (JSON-encoded).

        This is best effort: I/O errors and lock conflicts are swallowed and
        leave the previous cache file in place.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Get the data onto disk BEFORE the rename makes it visible;
            # otherwise a crash could leave an empty or partial cache file.
            new_cache.flush()
            os.fsync(new_cache.fileno())
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary descriptor and remove the temp file.
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            if new_cache_name is not None:
                try:
                    os.unlink(new_cache_name)
                except OSError as error:
                    if error.errno != errno.ENOENT:
                        raise
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file, which also releases its lock."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk (if it still exists) and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw the current cache away and start with a fresh, empty one."""
        self.destroy()
        self.__init__(self.filename)
348
349
class ArvPutUploadJob(object):
    """Upload a set of local paths (or stdin) into a collection.

    The job keeps a JSON state file (the previous manifest checkpoint plus
    size/mtime of every file seen) so that an interrupted run can be resumed.
    A daemon thread periodically checkpoints the manifest and reports
    progress while start() does the actual upload.
    """
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        """Set up the job and load (or create) its cached state.

        Arguments used directly by this class:
          paths -- local paths to upload; '-' means standard input.
          resume / use_cache -- whether to reuse / keep the state file.
          reporter -- optional callable(bytes_written, bytes_expected).
          filename -- name to use in the collection for a single file/stdin.
          update_time -- seconds between two checkpoint runs.
          update_collection -- UUID of an existing collection to update.
          dry_run -- only find out whether something would be uploaded.
        The remaining arguments are stored and passed on to the collection
        objects when they are created or saved.

        Raises ArvPutArgumentConflict when resume is requested without cache,
        ArvPutUploadIsPending for a dry run that cannot consult a cache, and
        whatever _setup_state() raises.
        """
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = [] # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self.logger = logger
        self.dry_run = dry_run

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading

        Every path is checked first (queueing what needs uploading), then the
        queued files are uploaded. In dry-run mode nothing is uploaded:
        ArvPutUploadIsPending is raised as soon as a pending file is found,
        ArvPutUploadNotPending when none is. Unless in dry-run mode, the
        collection is saved on the way out when `save_collection` is true.
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and got up to this point, then we should notify that
            # there aren't any file to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            # A set keeps the membership test constant-time per file.
            known_paths = set(self._file_paths)
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in known_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_files()
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                if save_collection:
                    self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        """Persist the upload result.

        When updating an existing collection, every uploaded path that is
        missing or different there is copied over from the local collection
        and the remote collection is saved. Otherwise the local collection is
        saved as a new one.
        """
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File don't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exist on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exist on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        """Delete the state file (when caching is enabled) and close it."""
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        # Event.wait() returns False on timeout, so this runs _update() every
        # _update_task_time seconds until the stop event is set.
        while not self._stop_checkpointer.wait(self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        with self._collection_lock:
            self.bytes_written = self._collection_size(self._local_collection)
            if self.use_cache:
                if final:
                    manifest = self._local_collection.manifest_text()
                else:
                    # Get the manifest text without comitting pending blocks
                    manifest = self._local_collection.manifest_text(strip=False,
                                                                    normalize=False,
                                                                    only_committed=True)
                # Update cache
                with self._state_lock:
                    self._state['manifest'] = manifest
        if self.use_cache:
            self._save_state()
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        """Call the reporter (if any) with the written and expected byte counts."""
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        """Copy standard input into the collection file `filename`."""
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.append(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            # In dry-run mode this append() raises ArvPutUploadIsPending.
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        """Upload every queued file, resuming at the recorded offset if any."""
        for source, resume_offset, filename in self._files_to_upload:
            # Binary mode: the content is copied verbatim, it must never go
            # through newline translation or text decoding.
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        """Copy `source_fd` to `output` in chunks of KEEP_BLOCK_SIZE until EOF."""
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        """Return the collection that holds the result: remote when updating, local otherwise."""
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also loads the collection to update when `update_collection` is
        given. Raises CollectionUpdateError when that collection cannot be
        read or the locator is not a collection UUID, and ResumeCacheConflict
        when the cache file is locked by somebody else.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            # Always open without truncating: the file may only be emptied
            # once the exclusive lock is held, otherwise a run that loses the
            # lock race would still wipe the state of the run that owns it.
            self._cache_file = open(cache_filepath, 'a+')
            self._cache_filename = self._cache_file.name
            try:
                self._lock_file(self._cache_file)
            except ResumeCacheConflict:
                # Do not leak the descriptor when the lock is taken.
                self._cache_file.close()
                raise
            if not self.resume:
                # --no-resume means start with a empty cache file.
                self._cache_file.truncate(0)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    # Valid JSON that is not an object (e.g. a list) is as
                    # useless as a cache with missing keys.
                    if not (isinstance(self._state, dict) and
                            set(['manifest', 'files']).issubset(set(self._state.keys()))):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively go through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on `fileobj`.

        `fileobj` may be a file object or a raw integer descriptor
        (_save_state() passes the latter). Raises ResumeCacheConflict when
        the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; fall back to the descriptor
            # itself so the conflict is not masked by an AttributeError.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def _save_state(self):
        """
        Atomically save current state into cache.

        Best effort: failures are logged and leave the previous cache file
        in place.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            with self._state_lock:
                state = copy.deepcopy(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(state, new_cache)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            # Release the temporary descriptor and remove the temp file.
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            if new_cache_name is not None:
                try:
                    os.unlink(new_cache_name)
                except OSError as unlink_error:
                    if unlink_error.errno != errno.ENOENT:
                        raise
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        """Return the 'name' from the collection's API response, or None without a response."""
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        """Return the manifest locator of the result collection."""
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        """Return the portable data hash of the result collection."""
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        """Return the manifest text of the result collection."""
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections

        Returns None when `item` is neither a file nor a (sub)collection.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                return [segment.locator for segment in item.segments()]
        elif isinstance(item, (arvados.collection.Collection,
                               arvados.collection.Subcollection)):
            # Subcollection is handled explicitly, as _collection_size() and
            # collection_file_paths() do; otherwise nested directories would
            # yield None and break the flattening below.
            locators = []
            for child in item.values():
                locators.extend(self._datablocks_on_item(child) or [])
            return locators
        else:
            return None

    def data_locators(self):
        """Return the data block locators of the result collection."""
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks
766
767
def expected_bytes_for(pathlist):
    """
    Add up the sizes of everything named in pathlist so that progress can
    be displayed as a percentage.

    Directories are walked recursively and regular files are measured
    directly. Returns None as soon as a path is found that is neither a
    directory nor a regular file.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, name))
                for name in arvados.util.listdir_recursive(path))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
781
# Template for machine-readable progress lines. The program name and pid
# are filled in once, here; the two remaining placeholders receive the
# byte counts on every report.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())

def machine_progress(bytes_written, bytes_expected):
    """
    Format one machine-readable progress line. An unknown (None) expected
    total is reported as -1.
    """
    total = bytes_expected
    if total is None:
        total = -1
    return _machine_format.format(bytes_written, total)
787
def human_progress(bytes_written, bytes_expected):
    """
    Format a progress message for humans. The message starts with a
    carriage return, so consecutive reports overwrite each other on a
    terminal.

    When the expected total is unknown or zero only the raw count of
    bytes written is shown; otherwise both amounts are shown in MiB
    together with the completed percentage.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, fraction)
795
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a progress callback that formats the byte counts with
    progress_func and writes the resulting text to outfile.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
800
def exit_signal_handler(sigcode, frame):
    """
    Signal handler: leave the program through sys.exit(), using the
    negated signal number as the exit status. frame is unused.
    """
    exit_status = -sigcode
    sys.exit(exit_status)
803
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Look up the owner for the upload and return its UUID.

    With no project_uuid the current user is queried. A value matching
    the user or group UUID pattern is fetched from the corresponding API
    resource. Anything else raises ValueError.
    """
    if not project_uuid:
        request = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        request = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        request = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    response = request.execute(num_retries=num_retries)
    return response['uuid']
814
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """
    Command-line entry point: upload the requested paths and print the
    result.

    arguments -- command-line arguments handed to parse_arguments().
    stdout    -- file-like object that receives the result: the manifest
                 text (--stream), the comma-separated block locators
                 (--raw), or the collection's portable data hash or
                 manifest locator.
    stderr    -- kept for interface compatibility; this function does not
                 write to it directly (diagnostics go through the logger).

    The module-level api_client is created with arvados.api('v1') when it
    has not been set yet.

    Returns the text written to stdout on success. Failures leave through
    sys.exit(): status 1 on errors, status 2 when ArvPutUploadIsPending
    is raised (dry run found files pending for upload) and status 0 when
    ArvPutUploadNotPending is raised.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("arv-put: %s" % str(error))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    output = None
    try:
        if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        if not args.dry_run:
            writer.report_progress()
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("arv-put: %s" % str(error))
            sys.exit(1)
        except ArvPutUploadIsPending:
            # Dry run check successful, return proper exit code.
            sys.exit(2)
        except ArvPutUploadNotPending:
            # No files pending for upload
            sys.exit(0)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        # Put the original handlers back on every way out of this section,
        # including sys.exit() and unexpected exceptions, so that a caller
        # using main() as a library function never keeps our handlers.
        for sigcode, orig_handler in orig_signal_handlers.items():
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
960
961
# Run the uploader when this file is executed as a script.
if __name__ == '__main__':
    main()