11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import logging
16 import os
17 import pwd
18 import re
19 import signal
20 import socket
21 import sys
22 import tempfile
23 import threading
24 import time
25 import traceback
26
27 from apiclient import errors as apiclient_errors
28 from arvados._version import __version__
29
30 import arvados.commands._util as arv_cmd
31
# Signals this module refers to by name: SIGINT, SIGQUIT and SIGTERM.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client slot; starts out unset.
api_client = None

# Parser holding the upload-related options. It is built with add_help=False
# because it is only used as a parent of the real command-line parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

# First mutually exclusive group: --max-manifest-depth, --normalize, --dry-run.
_group = upload_opts.add_mutually_exclusive_group()

# Accepted but hidden from the help output (help=argparse.SUPPRESS).
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Second mutually exclusive group: the output mode (stream / manifest / raw)
# and the synonyms of each, which share a destination via dest=.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# The remaining upload options are independent of the groups above.
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# --use-filename stores into the same destination as --filename.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
145
# Parser holding the options about how the command runs (destination,
# progress reporting, resume/cache). Like upload_opts it is only a parent.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# Progress reporting: at most one of --progress, --no-progress,
# --batch-progress may be given.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write args.resume; it defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache / --no-cache both write args.use_cache; it defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The actual command-line parser: combines the upload options, the run
# options and the shared retry option from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
200
def parse_arguments(arguments):
    """Parse the command line and normalize the resulting namespace.

    `arguments` is the list of command-line strings handed to
    `arg_parser.parse_args()`. The returned namespace is adjusted so that:

    * an empty path list becomes ``['-']`` and "/dev/stdin" is spelled "-";
    * --progress is switched on when stderr is a tty and neither
      --batch-progress nor --no-progress was given;
    * --no-cache also disables resuming;
    * reading from stdin disables both resume and cache, and falls back to
      the file name 'stdin' when no --filename was given.

    Invalid combinations are reported through `arg_parser.error()`.
    """
    args = arg_parser.parse_args(arguments)

    # No path at all means "read from standard input".
    if not args.paths:
        args.paths = ['-']
    args.paths = [("-" if p == "/dev/stdin" else p) for p in args.paths]

    # --filename only makes sense for exactly one non-directory path.
    single_file = len(args.paths) == 1 and not os.path.isdir(args.paths[0])
    if args.filename and not single_file:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    progress_overridden = args.batch_progress or args.no_progress
    if not progress_overridden and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    reading_stdin = (args.paths == ['-'])
    if reading_stdin:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # There is nothing to resume or cache for stdin input.
        args.resume = False
        args.use_cache = False
        args.filename = args.filename or 'stdin'

    return args
236
237
class CollectionUpdateError(Exception):
    """Raised when the collection given for updating cannot be read, or its
    locator does not match the collection UUID pattern."""
    pass
240
241
class ResumeCacheConflict(Exception):
    """Raised when the exclusive, non-blocking lock on a cache file cannot
    be acquired."""
    pass
244
245
class ArvPutArgumentConflict(Exception):
    """Raised when an upload job is asked to resume while caching is
    disabled."""
    pass
248
249
class ArvPutUploadIsPending(Exception):
    """Raised in dry-run mode as soon as something would have to be
    uploaded."""
    pass
252
253
class ArvPutUploadNotPending(Exception):
    """Raised in dry-run mode when every path was checked and nothing needs
    to be uploaded."""
    pass
