11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import logging
16 import os
17 import pwd
18 import re
19 import signal
20 import socket
21 import sys
22 import tempfile
23 import threading
24 import time
25 import traceback
26
27 from apiclient import errors as apiclient_errors
28 from arvados._version import __version__
29
30 import arvados.commands._util as arv_cmd
31
# Signals trapped by the command entry point so an interrupted upload can
# checkpoint and exit cleanly (the handler raises SystemExit).
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client; initialized lazily by the command entry point.
api_client = None
34
# Parser holding the upload-behavior options; combined into arg_parser below.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

# --max-manifest-depth (hidden/deprecated), --normalize and --dry-run are
# mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output-mode options: at most one of stream/manifest/raw (and their
# synonyms) may be given.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
145
# Parser holding the run-behavior options (naming, progress, resume/cache);
# combined into arg_parser below.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# Progress reporting: at most one of --progress/--no-progress/--batch-progress.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume/--no-resume both write args.resume; resume defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache/--no-cache both write args.use_cache; caching defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The complete arv-put parser: upload options + run options + the shared
# retry option from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
200
def parse_arguments(arguments):
    """Parse and validate the arv-put command line.

    Returns the argparse.Namespace after applying defaults and
    cross-option adjustments; exits via arg_parser.error() on
    conflicting combinations (e.g. --filename with multiple paths).
    """
    args = arg_parser.parse_args(arguments)

    # No paths given means "read from standard input".
    if not args.paths:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        # Reading from stdin: resuming/caching make no sense, and an
        # existing collection can't be meaningfully updated.
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
236
237
class CollectionUpdateError(Exception):
    """Raised when a collection given to --update-collection can't be read or its locator is unrecognized."""
    pass
240
241
class ResumeCacheConflict(Exception):
    """Raised when another process already holds the lock on a cache file."""
    pass
244
245
class ArvPutArgumentConflict(Exception):
    """Raised on incompatible option combinations (e.g. resume without cache)."""
    pass
248
249
class ArvPutUploadIsPending(Exception):
    """Raised in dry-run mode as soon as a file is found to need uploading."""
    pass
252
253
class ArvPutUploadNotPending(Exception):
    """Raised in dry-run mode when no file turned out to need uploading."""
    pass
256
257
class FileUploadList(list):
    """List of queued uploads that refuses additions while dry-running.

    In dry-run mode the first attempt to queue a file raises
    ArvPutUploadIsPending, signalling that an upload would happen.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Queue an upload, or raise ArvPutUploadIsPending in dry-run mode."""
        if self.dry_run:
            raise ArvPutUploadIsPending()
        list.append(self, other)
