1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
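# Typical invocations (illustrative; see the option help text below):
#
#   arv-put file1.dat file2.dat          # upload files, save a new Collection
#   arv-put --project-uuid <uuid> dir/   # save the Collection in a given project
#   arv-put --stream data.bin            # store data, print the manifest, don't save a Collection
#   arv-put - < data.bin                 # read from stdin (stored as 'stdin')
#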
6 from __future__ import division
7 from builtins import str
8 from past.utils import old_div
9 from builtins import object
10 import argparse
11 import arvados
12 import arvados.collection
13 import base64
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34
35 import arvados.commands._util as arv_cmd
36
37 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
38 api_client = None
39
40 upload_opts = argparse.ArgumentParser(add_help=False)
41
42 upload_opts.add_argument('--version', action='version',
43                          version="%s %s" % (sys.argv[0], __version__),
44                          help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
46                          help="""
47 Local file or directory. Default: read from standard input.
48 """)
49
50 _group = upload_opts.add_mutually_exclusive_group()
51
52 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
53                     default=-1, help=argparse.SUPPRESS)
54
55 _group.add_argument('--normalize', action='store_true',
56                     help="""
57 Normalize the manifest by re-ordering files and streams after writing
58 data.
59 """)
60
61 _group.add_argument('--dry-run', action='store_true', default=False,
62                     help="""
63 Don't actually upload files, but only check if any file should be
64 uploaded. Exit with code=2 when there are files pending upload.
65 """)
66
67 _group = upload_opts.add_mutually_exclusive_group()
68
69 _group.add_argument('--as-stream', action='store_true', dest='stream',
70                     help="""
71 Synonym for --stream.
72 """)
73
74 _group.add_argument('--stream', action='store_true',
75                     help="""
76 Store the file content and display the resulting manifest on
77 stdout. Do not write the manifest to Keep or save a Collection object
78 in Arvados.
79 """)
80
81 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
82                     help="""
83 Synonym for --manifest.
84 """)
85
86 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
87                     help="""
88 Synonym for --manifest.
89 """)
90
91 _group.add_argument('--manifest', action='store_true',
92                     help="""
93 Store the file data and resulting manifest in Keep, save a Collection
94 object in Arvados, and display the manifest locator (Collection uuid)
95 on stdout. This is the default behavior.
96 """)
97
98 _group.add_argument('--as-raw', action='store_true', dest='raw',
99                     help="""
100 Synonym for --raw.
101 """)
102
103 _group.add_argument('--raw', action='store_true',
104                     help="""
105 Store the file content and display the data block locators on stdout,
106 separated by commas, with a trailing newline. Do not store a
107 manifest.
108 """)
109
110 upload_opts.add_argument('--update-collection', type=str, default=None,
111                          dest='update_collection', metavar="UUID", help="""
112 Update an existing collection identified by the given Arvados collection
113 UUID. All new local files will be uploaded.
114 """)
115
116 upload_opts.add_argument('--use-filename', type=str, default=None,
117                          dest='filename', help="""
118 Synonym for --filename.
119 """)
120
121 upload_opts.add_argument('--filename', type=str, default=None,
122                          help="""
123 Use the given filename in the manifest, instead of the name of the
124 local file. This is useful when "-" or "/dev/stdin" is given as an
125 input file. It can be used only if there is exactly one path given and
126 it is not a directory. Implies --manifest.
127 """)
128
129 upload_opts.add_argument('--portable-data-hash', action='store_true',
130                          help="""
131 Print the portable data hash instead of the Arvados UUID for the collection
132 created by the upload.
133 """)
134
135 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
136                          help="""
137 Set the replication level for the new collection: how many different
138 physical storage devices (e.g., disks) should have a copy of each data
139 block. Default is to use the server-provided default (if any) or 2.
140 """)
141
142 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
143                          help="""
144 Set the number of upload threads to be used. Note that using many
145 threads will increase the RAM requirements. Default is to use
146 2 threads.
147 On high-latency installations, a larger number of threads may improve
148 overall throughput.
149 """)
150
151 run_opts = argparse.ArgumentParser(add_help=False)
152
153 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
154 Store the collection in the specified project, instead of your Home
155 project.
156 """)
157
158 run_opts.add_argument('--name', help="""
159 Save the collection with the specified name.
160 """)
161
162 _group = run_opts.add_mutually_exclusive_group()
163 _group.add_argument('--progress', action='store_true',
164                     help="""
165 Display human-readable progress on stderr (bytes and, if possible,
166 percentage of total data size). This is the default behavior when
167 stderr is a tty.
168 """)
169
170 _group.add_argument('--no-progress', action='store_true',
171                     help="""
172 Do not display human-readable progress on stderr, even if stderr is a
173 tty.
174 """)
175
176 _group.add_argument('--batch-progress', action='store_true',
177                     help="""
178 Display machine-readable progress on stderr (bytes and, if known,
179 total data size).
180 """)
181
182 _group = run_opts.add_mutually_exclusive_group()
183 _group.add_argument('--resume', action='store_true', default=True,
184                     help="""
185 Continue interrupted uploads from cached state (default).
186 """)
187 _group.add_argument('--no-resume', action='store_false', dest='resume',
188                     help="""
189 Do not continue interrupted uploads from cached state.
190 """)
191
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
194                     help="""
195 Save upload state in a cache file for resuming (default).
196 """)
197 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
198                     help="""
199 Do not save upload state in a cache file for resuming.
200 """)
201
202 arg_parser = argparse.ArgumentParser(
203     description='Copy data from the local filesystem to Keep.',
204     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
205
206 def parse_arguments(arguments):
207     args = arg_parser.parse_args(arguments)
208
209     if len(args.paths) == 0:
210         args.paths = ['-']
211
212     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
213
214     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
215         if args.filename:
216             arg_parser.error("""
217     --filename argument cannot be used when storing a directory or
218     multiple files.
219     """)
220
221     # Turn on --progress by default if stderr is a tty.
222     if (not (args.batch_progress or args.no_progress)
223         and os.isatty(sys.stderr.fileno())):
224         args.progress = True
225
226     # Turn off --resume (default) if --no-cache is used.
227     if not args.use_cache:
228         args.resume = False
229
230     if args.paths == ['-']:
231         if args.update_collection:
232             arg_parser.error("""
233     --update-collection cannot be used when reading from stdin.
234     """)
235         args.resume = False
236         args.use_cache = False
237         if not args.filename:
238             args.filename = 'stdin'
239
240     return args
241
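# For example, parse_arguments([]) falls back to reading from standard input:
# args.paths becomes ['-'], caching and resuming are disabled, and the manifest
# entry is named 'stdin' unless --filename overrides it.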
242
243 class CollectionUpdateError(Exception):
244     pass
245
246
247 class ResumeCacheConflict(Exception):
248     pass
249
250
251 class ArvPutArgumentConflict(Exception):
252     pass
253
254
255 class ArvPutUploadIsPending(Exception):
256     pass
257
258
259 class ArvPutUploadNotPending(Exception):
260     pass
261
262
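# In --dry-run mode, appending to a FileUploadList raises ArvPutUploadIsPending,
# which is how the job signals that there is still something left to upload;
# main() maps it to exit code 2, and ArvPutUploadNotPending to exit code 0.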
263 class FileUploadList(list):
264     def __init__(self, dry_run=False):
265         list.__init__(self)
266         self.dry_run = dry_run
267
268     def append(self, other):
269         if self.dry_run:
270             raise ArvPutUploadIsPending()
271         super(FileUploadList, self).append(other)
272
273
274 class ResumeCache(object):
275     CACHE_DIR = '.cache/arvados/arv-put'
276
277     def __init__(self, file_spec):
278         self.cache_file = open(file_spec, 'a+')
279         self._lock_file(self.cache_file)
280         self.filename = self.cache_file.name
281
282     @classmethod
283     def make_path(cls, args):
284         md5 = hashlib.md5()
285         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())  # hashlib needs bytes on Python 3
286         realpaths = sorted(os.path.realpath(path) for path in args.paths)
287         md5.update('\0'.join(realpaths).encode())
288         if any(os.path.isdir(path) for path in realpaths):
289             md5.update(b"-1")
290         elif args.filename:
291             md5.update(args.filename.encode())
292         return os.path.join(
293             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
294             md5.hexdigest())
295
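    # The cache file is guarded by a non-blocking advisory lock: if another
    # arv-put process already holds it, flock() fails immediately and the
    # resulting ResumeCacheConflict is reported by main() as a concurrent
    # upload attempt.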
296     def _lock_file(self, fileobj):
297         try:
298             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
299         except IOError:
300             raise ResumeCacheConflict("{} locked".format(fileobj.name))
301
302     def load(self):
303         self.cache_file.seek(0)
304         return json.load(self.cache_file)
305
306     def check_cache(self, api_client=None, num_retries=0):
307         try:
308             state = self.load()
309             locator = None
310             try:
311                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
312                     locator = state["_finished_streams"][0][1][0]
313                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
314                     locator = state["_current_stream_locators"][0]
315                 if locator is not None:
316                     kc = arvados.keep.KeepClient(api_client=api_client)
317                     kc.head(locator, num_retries=num_retries)
318             except Exception:
319                 self.restart()
320         except ValueError:
321             pass
322
323     def save(self, data):
324         try:
325             new_cache_fd, new_cache_name = tempfile.mkstemp(
326                 dir=os.path.dirname(self.filename))
327             self._lock_file(new_cache_fd)
328             new_cache = os.fdopen(new_cache_fd, 'r+')
329             json.dump(data, new_cache)
330             os.rename(new_cache_name, self.filename)
331         except (IOError, OSError, ResumeCacheConflict) as error:
332             try:
333                 os.unlink(new_cache_name)
334             except NameError:  # mkstemp failed.
335                 pass
336         else:
337             self.cache_file.close()
338             self.cache_file = new_cache
339
340     def close(self):
341         self.cache_file.close()
342
343     def destroy(self):
344         try:
345             os.unlink(self.filename)
346         except OSError as error:
347             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
348                 raise
349         self.close()
350
351     def restart(self):
352         self.destroy()
353         self.__init__(self.filename)
354
355
356 class ArvPutUploadJob(object):
357     CACHE_DIR = '.cache/arvados/arv-put'
358     EMPTY_STATE = {
359         'manifest' : None, # Last saved manifest checkpoint
360         'files' : {} # Previous run file list: {path : {size, mtime}}
361     }
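    # The cached state is stored as JSON, roughly like this (illustrative
    # values):
    #   {"manifest": ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt\n",
    #    "files": {"/abs/path/file.txt": {"mtime": 1478021898.0, "size": 10}}}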
362
363     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
364                  bytes_expected=None, name=None, owner_uuid=None,
365                  ensure_unique_name=False, num_retries=None,
366                  put_threads=None, replication_desired=None,
367                  filename=None, update_time=60.0, update_collection=None,
368                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
369         self.paths = paths
370         self.resume = resume
371         self.use_cache = use_cache
372         self.update = False
373         self.reporter = reporter
374         self.bytes_expected = bytes_expected
375         self.bytes_written = 0
376         self.bytes_skipped = 0
377         self.name = name
378         self.owner_uuid = owner_uuid
379         self.ensure_unique_name = ensure_unique_name
380         self.num_retries = num_retries
381         self.replication_desired = replication_desired
382         self.put_threads = put_threads
383         self.filename = filename
384         self._state_lock = threading.Lock()
385         self._state = None # Previous run state (file list & manifest)
386         self._current_files = [] # Current run file list
387         self._cache_file = None
388         self._collection_lock = threading.Lock()
389         self._remote_collection = None # Collection being updated (if asked)
390         self._local_collection = None # Collection from previous run manifest
391         self._file_paths = set() # Files to be updated in remote collection
392         self._stop_checkpointer = threading.Event()
393         self._checkpointer = threading.Thread(target=self._update_task)
394         self._checkpointer.daemon = True
395         self._update_task_time = update_time  # How many seconds to wait between update runs
396         self._files_to_upload = FileUploadList(dry_run=dry_run)
397         self._upload_started = False
398         self.logger = logger
399         self.dry_run = dry_run
400         self._checkpoint_before_quit = True
401
402         if not self.use_cache and self.resume:
403             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
404
405         # Check for obvious dry-run responses
406         if self.dry_run and (not self.use_cache or not self.resume):
407             raise ArvPutUploadIsPending()
408
409         # Load cached data if any and if needed
410         self._setup_state(update_collection)
411
412     def start(self, save_collection):
413         """
414         Start the supporting thread & file uploading
415         """
416         if not self.dry_run:
417             self._checkpointer.start()
418         try:
419             for path in self.paths:
420                 # Test for stdin first, in case some file named '-' exists
421                 if path == '-':
422                     if self.dry_run:
423                         raise ArvPutUploadIsPending()
424                     self._write_stdin(self.filename or 'stdin')
425                 elif os.path.isdir(path):
426                     # Use absolute paths on cache index so CWD doesn't interfere
427                     # with the caching logic.
428                     prefixdir = path = os.path.abspath(path)
429                     if prefixdir != '/':
430                         prefixdir += '/'
431                     for root, dirs, files in os.walk(path):
432                         # Make os.walk()'s dir traversing order deterministic
433                         dirs.sort()
434                         files.sort()
435                         for f in files:
436                             self._check_file(os.path.join(root, f),
437                                              os.path.join(root[len(prefixdir):], f))
438                 else:
439                     self._check_file(os.path.abspath(path),
440                                      self.filename or os.path.basename(path))
441             # If dry-run mode is on and we got to this point, then we should notify that
442             # there aren't any files to upload.
443             if self.dry_run:
444                 raise ArvPutUploadNotPending()
445             # Remove local_collection's files that don't exist locally anymore, so the
446             # bytes_written count is correct.
447             for f in self.collection_file_paths(self._local_collection,
448                                                 path_prefix=""):
449                 if f != 'stdin' and f != self.filename and f not in self._file_paths:
450                     self._local_collection.remove(f)
451             # Update bytes_written from current local collection and
452             # report initial progress.
453             self._update()
454             # Actual file upload
455             self._upload_started = True # Used by the update thread to start checkpointing
456             self._upload_files()
457         except (SystemExit, Exception) as e:
458             self._checkpoint_before_quit = False
459             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
460             # Note: We're expecting SystemExit instead of KeyboardInterrupt because
461             #   we have a custom signal handler in place that raises SystemExit with
462             #   the caught signal's code.
463             if not isinstance(e, SystemExit) or e.code != -2:
464                 self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
465             raise
466         finally:
467             if not self.dry_run:
468                 # Stop the thread before doing anything else
469                 self._stop_checkpointer.set()
470                 self._checkpointer.join()
471                 if self._checkpoint_before_quit:
472                     # Commit all pending blocks & one last _update()
473                     self._local_collection.manifest_text()
474                     self._update(final=True)
475                     if save_collection:
476                         self.save_collection()
477             if self.use_cache:
478                 self._cache_file.close()
479
480     def save_collection(self):
481         if self.update:
482             # Check if files should be updated on the remote collection.
483             for fp in self._file_paths:
484                 remote_file = self._remote_collection.find(fp)
485                 if not remote_file:
486                     # The file doesn't exist on the remote collection, copy it.
487                     self._remote_collection.copy(fp, fp, self._local_collection)
488                 elif remote_file != self._local_collection.find(fp):
489                     # A different file exists on the remote collection, overwrite it.
490                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
491                 else:
492                     # The file already exists on the remote collection, skip it.
493                     pass
494             self._remote_collection.save(num_retries=self.num_retries)
495         else:
496             self._local_collection.save_new(
497                 name=self.name, owner_uuid=self.owner_uuid,
498                 ensure_unique_name=self.ensure_unique_name,
499                 num_retries=self.num_retries)
500
501     def destroy_cache(self):
502         if self.use_cache:
503             try:
504                 os.unlink(self._cache_filename)
505             except OSError as error:
506                 # That's what we wanted anyway.
507                 if error.errno != errno.ENOENT:
508                     raise
509             self._cache_file.close()
510
511     def _collection_size(self, collection):
512         """
513         Recursively get the total size of the collection
514         """
515         size = 0
516         for item in list(collection.values()):
517             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
518                 size += self._collection_size(item)
519             else:
520                 size += item.size()
521         return size
522
523     def _update_task(self):
524         """
525         Periodically called support task. File uploading is
526         asynchronous so we poll status from the collection.
527         """
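        # Poll every second until the upload actually starts, then checkpoint
        # every self._update_task_time seconds (60 by default).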
528         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
529             self._update()
530
531     def _update(self, final=False):
532         """
533         Update cached manifest text and report progress.
534         """
535         if self._upload_started:
536             with self._collection_lock:
537                 self.bytes_written = self._collection_size(self._local_collection)
538                 if self.use_cache:
539                     if final:
540                         manifest = self._local_collection.manifest_text()
541                     else:
542                         # Get the manifest text without committing pending blocks
543                         manifest = self._local_collection.manifest_text(strip=False,
544                                                                         normalize=False,
545                                                                         only_committed=True)
546                     # Update cache
547                     with self._state_lock:
548                         self._state['manifest'] = manifest
549             if self.use_cache:
550                 self._save_state()
551         else:
552             self.bytes_written = self.bytes_skipped
553         # Call the reporter, if any
554         self.report_progress()
555
556     def report_progress(self):
557         if self.reporter is not None:
558             self.reporter(self.bytes_written, self.bytes_expected)
559
560     def _write_stdin(self, filename):
561         output = self._local_collection.open(filename, 'w')
562         self._write(sys.stdin, output)
563         output.close()
564
565     def _check_file(self, source, filename):
566         """Check if this file needs to be uploaded"""
567         resume_offset = 0
568         should_upload = False
569         new_file_in_cache = False
570         # Record file path for updating the remote collection before exiting
571         self._file_paths.add(filename)
572
573         with self._state_lock:
574             # If there's no previously cached data for this file, store it in case
575             # a later run needs to resume.
576             if source not in self._state['files']:
577                 self._state['files'][source] = {
578                     'mtime': os.path.getmtime(source),
579                     'size' : os.path.getsize(source)
580                 }
581                 new_file_in_cache = True
582             cached_file_data = self._state['files'][source]
583
584         # Check if file was already uploaded (at least partially)
585         file_in_local_collection = self._local_collection.find(filename)
586
587         # If not resuming, upload the full file.
588         if not self.resume:
589             should_upload = True
590         # New file detected from last run, upload it.
591         elif new_file_in_cache:
592             should_upload = True
593         # Local file didn't change from last run.
594         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
595             if not file_in_local_collection:
596                 # File not uploaded yet, upload it completely
597                 should_upload = True
598             elif file_in_local_collection.permission_expired():
599                 # Permission token expired, re-upload file. This will change once
600                 # we have an API for refreshing tokens.
601                 should_upload = True
602                 self._local_collection.remove(filename)
603             elif cached_file_data['size'] == file_in_local_collection.size():
604                 # File already there, skip it.
605                 self.bytes_skipped += cached_file_data['size']
606             elif cached_file_data['size'] > file_in_local_collection.size():
607                 # File partially uploaded, resume!
608                 resume_offset = file_in_local_collection.size()
609                 self.bytes_skipped += resume_offset
610                 should_upload = True
611             else:
612                 # Inconsistent cache, re-upload the file
613                 should_upload = True
614                 self._local_collection.remove(filename)
615                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
616         # Local file differs from cached data, re-upload it.
617         else:
618             if file_in_local_collection:
619                 self._local_collection.remove(filename)
620             should_upload = True
621
622         if should_upload:
623             self._files_to_upload.append((source, resume_offset, filename))
624
625     def _upload_files(self):
626         for source, resume_offset, filename in self._files_to_upload:
627             with open(source, 'r') as source_fd:
628                 with self._state_lock:
629                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
630                     self._state['files'][source]['size'] = os.path.getsize(source)
631                 if resume_offset > 0:
632                     # Start upload where we left off
633                     output = self._local_collection.open(filename, 'a')
634                     source_fd.seek(resume_offset)
635                 else:
636                     # Start from scratch
637                     output = self._local_collection.open(filename, 'w')
638                 self._write(source_fd, output)
639                 output.close(flush=False)
640
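    # Copy data between file objects in KEEP_BLOCK_SIZE chunks, so arbitrarily
    # large files are never held in memory all at once.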
641     def _write(self, source_fd, output):
642         while True:
643             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
644             if not data:
645                 break
646             output.write(data)
647
648     def _my_collection(self):
649         return self._remote_collection if self.update else self._local_collection
650
651     def _setup_state(self, update_collection):
652         """
653         Create a new cache file or load a previously existing one.
654         """
655         # Load an already existing collection for update
656         if update_collection and re.match(arvados.util.collection_uuid_pattern,
657                                           update_collection):
658             try:
659                 self._remote_collection = arvados.collection.Collection(update_collection)
660             except arvados.errors.ApiError as error:
661                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
662             else:
663                 self.update = True
664         elif update_collection:
665             # Collection locator provided, but unknown format
666             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
667
668         if self.use_cache:
669             # Set up cache file name from input paths.
670             md5 = hashlib.md5()
671             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
672             realpaths = sorted(os.path.realpath(path) for path in self.paths)
673             md5.update('\0'.join(realpaths).encode())
674             if self.filename:
675                 md5.update(self.filename.encode())
676             cache_filename = md5.hexdigest()
677             cache_filepath = os.path.join(
678                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
679                 cache_filename)
680             if self.resume:
681                 self._cache_file = open(cache_filepath, 'a+')
682             else:
683                 # --no-resume means start with an empty cache file.
684                 self._cache_file = open(cache_filepath, 'w+')
685             self._cache_filename = self._cache_file.name
686             self._lock_file(self._cache_file)
687             self._cache_file.seek(0)
688
689         with self._state_lock:
690             if self.use_cache:
691                 try:
692                     self._state = json.load(self._cache_file)
693                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
694                         # Cache at least partially incomplete, set up new cache
695                         self._state = copy.deepcopy(self.EMPTY_STATE)
696                 except ValueError:
697                     # Cache file empty, set up new cache
698                     self._state = copy.deepcopy(self.EMPTY_STATE)
699             else:
700                 # No cache file, set empty state
701                 self._state = copy.deepcopy(self.EMPTY_STATE)
702             # Load the previous manifest so we can check if files were modified remotely.
703             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
704
705     def collection_file_paths(self, col, path_prefix='.'):
706         """Return a list of file paths by recursively going through the entire collection `col`"""
707         file_paths = []
708         for name, item in list(col.items()):
709             if isinstance(item, arvados.arvfile.ArvadosFile):
710                 file_paths.append(os.path.join(path_prefix, name))
711             elif isinstance(item, arvados.collection.Subcollection):
712                 new_prefix = os.path.join(path_prefix, name)
713                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
714         return file_paths
715
716     def _lock_file(self, fileobj):
717         try:
718             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
719         except IOError:
720             raise ResumeCacheConflict("{} locked".format(fileobj.name))
721
722     def _save_state(self):
723         """
724         Atomically save current state into cache.
725         """
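        # Serialize the state to a temporary file in the same directory and
        # rename() it over the previous cache, so an interrupted write never
        # leaves a truncated cache file behind.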
726         try:
727             with self._state_lock:
728                 # We're not using copy.deepcopy() here because it's a lot slower
729                 # than json.dumps(), and we already need the state serialized as
730                 # JSON to save it on disk.
731                 state = json.dumps(self._state)
732             new_cache_fd, new_cache_name = tempfile.mkstemp(
733                 dir=os.path.dirname(self._cache_filename))
734             self._lock_file(new_cache_fd)
735             new_cache = os.fdopen(new_cache_fd, 'r+')
736             new_cache.write(state)
737             new_cache.flush()
738             os.fsync(new_cache)
739             os.rename(new_cache_name, self._cache_filename)
740         except (IOError, OSError, ResumeCacheConflict) as error:
741             self.logger.error("There was a problem while saving the cache file: {}".format(error))
742             try:
743                 os.unlink(new_cache_name)
744             except NameError:  # mkstemp failed.
745                 pass
746         else:
747             self._cache_file.close()
748             self._cache_file = new_cache
749
750     def collection_name(self):
751         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
752
753     def manifest_locator(self):
754         return self._my_collection().manifest_locator()
755
756     def portable_data_hash(self):
757         pdh = self._my_collection().portable_data_hash()
758         m = self._my_collection().stripped_manifest()
759         local_pdh = hashlib.md5(m.encode()).hexdigest() + '+' + str(len(m))
760         if pdh != local_pdh:
761             self.logger.warning("\n".join([
762                 "arv-put: API server provided PDH differs from local manifest.",
763                 "         This should not happen; showing API server version."]))
764         return pdh
765
766     def manifest_text(self, stream_name=".", strip=False, normalize=False):
767         return self._my_collection().manifest_text(stream_name, strip, normalize)
768
769     def _datablocks_on_item(self, item):
770         """
771         Return a list of datablock locators, recursively navigating
772         through subcollections
773         """
774         if isinstance(item, arvados.arvfile.ArvadosFile):
775             if item.size() == 0:
776                 # Empty file locator
777                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
778             else:
779                 locators = []
780                 for segment in item.segments():
781                     loc = segment.locator
782                     locators.append(loc)
783                 return locators
784         elif isinstance(item, arvados.collection.Collection):
785             l = [self._datablocks_on_item(x) for x in list(item.values())]
786             # Fast list flattener method taken from:
787             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
788             return [loc for sublist in l for loc in sublist]
789         else:
790             return None
791
792     def data_locators(self):
793         with self._collection_lock:
794             # Make sure all datablocks are flushed before getting the locators
795             self._my_collection().manifest_text()
796             datablocks = self._datablocks_on_item(self._my_collection())
797         return datablocks
798
799
800 def expected_bytes_for(pathlist):
801     # Walk the given directory trees and stat files, adding up file sizes,
802     # so we can display progress as percent
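    # Returns None when any path is neither a file nor a directory (e.g. '-'
    # for stdin), meaning the total size is unknown.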
803     bytesum = 0
804     for path in pathlist:
805         if os.path.isdir(path):
806             for filename in arvados.util.listdir_recursive(path):
807                 bytesum += os.path.getsize(os.path.join(path, filename))
808         elif not os.path.isfile(path):
809             return None
810         else:
811             bytesum += os.path.getsize(path)
812     return bytesum
813
814 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
815                                                             os.getpid())
816 def machine_progress(bytes_written, bytes_expected):
817     return _machine_format.format(
818         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
819
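# For example (illustrative), human_progress(512 << 20, 1 << 30) returns
# "\r512M / 1024M 50.0% ", while machine_progress() reports raw byte counts,
# using -1 when the expected total is unknown.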
820 def human_progress(bytes_written, bytes_expected):
821     if bytes_expected:
822         return "\r{}M / {}M {:.1%} ".format(
823             bytes_written >> 20, bytes_expected >> 20,
824             old_div(float(bytes_written), bytes_expected))
825     else:
826         return "\r{} ".format(bytes_written)
827
828 def progress_writer(progress_func, outfile=sys.stderr):
829     def write_progress(bytes_written, bytes_expected):
830         outfile.write(progress_func(bytes_written, bytes_expected))
831     return write_progress
832
833 def exit_signal_handler(sigcode, frame):
834     sys.exit(-sigcode)
835
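# Resolve the --project-uuid argument: a user or group UUID is looked up as
# given, an empty value falls back to the current user's home project, and
# anything else raises ValueError.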
836 def desired_project_uuid(api_client, project_uuid, num_retries):
837     if not project_uuid:
838         query = api_client.users().current()
839     elif arvados.util.user_uuid_pattern.match(project_uuid):
840         query = api_client.users().get(uuid=project_uuid)
841     elif arvados.util.group_uuid_pattern.match(project_uuid):
842         query = api_client.groups().get(uuid=project_uuid)
843     else:
844         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
845     return query.execute(num_retries=num_retries)['uuid']
846
847 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
848     global api_client
849
850     logger = logging.getLogger('arvados.arv_put')
851     logger.setLevel(logging.INFO)
852     args = parse_arguments(arguments)
853     status = 0
854     if api_client is None:
855         api_client = arvados.api('v1')
856
857     # Determine the name to use
858     if args.name:
859         if args.stream or args.raw:
860             logger.error("Cannot use --name with --stream or --raw")
861             sys.exit(1)
862         elif args.update_collection:
863             logger.error("Cannot use --name with --update-collection")
864             sys.exit(1)
865         collection_name = args.name
866     else:
867         collection_name = "Saved at {} by {}@{}".format(
868             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
869             pwd.getpwuid(os.getuid()).pw_name,
870             socket.gethostname())
871
872     if args.project_uuid and (args.stream or args.raw):
873         logger.error("Cannot use --project-uuid with --stream or --raw")
874         sys.exit(1)
875
876     # Determine the parent project
877     try:
878         project_uuid = desired_project_uuid(api_client, args.project_uuid,
879                                             args.retries)
880     except (apiclient_errors.Error, ValueError) as error:
881         logger.error(error)
882         sys.exit(1)
883
884     if args.progress:
885         reporter = progress_writer(human_progress)
886     elif args.batch_progress:
887         reporter = progress_writer(machine_progress)
888     else:
889         reporter = None
890
891     # If this is used by a human, and there's at least one directory to be
892     # uploaded, the expected bytes calculation can take a moment.
893     if args.progress and any([os.path.isdir(f) for f in args.paths]):
894         logger.info("Calculating upload size, this could take some time...")
895     bytes_expected = expected_bytes_for(args.paths)
896
897     try:
898         writer = ArvPutUploadJob(paths = args.paths,
899                                  resume = args.resume,
900                                  use_cache = args.use_cache,
901                                  filename = args.filename,
902                                  reporter = reporter,
903                                  bytes_expected = bytes_expected,
904                                  num_retries = args.retries,
905                                  replication_desired = args.replication,
906                                  put_threads = args.threads,
907                                  name = collection_name,
908                                  owner_uuid = project_uuid,
909                                  ensure_unique_name = True,
910                                  update_collection = args.update_collection,
911                                  logger=logger,
912                                  dry_run=args.dry_run)
913     except ResumeCacheConflict:
914         logger.error("\n".join([
915             "arv-put: Another process is already uploading this data.",
916             "         Use --no-cache if this is really what you want."]))
917         sys.exit(1)
918     except CollectionUpdateError as error:
919         logger.error("\n".join([
920             "arv-put: %s" % str(error)]))
921         sys.exit(1)
922     except ArvPutUploadIsPending:
923         # Dry run check successful, return proper exit code.
924         sys.exit(2)
925     except ArvPutUploadNotPending:
926         # No files pending for upload
927         sys.exit(0)
928
929     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
930     # the originals.
931     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
932                             for sigcode in CAUGHT_SIGNALS}
933
934     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
935         logger.warning("\n".join([
936             "arv-put: Resuming previous upload from last checkpoint.",
937             "         Use the --no-resume option to start over."]))
938
939     if not args.dry_run:
940         writer.report_progress()
941     output = None
942     try:
943         writer.start(save_collection=not(args.stream or args.raw))
944     except arvados.errors.ApiError as error:
945         logger.error("\n".join([
946             "arv-put: %s" % str(error)]))
947         sys.exit(1)
948     except ArvPutUploadIsPending:
949         # Dry run check successful, return proper exit code.
950         sys.exit(2)
951     except ArvPutUploadNotPending:
952         # No files pending for upload
953         sys.exit(0)
954
955     if args.progress:  # Print newline to split stderr from stdout for humans.
956         logger.info("\n")
957
958     if args.stream:
959         if args.normalize:
960             output = writer.manifest_text(normalize=True)
961         else:
962             output = writer.manifest_text()
963     elif args.raw:
964         output = ','.join(writer.data_locators())
965     else:
966         try:
967             if args.update_collection:
968                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
969             else:
970                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
971             if args.portable_data_hash:
972                 output = writer.portable_data_hash()
973             else:
974                 output = writer.manifest_locator()
975         except apiclient_errors.Error as error:
976             logger.error(
977                 "arv-put: Error creating Collection on project: {}.".format(
978                     error))
979             status = 1
980
981     # Print the locator (uuid) of the new collection.
982     if output is None:
983         status = status or 1
984     else:
985         stdout.write(output)
986         if not output.endswith('\n'):
987             stdout.write('\n')
988
989     for sigcode, orig_handler in list(orig_signal_handlers.items()):
990         signal.signal(sigcode, orig_handler)
991
992     if status != 0:
993         sys.exit(status)
994
995     # Success!
996     return output
997
998
999 if __name__ == '__main__':
1000     main()