256
257
class FileUploadList(list):
    """List of pending uploads that refuses new entries in dry-run mode.

    With `dry_run` set, the first attempt to append raises
    ArvPutUploadIsPending instead of storing the item.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        if not self.dry_run:
            list.append(self, other)
            return
        # Dry run: having something to queue is itself the answer.
        raise ArvPutUploadIsPending()
267
268
class ResumeCache(object):
    """JSON state file protected by an exclusive, non-blocking flock().

    The file is opened (and created if needed) in the constructor and stays
    locked for as long as the underlying file object is open.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open `file_spec` in append/read mode and lock it.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the given parsed arguments.

        The file name is an MD5 digest of the API host, the sorted real
        paths being uploaded and either a '-1' marker (when any path is a
        directory) or the --filename value.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on `fileobj`.

        Raises ResumeCacheConflict when the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        """Return the JSON document stored in the cache file.

        Raises ValueError when the file does not contain valid JSON.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Restart the cache if the first cached block locator is unusable.

        A locator is taken from "_finished_streams" or, failing that,
        "_current_stream_locators", and checked with a Keep HEAD request.
        Any error while doing so restarts the cache; an unparsable cache
        file is left alone.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Replace the cache contents with `data`, serialized as JSON.

        The data is written to a locked temporary file in the same
        directory, which is then renamed over the cache file. I/O errors and
        lock conflicts are swallowed (saving is best effort); any other
        error propagates. In every failure case the temporary file is closed
        and removed, and the previous cache file stays in place.
        """
        new_cache_fd = None
        new_cache = None
        new_cache_name = None
        saved = False
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            # Wrap the descriptor first so that _lock_file() receives an
            # object with a .name attribute, as it expects.
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
            saved = True
        except (IOError, OSError, ResumeCacheConflict):
            # Best effort: keep using the previous cache file.
            pass
        finally:
            if not saved:
                self._discard_temp(new_cache_fd, new_cache, new_cache_name)
        if saved:
            self.cache_file.close()
            self.cache_file = new_cache

    @staticmethod
    def _discard_temp(fd, fileobj, name):
        """Close and delete a temporary cache file, ignoring I/O errors."""
        try:
            if fileobj is not None:
                fileobj.close()
            elif fd is not None:
                os.close(fd)
        except (IOError, OSError):
            pass
        if name is not None:
            try:
                os.unlink(name)
            except OSError:
                pass

    def close(self):
        """Close the cache file (which also drops its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Destroy the cache file and start over with a fresh, empty one."""
        self.destroy()
        self.__init__(self.filename)