267
268
class ResumeCache(object):
    """File-based cache of an interrupted upload's state (legacy format).

    State is stored as JSON under ~/.cache/arvados/arv-put; an exclusive
    flock is held on the cache file so concurrent arv-put runs over the
    same inputs can't clobber each other's state.
    """

    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        # 'a+' creates the file if missing without truncating existing state.
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Derive a cache file path unique to the API host and input paths."""
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive, non-blocking lock; raise ResumeCacheConflict if held."""
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        """Return the cached state parsed from JSON (raises ValueError if empty/corrupt)."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Restart the cache if its first data block is no longer readable in Keep."""
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Any failure (expired permission token, missing block, API
                # error, ...) means the cached state can't be trusted:
                # start over with an empty cache.
                self.restart()
        except ValueError:
            # Cache file empty or not valid JSON; nothing to check.
            pass

    def save(self, data):
        """Atomically replace the cache contents with JSON-serialized `data`.

        Writes to a locked temporary file in the same directory, flushes
        and fsyncs it, then renames it over the cache file — so a crash
        never leaves a partially written cache. On error the old cache is
        kept and the temporary file is removed (best effort: the cache is
        an optimization, not a requirement).
        """
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Make sure the JSON hits the disk before the atomic rename,
            # mirroring ArvPutUploadJob._save_state().
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file (if present) and release the lock."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Discard cached state and reinitialize with a fresh, empty cache."""
        self.destroy()
        self.__init__(self.filename)
349
350
class ArvPutUploadJob(object):
    """Upload local files/directories (or stdin) into a Keep collection,
    with optional resume from a local cache file, periodic checkpointing
    via a background thread, and progress reporting.
    """
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
357
    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        """Set up the upload job and load any cached state.

        :paths: local files/directories to upload ('-' means stdin).
        :resume: continue an interrupted upload from cached state
            (requires use_cache; raises ArvPutArgumentConflict otherwise).
        :reporter: optional callable(bytes_written, bytes_expected)
            invoked on every progress update.
        :update_time: seconds between checkpoint runs once uploading starts.
        :update_collection: UUID of an existing collection to update.
        :dry_run: only determine whether an upload would happen; raises
            ArvPutUploadIsPending immediately when no cache/resume is
            available to prove otherwise.
        """
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        # Cleared when the upload aborts, so the finally clause in start()
        # skips the final checkpoint/save.
        self._checkpoint_before_quit = True

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)
406
    def start(self, save_collection):
        """
        Start supporting thread & file uploading.

        :save_collection: when True, save (or update) the collection on
            the API server after a successful upload.

        In dry-run mode raises ArvPutUploadIsPending as soon as a pending
        upload is found, or ArvPutUploadNotPending when nothing needs
        uploading. Re-raises any error hit while uploading, after stopping
        the checkpointer thread.
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and got up to this point, then we should notify that
            # there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            #   we have a custom signal handler in place that raises SystemExit with
            #   the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
475
    def save_collection(self):
        """Save the uploaded data on the API server.

        When updating an existing collection, copy new or changed files
        from the locally built collection into the remote one and save
        it; otherwise save the local collection as a brand new one.
        """
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)
496
497     def destroy_cache(self):
498         if self.use_cache:
499             try:
500                 os.unlink(self._cache_filename)
501             except OSError as error:
502                 # That's what we wanted anyway.
503                 if error.errno != errno.ENOENT:
504                     raise
505             self._cache_file.close()
506
507     def _collection_size(self, collection):
508         """
509         Recursively get the total size of the collection
510         """
511         size = 0
512         for item in listvalues(collection):
513             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514                 size += self._collection_size(item)
515             else:
516                 size += item.size()
517         return size
518
519     def _update_task(self):
520         """
521         Periodically called support task. File uploading is
522         asynchronous so we poll status from the collection.
523         """
524         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
525             self._update()
526
    def _update(self, final=False):
        """
        Update cached manifest text and report progress.

        :final: when True, include all blocks (committing any pending
            ones) in the cached manifest; otherwise cache only the
            already-committed blocks.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            # Upload hasn't started: report only the bytes found to be
            # already uploaded (skipped).
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()
551
552     def report_progress(self):
553         if self.reporter is not None:
554             self.reporter(self.bytes_written, self.bytes_expected)
555
556     def _write_stdin(self, filename):
557         output = self._local_collection.open(filename, 'wb')
558         self._write(sys.stdin, output)
559         output.close()
560
    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded.

        Compares the file's current size/mtime against the cached state
        from a previous run and against what's already in the local
        collection, and queues a full or partial (resumed) upload only
        when needed, updating bytes_skipped for already-uploaded data.

        :source: absolute path of the local file.
        :filename: path the file will have inside the collection.
        """
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))
620
    def _upload_files(self):
        """Upload (or resume uploading) every file queued by _check_file()."""
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    # Refresh cached stat data right before uploading, so a
                    # later resume can detect modifications made meanwhile.
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)
636
637     def _write(self, source_fd, output):
638         while True:
639             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
640             if not data:
641                 break
642             output.write(data)
643
644     def _my_collection(self):
645         return self._remote_collection if self.update else self._local_collection
646
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also instantiates the local collection from the cached manifest
        (if any), and — when :update_collection: is given — loads the
        remote collection to update, raising CollectionUpdateError when
        it can't be read or its locator isn't a collection UUID.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths: hash of API host,
            # the (real) input paths, and the --filename override, so each
            # distinct upload gets its own cache.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
700
701     def collection_file_paths(self, col, path_prefix='.'):
702         """Return a list of file paths by recursively go through the entire collection `col`"""
703         file_paths = []
704         for name, item in listitems(col):
705             if isinstance(item, arvados.arvfile.ArvadosFile):
706                 file_paths.append(os.path.join(path_prefix, name))
707             elif isinstance(item, arvados.collection.Subcollection):
708                 new_prefix = os.path.join(path_prefix, name)
709                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
710         return file_paths
711
712     def _lock_file(self, fileobj):
713         try:
714             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
715         except IOError:
716             raise ResumeCacheConflict("{} locked".format(fileobj.name))
717
    def _save_state(self):
        """
        Atomically save current state into cache.

        Serializes state under the state lock, writes it to a locked
        temporary file in the cache directory, flushes/fsyncs, then
        renames it over the cache file — so a crash never leaves a
        partially written cache. Errors are logged, not raised: the
        cache is an optimization, not a requirement.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we're already needing JSON format to be
                # saved on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            # Keep the (still locked) new file as the live cache handle.
            self._cache_file.close()
            self._cache_file = new_cache
