11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
# Signals that arv-put traps; a custom handler converts these into
# SystemExit (see the note inside ArvPutUploadJob.start about the
# handler raising SystemExit with the caught signal's code).
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client shared by the command's entry points.
api_client = None

# Options describing what gets uploaded and how the result is stored
# or printed. Kept in a separate parser so it can be reused via
# argparse's `parents` mechanism.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

# Hidden legacy option (help suppressed); kept for backward compatibility.
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output-mode options: stream / manifest / raw are mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

# Options controlling how the upload run behaves: target project,
# collection name, progress reporting, and resume/cache policy.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The full command-line parser: upload options + run options + the
# shared --retries option from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
201
def parse_arguments(arguments):
    """Parse arv-put's command line and apply inter-argument defaults.

    Returns the parsed argparse.Namespace after normalizing stdin
    paths, enabling progress reporting on ttys, and reconciling the
    cache/resume flags. Calls arg_parser.error() (which exits) on
    invalid argument combinations.
    """
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    # Treat "/dev/stdin" the same as the conventional "-" marker.
    # Build a real list (not a bare map()) so the len() and equality
    # checks below keep working under Python 3, where map() returns a
    # lazy iterator.
    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        # Reading from stdin: there is no stable local file to compare
        # against on a later run, so caching and resuming make no sense.
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
237
238
class CollectionUpdateError(Exception):
    """Raised when the collection given to --update-collection cannot be
    read or its locator has an unknown format (see _setup_state)."""
    pass
241
242
class ResumeCacheConflict(Exception):
    """Raised when the resume cache file is flock()ed by another
    arv-put process (see the _lock_file helpers)."""
    pass
245
246
class ArvPutArgumentConflict(Exception):
    """Raised on contradictory settings, e.g. resume=True together with
    use_cache=False (see ArvPutUploadJob.__init__)."""
    pass
249
250
class ArvPutUploadIsPending(Exception):
    """Raised in --dry-run mode as soon as it is known that at least one
    file would need to be uploaded."""
    pass
253
254
class ArvPutUploadNotPending(Exception):
    """Raised in --dry-run mode when the full file scan finishes without
    finding anything that needs uploading."""
    pass
257
258
class FileUploadList(list):
    """List of files queued for upload.

    When built with dry_run=True, queueing any file raises
    ArvPutUploadIsPending, signalling that at least one upload would
    happen.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        # In dry-run mode, one pending upload is enough to decide.
        if self.dry_run:
            raise ArvPutUploadIsPending()
        return super(FileUploadList, self).append(other)
268
269
class ResumeCache(object):
    """Legacy JSON state cache used to resume interrupted uploads.

    The cache lives under ~/.cache/arvados/arv-put and is flock()ed so
    only one arv-put process can use a given cache file at a time.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        # 'a+' creates the file when missing without truncating an
        # existing one; the lock is taken immediately.
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Derive the cache file path from the API host, the (resolved)
        input paths, and --filename, so distinct uploads get distinct
        cache files."""
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            # Directory uploads hash a marker instead of --filename.
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        # Non-blocking exclusive lock: fail fast when another arv-put
        # already owns this cache file.
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        """Return the deserialized (JSON) cache state."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Spot-check that a cached block locator is still readable.

        Picks one locator from the saved state and HEADs it in Keep; if
        that fails for any reason (e.g. expired permission token) the
        cache is restarted from scratch. A cache that isn't valid JSON
        is simply ignored here.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                # Any failure invalidates the cache; start over.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache contents with `data` as JSON.

        Writes to a locked temporary file in the same directory, then
        rename()s it over the old cache. On failure the temporary file
        is removed (best effort) and the old cache stays in use.
        """
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            # Success: switch to the new (still locked) file handle.
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        """Close and delete the cache file; a missing file is fine."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw away the current cache and re-open a fresh, empty one."""
        self.destroy()
        self.__init__(self.filename)