349
350
351 class ArvPutUploadJob(object):
352     CACHE_DIR = '.cache/arvados/arv-put'
353     EMPTY_STATE = {
354         'manifest' : None, # Last saved manifest checkpoint
355         'files' : {} # Previous run file list: {path : {size, mtime}}
356     }
357
358     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
359                  bytes_expected=None, name=None, owner_uuid=None,
360                  ensure_unique_name=False, num_retries=None,
361                  put_threads=None, replication_desired=None,
362                  filename=None, update_time=60.0, update_collection=None,
363                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
364         self.paths = paths
365         self.resume = resume
366         self.use_cache = use_cache
367         self.update = False
368         self.reporter = reporter
369         self.bytes_expected = bytes_expected
370         self.bytes_written = 0
371         self.bytes_skipped = 0
372         self.name = name
373         self.owner_uuid = owner_uuid
374         self.ensure_unique_name = ensure_unique_name
375         self.num_retries = num_retries
376         self.replication_desired = replication_desired
377         self.put_threads = put_threads
378         self.filename = filename
379         self._state_lock = threading.Lock()
380         self._state = None # Previous run state (file list & manifest)
381         self._current_files = [] # Current run file list
382         self._cache_file = None
383         self._collection_lock = threading.Lock()
384         self._remote_collection = None # Collection being updated (if asked)
385         self._local_collection = None # Collection from previous run manifest
386         self._file_paths = set() # Files to be updated in remote collection
387         self._stop_checkpointer = threading.Event()
388         self._checkpointer = threading.Thread(target=self._update_task)
389         self._checkpointer.daemon = True
390         self._update_task_time = update_time  # How many seconds wait between update runs
391         self._files_to_upload = FileUploadList(dry_run=dry_run)
392         self._upload_started = False
393         self.logger = logger
394         self.dry_run = dry_run
395         self._checkpoint_before_quit = True
396
397         if not self.use_cache and self.resume:
398             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
399
400         # Check for obvious dry-run responses
401         if self.dry_run and (not self.use_cache or not self.resume):
402             raise ArvPutUploadIsPending()
403
404         # Load cached data if any and if needed
405         self._setup_state(update_collection)
406
    def start(self, save_collection):
        """
        Check every input path and upload the files that need it.

        Unless this is a dry run, the checkpointer thread is started first
        and is always stopped and joined before returning. If nothing went
        wrong, pending blocks are committed, a final _update() is done and,
        when `save_collection` is true, save_collection() is called.

        In dry-run mode this never uploads: it raises ArvPutUploadIsPending
        as soon as something would be uploaded, or ArvPutUploadNotPending
        when all paths were checked without finding anything to upload.

        Any Exception or SystemExit raised on the way is re-raised to the
        caller after disabling the final checkpoint.
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            # The name inside the collection is the path
                            # relative to the directory being uploaded.
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            # Something went wrong: skip the final checkpoint done in `finally`.
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            #   we have a custom signal handler in place that raises SystemExit with
            #   the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
475
476     def save_collection(self):
477         if self.update:
478             # Check if files should be updated on the remote collection.
479             for fp in self._file_paths:
480                 remote_file = self._remote_collection.find(fp)
481                 if not remote_file:
482                     # File don't exist on remote collection, copy it.
483                     self._remote_collection.copy(fp, fp, self._local_collection)
484                 elif remote_file != self._local_collection.find(fp):
485                     # A different file exist on remote collection, overwrite it.
486                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
487                 else:
488                     # The file already exist on remote collection, skip it.
489                     pass
490             self._remote_collection.save(num_retries=self.num_retries)
491         else:
492             self._local_collection.save_new(
493                 name=self.name, owner_uuid=self.owner_uuid,
494                 ensure_unique_name=self.ensure_unique_name,
495                 num_retries=self.num_retries)
496
497     def destroy_cache(self):
498         if self.use_cache:
499             try:
500                 os.unlink(self._cache_filename)
501             except OSError as error:
502                 # That's what we wanted anyway.
503                 if error.errno != errno.ENOENT:
504                     raise
505             self._cache_file.close()
506
507     def _collection_size(self, collection):
508         """
509         Recursively get the total size of the collection
510         """
511         size = 0
512         for item in listvalues(collection):
513             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514                 size += self._collection_size(item)
515             else:
516                 size += item.size()
517         return size
518
519     def _update_task(self):
520         """
521         Periodically called support task. File uploading is
522         asynchronous so we poll status from the collection.
523         """
524         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
525             self._update()
526
527     def _update(self, final=False):
528         """
529         Update cached manifest text and report progress.
530         """
531         if self._upload_started:
532             with self._collection_lock:
533                 self.bytes_written = self._collection_size(self._local_collection)
534                 if self.use_cache:
535                     if final:
536                         manifest = self._local_collection.manifest_text()
537                     else:
538                         # Get the manifest text without comitting pending blocks
539                         manifest = self._local_collection.manifest_text(strip=False,
540                                                                         normalize=False,
541                                                                         only_committed=True)
542                     # Update cache
543                     with self._state_lock:
544                         self._state['manifest'] = manifest
545             if self.use_cache:
546                 try:
547                     self._save_state()
548                 except Exception as e:
549                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
550         else:
551             self.bytes_written = self.bytes_skipped
552         # Call the reporter, if any
553         self.report_progress()
554
555     def report_progress(self):
556         if self.reporter is not None:
557             self.reporter(self.bytes_written, self.bytes_expected)
558
559     def _write_stdin(self, filename):
560         output = self._local_collection.open(filename, 'wb')
561         self._write(sys.stdin, output)
562         output.close()
563
564     def _check_file(self, source, filename):
565         """Check if this file needs to be uploaded"""
566         resume_offset = 0
567         should_upload = False
568         new_file_in_cache = False
569         # Record file path for updating the remote collection before exiting
570         self._file_paths.add(filename)
571
572         with self._state_lock:
573             # If no previous cached data on this file, store it for an eventual
574             # repeated run.
575             if source not in self._state['files']:
576                 self._state['files'][source] = {
577                     'mtime': os.path.getmtime(source),
578                     'size' : os.path.getsize(source)
579                 }
580                 new_file_in_cache = True
581             cached_file_data = self._state['files'][source]
582
583         # Check if file was already uploaded (at least partially)
584         file_in_local_collection = self._local_collection.find(filename)
585
586         # If not resuming, upload the full file.
587         if not self.resume:
588             should_upload = True
589         # New file detected from last run, upload it.
590         elif new_file_in_cache:
591             should_upload = True
592         # Local file didn't change from last run.
593         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
594             if not file_in_local_collection:
595                 # File not uploaded yet, upload it completely
596                 should_upload = True
597             elif file_in_local_collection.permission_expired():
598                 # Permission token expired, re-upload file. This will change whenever
599                 # we have a API for refreshing tokens.
600                 should_upload = True
601                 self._local_collection.remove(filename)
602             elif cached_file_data['size'] == file_in_local_collection.size():
603                 # File already there, skip it.
604                 self.bytes_skipped += cached_file_data['size']
605             elif cached_file_data['size'] > file_in_local_collection.size():
606                 # File partially uploaded, resume!
607                 resume_offset = file_in_local_collection.size()
608                 self.bytes_skipped += resume_offset
609                 should_upload = True
610             else:
611                 # Inconsistent cache, re-upload the file
612                 should_upload = True
613                 self._local_collection.remove(filename)
614                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
615         # Local file differs from cached data, re-upload it.
616         else:
617             if file_in_local_collection:
618                 self._local_collection.remove(filename)
619             should_upload = True
620
621         if should_upload:
622             self._files_to_upload.append((source, resume_offset, filename))
623
624     def _upload_files(self):
625         for source, resume_offset, filename in self._files_to_upload:
626             with open(source, 'rb') as source_fd:
627                 with self._state_lock:
628                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
629                     self._state['files'][source]['size'] = os.path.getsize(source)
630                 if resume_offset > 0:
631                     # Start upload where we left off
632                     output = self._local_collection.open(filename, 'ab')
633                     source_fd.seek(resume_offset)
634                 else:
635                     # Start from scratch
636                     output = self._local_collection.open(filename, 'wb')
637                 self._write(source_fd, output)
638                 output.close(flush=False)
639
640     def _write(self, source_fd, output):
641         while True:
642             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
643             if not data:
644                 break
645             output.write(data)
646
647     def _my_collection(self):
648         return self._remote_collection if self.update else self._local_collection
649
    def _setup_state(self, update_collection):
        """
        Load the collection to update (if any), open and lock the cache
        file (if caching is enabled) and initialize the job state from it.

        `update_collection` is either a false value or a collection
        locator; it must match arvados.util.collection_uuid_pattern.

        Raises CollectionUpdateError when the locator has an unknown format
        or the collection cannot be read, and ResumeCacheConflict when the
        cache file is already locked.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths: an MD5 digest of the
            # API host, the sorted real paths and the filename (if given).
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                # 'a+' keeps the existing contents so they can be read below.
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with a empty cache file.
                # 'w+' truncates whatever was there.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            # Rewind so the JSON load below reads from the start of the file.
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty (or not valid JSON), set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
706
707     def collection_file_paths(self, col, path_prefix='.'):
708         """Return a list of file paths by recursively go through the entire collection `col`"""
709         file_paths = []
710         for name, item in listitems(col):
711             if isinstance(item, arvados.arvfile.ArvadosFile):
712                 file_paths.append(os.path.join(path_prefix, name))
713             elif isinstance(item, arvados.collection.Subcollection):
714                 new_prefix = os.path.join(path_prefix, name)
715                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
716         return file_paths
717
718     def _lock_file(self, fileobj):
719         try:
720             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
721         except IOError:
722             raise ResumeCacheConflict("{} locked".format(fileobj.name))
723
724     def _save_state(self):
725         """
726         Atomically save current state into cache.
727         """
728         with self._state_lock:
729             # We're not using copy.deepcopy() here because it's a lot slower
730             # than json.dumps(), and we're already needing JSON format to be
731             # saved on disk.
732             state = json.dumps(self._state)
733         try:
734             new_cache = tempfile.NamedTemporaryFile(
735                 dir=os.path.dirname(self._cache_filename), delete=False)
736             self._lock_file(new_cache)
737             new_cache.write(state)
738             new_cache.flush()
739             os.fsync(new_cache)
740             os.rename(new_cache.name, self._cache_filename)
741         except (IOError, OSError, ResumeCacheConflict) as error:
742             self.logger.error("There was a problem while saving the cache file: {}".format(error))
743             try:
744                 os.unlink(new_cache_name)
745             except NameError:  # mkstemp failed.
746                 pass
747         else:
748             self._cache_file.close()
749             self._cache_file = new_cache
750
751     def collection_name(self):
752         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
753
754     def manifest_locator(self):
755         return self._my_collection().manifest_locator()
756
757     def portable_data_hash(self):
758         pdh = self._my_collection().portable_data_hash()
759         m = self._my_collection().stripped_manifest().encode()
760         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
761         if pdh != local_pdh:
762             logger.warning("\n".join([
763                 "arv-put: API server provided PDH differs from local manifest.",
764                 "         This should not happen; showing API server version."]))
765         return pdh
766
767     def manifest_text(self, stream_name=".", strip=False, normalize=False):
768         return self._my_collection().manifest_text(stream_name, strip, normalize)
769
770     def _datablocks_on_item(self, item):
771         """
772         Return a list of datablock locators, recursively navigating
773         through subcollections
774         """
775         if isinstance(item, arvados.arvfile.ArvadosFile):
776             if item.size() == 0:
777                 # Empty file locator
778                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
779             else:
780                 locators = []
781                 for segment in item.segments():
782                     loc = segment.locator
783                     locators.append(loc)
784                 return locators
785         elif isinstance(item, arvados.collection.Collection):
786             l = [self._datablocks_on_item(x) for x in listvalues(item)]
787             # Fast list flattener method taken from:
788             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
789             return [loc for sublist in l for loc in sublist]
790         else:
791             return None
792
793     def data_locators(self):
794         with self._collection_lock:
795             # Make sure all datablocks are flushed before getting the locators
796             self._my_collection().manifest_text()
797             datablocks = self._datablocks_on_item(self._my_collection())
798         return datablocks
799
800
def expected_bytes_for(pathlist):
    """
    Return the total size in bytes of everything named in `pathlist`.

    Directories are walked with arvados.util.listdir_recursive() and the
    size of each listed file is added; regular files add their own size.
    The total is used to display progress as a percentage. Returns None as
    soon as a path is neither a directory nor a regular file.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            for relative_name in arvados.util.listdir_recursive(path):
                total += os.path.getsize(os.path.join(path, relative_name))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
814
# Template for machine-readable progress lines. The program name and PID are
# filled in once here; the doubled braces survive as the two placeholders
# that machine_progress() fills in on each call.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())
def machine_progress(bytes_written, bytes_expected):
    """
    Format a machine-readable progress line.

    An unknown total (`bytes_expected` is None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
820
def human_progress(bytes_written, bytes_expected):
    """
    Format a human-readable progress string, starting with a carriage return.

    When `bytes_expected` is truthy, both counts are shown shifted right by
    20 bits (labelled 'M') followed by the completed percentage; otherwise
    only the raw `bytes_written` value is shown.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_m = bytes_written >> 20
    expected_m = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_m, expected_m, fraction)
828
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a progress reporter.

    The returned callable takes (bytes_written, bytes_expected), formats them
    with `progress_func` and writes the resulting text to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
833
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit via sys.exit() with the negated signal number."""
    exit_status = -sigcode
    sys.exit(exit_status)
836
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Look up the owner for the upload and return its 'uuid' field.

    With no `project_uuid` the current user is queried. Otherwise the UUID
    is looked up as a user or as a group, depending on which of the
    arvados.util UUID patterns it matches. The chosen request is executed
    with `num_retries`.

    Raises ValueError when `project_uuid` matches neither pattern.
    """
    if not project_uuid:
        request = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        request = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        request = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    response = request.execute(num_retries=num_retries)
    return response['uuid']
847
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """
    Command-line entry point for arv-put.

    Parses `arguments`, validates the option combinations, builds an
    ArvPutUploadJob for the requested paths and runs it. Depending on the
    options, the manifest text (--stream), the comma-separated data block
    locators (--raw), the portable data hash or the manifest locator is
    written to `stdout`, followed by a newline if it does not already end
    with one. `stderr` is accepted but not used in this function.

    Returns the text that was written to `stdout`.

    Leaves through sys.exit() with status:
      1 -- invalid option combination, project lookup failure, resume cache
           conflict, collection update error, API error, or no output;
      2 -- ArvPutUploadIsPending was raised (dry run found pending files);
      0 -- ArvPutUploadNotPending was raised (no files pending for upload).

    While the upload runs, the handlers of CAUGHT_SIGNALS are replaced with
    exit_signal_handler. The original handlers are always put back, also
    when this function leaves through sys.exit() or an exception.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    output = None
    try:
        if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        if not args.dry_run:
            writer.report_progress()
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("\n".join([
                "arv-put: %s" % str(error)]))
            sys.exit(1)
        except ArvPutUploadIsPending:
            # Dry run check successful, return proper exit code.
            sys.exit(2)
        except ArvPutUploadNotPending:
            # No files pending for upload
            sys.exit(0)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        # Put the original handlers back on every way out, so a caller that
        # catches SystemExit is not left with our handlers installed.
        for sigcode, orig_handler in listitems(orig_signal_handlers):
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
998
999
# Run the uploader only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()