745
746     def collection_name(self):
747         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
748
749     def manifest_locator(self):
750         return self._my_collection().manifest_locator()
751
752     def portable_data_hash(self):
753         pdh = self._my_collection().portable_data_hash()
754         m = self._my_collection().stripped_manifest().encode()
755         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
756         if pdh != local_pdh:
757             logger.warning("\n".join([
758                 "arv-put: API server provided PDH differs from local manifest.",
759                 "         This should not happen; showing API server version."]))
760         return pdh
761
762     def manifest_text(self, stream_name=".", strip=False, normalize=False):
763         return self._my_collection().manifest_text(stream_name, strip, normalize)
764
765     def _datablocks_on_item(self, item):
766         """
767         Return a list of datablock locators, recursively navigating
768         through subcollections
769         """
770         if isinstance(item, arvados.arvfile.ArvadosFile):
771             if item.size() == 0:
772                 # Empty file locator
773                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
774             else:
775                 locators = []
776                 for segment in item.segments():
777                     loc = segment.locator
778                     locators.append(loc)
779                 return locators
780         elif isinstance(item, arvados.collection.Collection):
781             l = [self._datablocks_on_item(x) for x in listvalues(item)]
782             # Fast list flattener method taken from:
783             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
784             return [loc for sublist in l for loc in sublist]
785         else:
786             return None
787
788     def data_locators(self):
789         with self._collection_lock:
790             # Make sure all datablocks are flushed before getting the locators
791             self._my_collection().manifest_text()
792             datablocks = self._datablocks_on_item(self._my_collection())
793         return datablocks
794
795
def expected_bytes_for(pathlist):
    """Return the total size in bytes of all files named by *pathlist*
    (recursing into directories), so progress can be reported as a
    percentage.  Return None as soon as any path is neither a regular
    file nor a directory (e.g. stdin), since its size is unknowable."""
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, filename))
                for filename in arvados.util.listdir_recursive(path))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
809
# Template for machine-readable progress lines, e.g.
# "arv-put 1234: 10 written 100 total".  argv[0] and the pid are baked
# in once at import time.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    """Format a machine-parseable progress line; -1 stands in for an
    unknown expected size."""
    expected = -1 if bytes_expected is None else bytes_expected
    return _machine_format.format(bytes_written, expected)
815
def human_progress(bytes_written, bytes_expected):
    """Format a human-friendly progress line.  Starts with a carriage
    return so successive calls overwrite the same terminal line."""
    if not bytes_expected:
        # Total size unknown (or zero): just show the byte count.
        return "\r{} ".format(bytes_written)
    return "\r{}M / {}M {:.1%} ".format(
        bytes_written >> 20, bytes_expected >> 20,
        float(bytes_written) / bytes_expected)
823
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter callable that formats progress with
    *progress_func* and writes the result to *outfile* (stderr by
    default)."""
    def report(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return report
828
def exit_signal_handler(sigcode, frame):
    """Signal handler: terminate with a negative exit code so callers can
    tell which signal ended the process."""
    sys.exit(-sigcode)
831
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve *project_uuid* to a confirmed UUID via the API.

    An empty or None *project_uuid* means the current user's home
    project.  Raises ValueError when the UUID matches neither a user
    nor a group pattern.
    """
    if not project_uuid:
        request = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        request = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        request = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return request.execute(num_retries=num_retries)['uuid']
842
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Command-line entry point for arv-put.

    Parses arguments, uploads the given paths (or, with --dry-run, only
    checks whether anything would be uploaded), and writes the resulting
    manifest text (--stream), raw data locators (--raw), portable data
    hash (--portable-data-hash), or collection locator (default) to
    *stdout*.

    Exits 1 on errors, 2 when --dry-run finds files pending upload, and
    0 when --dry-run finds nothing pending; otherwise returns the output
    string.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    # Reuse an injected api_client (e.g. from tests); otherwise build one.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        # --name only makes sense when a collection record will be created.
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        # Default name records when, by whom, and where the upload was made.
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    # Pick a progress reporter: human-readable, machine-readable, or none.
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # Only save a collection record when not streaming/raw output.
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    # Choose what to emit on stdout based on the output-mode flags.
    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Restore the original signal handlers before returning to the caller.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
993
994
# Allow this module to be run directly as the arv-put command-line tool.
if __name__ == '__main__':
    main()