350
351
class ArvPutUploadJob(object):
    """Upload local files and directories into a Keep collection.

    Supports resuming interrupted uploads from a local JSON cache file,
    updating an existing remote collection, dry-run checking, and a
    background thread that periodically checkpoints progress.
    """
    CACHE_DIR = '.cache/arvados/arv-put'
    # Shape of a brand-new (or invalidated) cache state.
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
358
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        """Set up an upload job for `paths`.

        paths: local files/directories to upload ('-' means stdin).
        resume: continue a previously interrupted upload (requires
            use_cache=True, otherwise ArvPutArgumentConflict is raised).
        use_cache: keep a local state cache file for resuming.
        reporter: optional callable(bytes_written, bytes_expected).
        update_collection: UUID of an existing collection to update.
        update_time: seconds between checkpoints by the update thread.
        dry_run: don't upload; raise ArvPutUploadIsPending /
            ArvPutUploadNotPending to report whether uploads would occur.
        """
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False  # Becomes True when updating a remote collection
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        # Cleared on abnormal termination so partial state isn't committed.
        self._checkpoint_before_quit = True

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)
407
408     def start(self, save_collection):
409         """
410         Start supporting thread & file uploading
411         """
412         if not self.dry_run:
413             self._checkpointer.start()
414         try:
415             for path in self.paths:
416                 # Test for stdin first, in case some file named '-' exist
417                 if path == '-':
418                     if self.dry_run:
419                         raise ArvPutUploadIsPending()
420                     self._write_stdin(self.filename or 'stdin')
421                 elif os.path.isdir(path):
422                     # Use absolute paths on cache index so CWD doesn't interfere
423                     # with the caching logic.
424                     prefixdir = path = os.path.abspath(path)
425                     if prefixdir != '/':
426                         prefixdir += '/'
427                     for root, dirs, files in os.walk(path):
428                         # Make os.walk()'s dir traversing order deterministic
429                         dirs.sort()
430                         files.sort()
431                         for f in files:
432                             self._check_file(os.path.join(root, f),
433                                              os.path.join(root[len(prefixdir):], f))
434                 else:
435                     self._check_file(os.path.abspath(path),
436                                      self.filename or os.path.basename(path))
437             # If dry-mode is on, and got up to this point, then we should notify that
438             # there aren't any file to upload.
439             if self.dry_run:
440                 raise ArvPutUploadNotPending()
441             # Remove local_collection's files that don't exist locally anymore, so the
442             # bytes_written count is correct.
443             for f in self.collection_file_paths(self._local_collection,
444                                                 path_prefix=""):
445                 if f != 'stdin' and f != self.filename and not f in self._file_paths:
446                     self._local_collection.remove(f)
447             # Update bytes_written from current local collection and
448             # report initial progress.
449             self._update()
450             # Actual file upload
451             self._upload_started = True # Used by the update thread to start checkpointing
452             self._upload_files()
453         except (SystemExit, Exception) as e:
454             self._checkpoint_before_quit = False
455             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
456             # Note: We're expecting SystemExit instead of KeyboardInterrupt because
457             #   we have a custom signal handler in place that raises SystemExit with
458             #   the catched signal's code.
459             if not isinstance(e, SystemExit) or e.code != -2:
460                 self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
461             raise
462         finally:
463             if not self.dry_run:
464                 # Stop the thread before doing anything else
465                 self._stop_checkpointer.set()
466                 self._checkpointer.join()
467                 if self._checkpoint_before_quit:
468                     # Commit all pending blocks & one last _update()
469                     self._local_collection.manifest_text()
470                     self._update(final=True)
471                     if save_collection:
472                         self.save_collection()
473             if self.use_cache:
474                 self._cache_file.close()
475
    def save_collection(self):
        """Persist the upload's result in Arvados.

        When updating an existing collection, copy new or changed files
        from the local collection into the remote one and save it;
        otherwise save the local collection as a brand new one.
        """
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection: copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection: overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection: skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
496
    def destroy_cache(self):
        """Close and delete the resume cache file, when caching is on.

        A cache file that is already gone is not an error.
        """
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()
506
507     def _collection_size(self, collection):
508         """
509         Recursively get the total size of the collection
510         """
511         size = 0
512         for item in collection.values():
513             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514                 size += self._collection_size(item)
515             else:
516                 size += item.size()
517         return size
518
    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.

        Runs in the daemon checkpointer thread until
        _stop_checkpointer is set; polls every second until the upload
        starts, then every _update_task_time seconds.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()
526
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.

        With final=True the full manifest (committing pending blocks)
        is checkpointed; otherwise only already-committed blocks are
        included so the upload isn't stalled.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            # Upload hasn't started: only skipped bytes count so far.
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
551
552     def report_progress(self):
553         if self.reporter is not None:
554             self.reporter(self.bytes_written, self.bytes_expected)
555
    def _write_stdin(self, filename):
        """Stream standard input into the local collection as `filename`."""
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()
560
    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded

        Compares the local file at `source` (by mtime/size) against the
        cached state and the partially-uploaded local collection, then
        decides whether to skip it, resume it from an offset, or
        (re-)upload it in full. Files to upload are queued on
        self._files_to_upload as (source, resume_offset, filename).
        """
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
620
    def _upload_files(self):
        """Upload every queued file, resuming from the recorded offset
        when one was detected by _check_file()."""
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    # Refresh cached stats so a later run can detect changes.
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                # flush=False: pending blocks are committed later in bulk.
                output.close(flush=False)
636
637     def _write(self, source_fd, output):
638         while True:
639             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
640             if not data:
641                 break
642             output.write(data)
643
644     def _my_collection(self):
645         return self._remote_collection if self.update else self._local_collection
646
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also loads the remote collection when `update_collection` looks
        like a collection UUID, and rebuilds the local collection from
        the cached manifest checkpoint (if any).
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                # 'a+' keeps any previous state for resuming.
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
700
701     def collection_file_paths(self, col, path_prefix='.'):
702         """Return a list of file paths by recursively go through the entire collection `col`"""
703         file_paths = []
704         for name, item in col.items():
705             if isinstance(item, arvados.arvfile.ArvadosFile):
706                 file_paths.append(os.path.join(path_prefix, name))
707             elif isinstance(item, arvados.collection.Subcollection):
708                 new_prefix = os.path.join(path_prefix, name)
709                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
710         return file_paths
711
    def _lock_file(self, fileobj):
        """Take a non-blocking exclusive flock() on `fileobj`.

        Raises ResumeCacheConflict when another process already holds
        the lock on this cache file.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))
717
    def _save_state(self):
        """
        Atomically save current state into cache.

        Serializes self._state to JSON, writes it to a locked temporary
        file in the cache directory (flushed and fsynced), then
        rename()s it over the current cache file. On failure, logs the
        error and keeps the previous cache file in place.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we're already needing JSON format to be
                # saved on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            # Success: switch to the new (still locked) file handle.
            self._cache_file.close()
            self._cache_file = new_cache
745
746     def collection_name(self):
747         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
748
    def manifest_locator(self):
        """Return the manifest locator of the collection being worked on."""
        return self._my_collection().manifest_locator()
751
752     def portable_data_hash(self):
753         pdh = self._my_collection().portable_data_hash()
754         m = self._my_collection().stripped_manifest()
755         local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
756         if pdh != local_pdh:
757             logger.warning("\n".join([
758                 "arv-put: API server provided PDH differs from local manifest.",
759                 "         This should not happen; showing API server version."]))
760         return pdh
761
762     def manifest_text(self, stream_name=".", strip=False, normalize=False):
763         return self._my_collection().manifest_text(stream_name, strip, normalize)
764
765     def _datablocks_on_item(self, item):
766         """
767         Return a list of datablock locators, recursively navigating
768         through subcollections
769         """
770         if isinstance(item, arvados.arvfile.ArvadosFile):
771             if item.size() == 0:
772                 # Empty file locator
773                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
774             else:
775                 locators = []
776                 for segment in item.segments():
777                     loc = segment.locator
778                     locators.append(loc)
779                 return locators
780         elif isinstance(item, arvados.collection.Collection):
781             l = [self._datablocks_on_item(x) for x in item.values()]
782             # Fast list flattener method taken from:
783             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
784             return [loc for sublist in l for loc in sublist]
785         else:
786             return None
787
788     def data_locators(self):
789         with self._collection_lock:
790             # Make sure all datablocks are flushed before getting the locators
791             self._my_collection().manifest_text()
792             datablocks = self._datablocks_on_item(self._my_collection())
793         return datablocks
794
795
def expected_bytes_for(pathlist):
    """Total the on-disk sizes of the given paths for progress reporting.

    Directories are walked recursively; plain files are stat'ed directly.
    Returns None as soon as any path is neither a file nor a directory.
    """
    total = 0
    for path in pathlist:
        if os.path.isfile(path):
            total += os.path.getsize(path)
        elif os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                total += os.path.getsize(os.path.join(path, filename))
        else:
            return None
    return total
809
# Template for machine-readable progress lines; bakes in the program name
# and pid once at import time.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    """Format one machine-parseable progress line; -1 means total unknown."""
    expected = bytes_expected
    if expected is None:
        expected = -1
    return _machine_format.format(bytes_written, expected)
815
def human_progress(bytes_written, bytes_expected):
    """Format a human-readable progress line (leading \\r so it overwrites
    itself on a terminal); shows MiB and percent when the total is known."""
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20, fraction)
823
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter callback that renders progress with progress_func
    and writes the result to outfile (stderr by default)."""
    def report(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return report
828
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with a negative status so the caller can tell
    which signal terminated the upload."""
    raise SystemExit(-sigcode)
831
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve project_uuid to the UUID of the project to upload into.

    An empty/None project_uuid means the current user's home project.
    Raises ValueError when project_uuid matches neither a user nor a
    group UUID pattern.
    """
    if not project_uuid:
        request = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        request = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        request = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return request.execute(num_retries=num_retries)['uuid']
842
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Command-line entry point for arv-put.

    arguments: optional argv-style list, handed to parse_arguments().
    stdout: stream that receives the final output (manifest, locators,
        or collection identifier).
    stderr: NOTE(review) -- accepted but never referenced in this body;
        progress output goes through progress_writer()'s own default.

    Returns the output string on success.  Exits the process otherwise:
    status 1 on errors, 2 when --dry-run finds uploads pending, 0 when
    --dry-run finds nothing to upload.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    # Reuse a pre-installed client (e.g. injected by tests); otherwise
    # build a fresh one.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        # Default name records when, by whom, and from which host the
        # upload was made.
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    # Pick a progress reporting style: human-readable, machine-readable,
    # or none.
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # Only save a Collection object when the caller didn't ask for raw
        # manifest text (--as-stream) or raw block locators (--raw).
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    # Decide what to emit: manifest text, raw locators, or the saved
    # collection's identifier (PDH or manifest locator).
    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the signal handlers that were replaced above.
    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
993
994
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()