Merge branch '8567-moar-docker' refs #8567
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
33 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
34 api_client = None
35
36 upload_opts = argparse.ArgumentParser(add_help=False)
37
38 upload_opts.add_argument('--version', action='version',
39                          version="%s %s" % (sys.argv[0], __version__),
40                          help='Print version and exit.')
41 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
42                          help="""
43 Local file or directory. Default: read from standard input.
44 """)
45
46 _group = upload_opts.add_mutually_exclusive_group()
47
48 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
49                     default=-1, help=argparse.SUPPRESS)
50
51 _group.add_argument('--normalize', action='store_true',
52                     help="""
53 Normalize the manifest by re-ordering files and streams after writing
54 data.
55 """)
56
57 _group.add_argument('--dry-run', action='store_true', default=False,
58                     help="""
59 Don't actually upload files, but only check if any file should be
60 uploaded. Exit with code=2 when files are pending for upload.
61 """)
62
63 _group = upload_opts.add_mutually_exclusive_group()
64
65 _group.add_argument('--as-stream', action='store_true', dest='stream',
66                     help="""
67 Synonym for --stream.
68 """)
69
70 _group.add_argument('--stream', action='store_true',
71                     help="""
72 Store the file content and display the resulting manifest on
73 stdout. Do not write the manifest to Keep or save a Collection object
74 in Arvados.
75 """)
76
77 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
78                     help="""
79 Synonym for --manifest.
80 """)
81
82 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
83                     help="""
84 Synonym for --manifest.
85 """)
86
87 _group.add_argument('--manifest', action='store_true',
88                     help="""
89 Store the file data and resulting manifest in Keep, save a Collection
90 object in Arvados, and display the manifest locator (Collection uuid)
91 on stdout. This is the default behavior.
92 """)
93
94 _group.add_argument('--as-raw', action='store_true', dest='raw',
95                     help="""
96 Synonym for --raw.
97 """)
98
99 _group.add_argument('--raw', action='store_true',
100                     help="""
101 Store the file content and display the data block locators on stdout,
102 separated by commas, with a trailing newline. Do not store a
103 manifest.
104 """)
105
106 upload_opts.add_argument('--update-collection', type=str, default=None,
107                          dest='update_collection', metavar="UUID", help="""
108 Update an existing collection identified by the given Arvados collection
109 UUID. All new local files will be uploaded.
110 """)
111
112 upload_opts.add_argument('--use-filename', type=str, default=None,
113                          dest='filename', help="""
114 Synonym for --filename.
115 """)
116
117 upload_opts.add_argument('--filename', type=str, default=None,
118                          help="""
119 Use the given filename in the manifest, instead of the name of the
120 local file. This is useful when "-" or "/dev/stdin" is given as an
121 input file. It can be used only if there is exactly one path given and
122 it is not a directory. Implies --manifest.
123 """)
124
125 upload_opts.add_argument('--portable-data-hash', action='store_true',
126                          help="""
127 Print the portable data hash instead of the Arvados UUID for the collection
128 created by the upload.
129 """)
130
131 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
132                          help="""
133 Set the replication level for the new collection: how many different
134 physical storage devices (e.g., disks) should have a copy of each data
135 block. Default is to use the server-provided default (if any) or 2.
136 """)
137
138 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
139                          help="""
140 Set the number of upload threads to be used. Take into account that
141 using lots of threads will increase the RAM requirements. Default is
142 to use 2 threads.
143 On high latency installations, using a greater number will improve
144 overall throughput.
145 """)
146
147 run_opts = argparse.ArgumentParser(add_help=False)
148
149 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
150 Store the collection in the specified project, instead of your Home
151 project.
152 """)
153
154 run_opts.add_argument('--name', help="""
155 Save the collection with the specified name.
156 """)
157
158 _group = run_opts.add_mutually_exclusive_group()
159 _group.add_argument('--progress', action='store_true',
160                     help="""
161 Display human-readable progress on stderr (bytes and, if possible,
162 percentage of total data size). This is the default behavior when
163 stderr is a tty.
164 """)
165
166 _group.add_argument('--no-progress', action='store_true',
167                     help="""
168 Do not display human-readable progress on stderr, even if stderr is a
169 tty.
170 """)
171
172 _group.add_argument('--batch-progress', action='store_true',
173                     help="""
174 Display machine-readable progress on stderr (bytes and, if known,
175 total data size).
176 """)
177
178 _group = run_opts.add_mutually_exclusive_group()
179 _group.add_argument('--resume', action='store_true', default=True,
180                     help="""
181 Continue interrupted uploads from cached state (default).
182 """)
183 _group.add_argument('--no-resume', action='store_false', dest='resume',
184                     help="""
185 Do not continue interrupted uploads from cached state.
186 """)
187
188 _group = run_opts.add_mutually_exclusive_group()
189 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
190                     help="""
191 Save upload state in a cache file for resuming (default).
192 """)
193 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
194                     help="""
195 Do not save upload state in a cache file for resuming.
196 """)
197
198 arg_parser = argparse.ArgumentParser(
199     description='Copy data from the local filesystem to Keep.',
200     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
201
202 def parse_arguments(arguments):
203     args = arg_parser.parse_args(arguments)
204
205     if len(args.paths) == 0:
206         args.paths = ['-']
207
208     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
209
210     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
211         if args.filename:
212             arg_parser.error("""
213     --filename argument cannot be used when storing a directory or
214     multiple files.
215     """)
216
217     # Turn on --progress by default if stderr is a tty.
218     if (not (args.batch_progress or args.no_progress)
219         and os.isatty(sys.stderr.fileno())):
220         args.progress = True
221
222     # Turn off --resume (default) if --no-cache is used.
223     if not args.use_cache:
224         args.resume = False
225
226     if args.paths == ['-']:
227         if args.update_collection:
228             arg_parser.error("""
229     --update-collection cannot be used when reading from stdin.
230     """)
231         args.resume = False
232         args.use_cache = False
233         if not args.filename:
234             args.filename = 'stdin'
235
236     return args
237
238
239 class CollectionUpdateError(Exception):
240     pass
241
242
243 class ResumeCacheConflict(Exception):
244     pass
245
246
247 class ArvPutArgumentConflict(Exception):
248     pass
249
250
251 class ArvPutUploadIsPending(Exception):
252     pass
253
254
255 class ArvPutUploadNotPending(Exception):
256     pass
257
258
259 class FileUploadList(list):
260     def __init__(self, dry_run=False):
261         list.__init__(self)
262         self.dry_run = dry_run
263
264     def append(self, other):
265         if self.dry_run:
266             raise ArvPutUploadIsPending()
267         super(FileUploadList, self).append(other)
268
269
270 class ResumeCache(object):
271     CACHE_DIR = '.cache/arvados/arv-put'
272
273     def __init__(self, file_spec):
274         self.cache_file = open(file_spec, 'a+')
275         self._lock_file(self.cache_file)
276         self.filename = self.cache_file.name
277
278     @classmethod
279     def make_path(cls, args):
280         md5 = hashlib.md5()
281         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
282         realpaths = sorted(os.path.realpath(path) for path in args.paths)
283         md5.update('\0'.join(realpaths))
284         if any(os.path.isdir(path) for path in realpaths):
285             md5.update("-1")
286         elif args.filename:
287             md5.update(args.filename)
288         return os.path.join(
289             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
290             md5.hexdigest())
291
292     def _lock_file(self, fileobj):
293         try:
294             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
295         except IOError:
296             raise ResumeCacheConflict("{} locked".format(fileobj.name))
297
298     def load(self):
299         self.cache_file.seek(0)
300         return json.load(self.cache_file)
301
302     def check_cache(self, api_client=None, num_retries=0):
303         try:
304             state = self.load()
305             locator = None
306             try:
307                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
308                     locator = state["_finished_streams"][0][1][0]
309                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
310                     locator = state["_current_stream_locators"][0]
311                 if locator is not None:
312                     kc = arvados.keep.KeepClient(api_client=api_client)
313                     kc.head(locator, num_retries=num_retries)
314             except Exception as e:
315                 self.restart()
316         except (ValueError):
317             pass
318
319     def save(self, data):
320         try:
321             new_cache_fd, new_cache_name = tempfile.mkstemp(
322                 dir=os.path.dirname(self.filename))
323             self._lock_file(new_cache_fd)
324             new_cache = os.fdopen(new_cache_fd, 'r+')
325             json.dump(data, new_cache)
326             os.rename(new_cache_name, self.filename)
327         except (IOError, OSError, ResumeCacheConflict) as error:
328             try:
329                 os.unlink(new_cache_name)
330             except NameError:  # mkstemp failed.
331                 pass
332         else:
333             self.cache_file.close()
334             self.cache_file = new_cache
335
336     def close(self):
337         self.cache_file.close()
338
339     def destroy(self):
340         try:
341             os.unlink(self.filename)
342         except OSError as error:
343             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
344                 raise
345         self.close()
346
347     def restart(self):
348         self.destroy()
349         self.__init__(self.filename)
350
351
352 class ArvPutUploadJob(object):
353     CACHE_DIR = '.cache/arvados/arv-put'
354     EMPTY_STATE = {
355         'manifest' : None, # Last saved manifest checkpoint
356         'files' : {} # Previous run file list: {path : {size, mtime}}
357     }
358
359     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
360                  bytes_expected=None, name=None, owner_uuid=None,
361                  ensure_unique_name=False, num_retries=None,
362                  put_threads=None, replication_desired=None,
363                  filename=None, update_time=60.0, update_collection=None,
364                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
365         self.paths = paths
366         self.resume = resume
367         self.use_cache = use_cache
368         self.update = False
369         self.reporter = reporter
370         self.bytes_expected = bytes_expected
371         self.bytes_written = 0
372         self.bytes_skipped = 0
373         self.name = name
374         self.owner_uuid = owner_uuid
375         self.ensure_unique_name = ensure_unique_name
376         self.num_retries = num_retries
377         self.replication_desired = replication_desired
378         self.put_threads = put_threads
379         self.filename = filename
380         self._state_lock = threading.Lock()
381         self._state = None # Previous run state (file list & manifest)
382         self._current_files = [] # Current run file list
383         self._cache_file = None
384         self._collection_lock = threading.Lock()
385         self._remote_collection = None # Collection being updated (if asked)
386         self._local_collection = None # Collection from previous run manifest
387         self._file_paths = set() # Files to be updated in remote collection
388         self._stop_checkpointer = threading.Event()
389         self._checkpointer = threading.Thread(target=self._update_task)
390         self._checkpointer.daemon = True
391         self._update_task_time = update_time  # How many seconds wait between update runs
392         self._files_to_upload = FileUploadList(dry_run=dry_run)
393         self._upload_started = False
394         self.logger = logger
395         self.dry_run = dry_run
396         self._checkpoint_before_quit = True
397
398         if not self.use_cache and self.resume:
399             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
400
401         # Check for obvious dry-run responses
402         if self.dry_run and (not self.use_cache or not self.resume):
403             raise ArvPutUploadIsPending()
404
405         # Load cached data if any and if needed
406         self._setup_state(update_collection)
407
408     def start(self, save_collection):
409         """
410         Start supporting thread & file uploading
411         """
412         if not self.dry_run:
413             self._checkpointer.start()
414         try:
415             for path in self.paths:
416                 # Test for stdin first, in case some file named '-' exist
417                 if path == '-':
418                     if self.dry_run:
419                         raise ArvPutUploadIsPending()
420                     self._write_stdin(self.filename or 'stdin')
421                 elif os.path.isdir(path):
422                     # Use absolute paths on cache index so CWD doesn't interfere
423                     # with the caching logic.
424                     prefixdir = path = os.path.abspath(path)
425                     if prefixdir != '/':
426                         prefixdir += '/'
427                     for root, dirs, files in os.walk(path):
428                         # Make os.walk()'s dir traversing order deterministic
429                         dirs.sort()
430                         files.sort()
431                         for f in files:
432                             self._check_file(os.path.join(root, f),
433                                              os.path.join(root[len(prefixdir):], f))
434                 else:
435                     self._check_file(os.path.abspath(path),
436                                      self.filename or os.path.basename(path))
437             # If dry-mode is on, and got up to this point, then we should notify that
438             # there aren't any file to upload.
439             if self.dry_run:
440                 raise ArvPutUploadNotPending()
441             # Remove local_collection's files that don't exist locally anymore, so the
442             # bytes_written count is correct.
443             for f in self.collection_file_paths(self._local_collection,
444                                                 path_prefix=""):
445                 if f != 'stdin' and f != self.filename and not f in self._file_paths:
446                     self._local_collection.remove(f)
447             # Update bytes_written from current local collection and
448             # report initial progress.
449             self._update()
450             # Actual file upload
451             self._upload_started = True # Used by the update thread to start checkpointing
452             self._upload_files()
453         except (SystemExit, Exception) as e:
454             self._checkpoint_before_quit = False
455             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
456             # Note: We're expecting SystemExit instead of KeyboardInterrupt because
457             #   we have a custom signal handler in place that raises SystemExit with
458             #   the catched signal's code.
459             if not isinstance(e, SystemExit) or e.code != -2:
460                 self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
461             raise
462         finally:
463             if not self.dry_run:
464                 # Stop the thread before doing anything else
465                 self._stop_checkpointer.set()
466                 self._checkpointer.join()
467                 if self._checkpoint_before_quit:
468                     # Commit all pending blocks & one last _update()
469                     self._local_collection.manifest_text()
470                     self._update(final=True)
471                     if save_collection:
472                         self.save_collection()
473             if self.use_cache:
474                 self._cache_file.close()
475
476     def save_collection(self):
477         if self.update:
478             # Check if files should be updated on the remote collection.
479             for fp in self._file_paths:
480                 remote_file = self._remote_collection.find(fp)
481                 if not remote_file:
482                     # File don't exist on remote collection, copy it.
483                     self._remote_collection.copy(fp, fp, self._local_collection)
484                 elif remote_file != self._local_collection.find(fp):
485                     # A different file exist on remote collection, overwrite it.
486                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
487                 else:
488                     # The file already exist on remote collection, skip it.
489                     pass
490             self._remote_collection.save(num_retries=self.num_retries)
491         else:
492             self._local_collection.save_new(
493                 name=self.name, owner_uuid=self.owner_uuid,
494                 ensure_unique_name=self.ensure_unique_name,
495                 num_retries=self.num_retries)
496
497     def destroy_cache(self):
498         if self.use_cache:
499             try:
500                 os.unlink(self._cache_filename)
501             except OSError as error:
502                 # That's what we wanted anyway.
503                 if error.errno != errno.ENOENT:
504                     raise
505             self._cache_file.close()
506
507     def _collection_size(self, collection):
508         """
509         Recursively get the total size of the collection
510         """
511         size = 0
512         for item in collection.values():
513             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514                 size += self._collection_size(item)
515             else:
516                 size += item.size()
517         return size
518
519     def _update_task(self):
520         """
521         Periodically called support task. File uploading is
522         asynchronous so we poll status from the collection.
523         """
524         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
525             self._update()
526
527     def _update(self, final=False):
528         """
529         Update cached manifest text and report progress.
530         """
531         if self._upload_started:
532             with self._collection_lock:
533                 self.bytes_written = self._collection_size(self._local_collection)
534                 if self.use_cache:
535                     if final:
536                         manifest = self._local_collection.manifest_text()
537                     else:
538                         # Get the manifest text without comitting pending blocks
539                         manifest = self._local_collection.manifest_text(strip=False,
540                                                                         normalize=False,
541                                                                         only_committed=True)
542                     # Update cache
543                     with self._state_lock:
544                         self._state['manifest'] = manifest
545             if self.use_cache:
546                 try:
547                     self._save_state()
548                 except Exception as e:
549                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
550         else:
551             self.bytes_written = self.bytes_skipped
552         # Call the reporter, if any
553         self.report_progress()
554
555     def report_progress(self):
556         if self.reporter is not None:
557             self.reporter(self.bytes_written, self.bytes_expected)
558
559     def _write_stdin(self, filename):
560         output = self._local_collection.open(filename, 'w')
561         self._write(sys.stdin, output)
562         output.close()
563
564     def _check_file(self, source, filename):
565         """Check if this file needs to be uploaded"""
566         resume_offset = 0
567         should_upload = False
568         new_file_in_cache = False
569         # Record file path for updating the remote collection before exiting
570         self._file_paths.add(filename)
571
572         with self._state_lock:
573             # If no previous cached data on this file, store it for an eventual
574             # repeated run.
575             if source not in self._state['files']:
576                 self._state['files'][source] = {
577                     'mtime': os.path.getmtime(source),
578                     'size' : os.path.getsize(source)
579                 }
580                 new_file_in_cache = True
581             cached_file_data = self._state['files'][source]
582
583         # Check if file was already uploaded (at least partially)
584         file_in_local_collection = self._local_collection.find(filename)
585
586         # If not resuming, upload the full file.
587         if not self.resume:
588             should_upload = True
589         # New file detected from last run, upload it.
590         elif new_file_in_cache:
591             should_upload = True
592         # Local file didn't change from last run.
593         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
594             if not file_in_local_collection:
595                 # File not uploaded yet, upload it completely
596                 should_upload = True
597             elif file_in_local_collection.permission_expired():
598                 # Permission token expired, re-upload file. This will change whenever
599                 # we have a API for refreshing tokens.
600                 should_upload = True
601                 self._local_collection.remove(filename)
602             elif cached_file_data['size'] == file_in_local_collection.size():
603                 # File already there, skip it.
604                 self.bytes_skipped += cached_file_data['size']
605             elif cached_file_data['size'] > file_in_local_collection.size():
606                 # File partially uploaded, resume!
607                 resume_offset = file_in_local_collection.size()
608                 self.bytes_skipped += resume_offset
609                 should_upload = True
610             else:
611                 # Inconsistent cache, re-upload the file
612                 should_upload = True
613                 self._local_collection.remove(filename)
614                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
615         # Local file differs from cached data, re-upload it.
616         else:
617             if file_in_local_collection:
618                 self._local_collection.remove(filename)
619             should_upload = True
620
621         if should_upload:
622             self._files_to_upload.append((source, resume_offset, filename))
623
624     def _upload_files(self):
625         for source, resume_offset, filename in self._files_to_upload:
626             with open(source, 'r') as source_fd:
627                 with self._state_lock:
628                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
629                     self._state['files'][source]['size'] = os.path.getsize(source)
630                 if resume_offset > 0:
631                     # Start upload where we left off
632                     output = self._local_collection.open(filename, 'a')
633                     source_fd.seek(resume_offset)
634                 else:
635                     # Start from scratch
636                     output = self._local_collection.open(filename, 'w')
637                 self._write(source_fd, output)
638                 output.close(flush=False)
639
640     def _write(self, source_fd, output):
641         while True:
642             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
643             if not data:
644                 break
645             output.write(data)
646
647     def _my_collection(self):
648         return self._remote_collection if self.update else self._local_collection
649
650     def _setup_state(self, update_collection):
651         """
652         Create a new cache file or load a previously existing one.
653         """
654         # Load an already existing collection for update
655         if update_collection and re.match(arvados.util.collection_uuid_pattern,
656                                           update_collection):
657             try:
658                 self._remote_collection = arvados.collection.Collection(update_collection)
659             except arvados.errors.ApiError as error:
660                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
661             else:
662                 self.update = True
663         elif update_collection:
664             # Collection locator provided, but unknown format
665             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
666
667         if self.use_cache:
668             # Set up cache file name from input paths.
669             md5 = hashlib.md5()
670             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
671             realpaths = sorted(os.path.realpath(path) for path in self.paths)
672             md5.update('\0'.join(realpaths))
673             if self.filename:
674                 md5.update(self.filename)
675             cache_filename = md5.hexdigest()
676             cache_filepath = os.path.join(
677                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
678                 cache_filename)
679             if self.resume and os.path.exists(cache_filepath):
680                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
681                 self._cache_file = open(cache_filepath, 'a+')
682             else:
683                 # --no-resume means start with a empty cache file.
684                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
685                 self._cache_file = open(cache_filepath, 'w+')
686             self._cache_filename = self._cache_file.name
687             self._lock_file(self._cache_file)
688             self._cache_file.seek(0)
689
690         with self._state_lock:
691             if self.use_cache:
692                 try:
693                     self._state = json.load(self._cache_file)
694                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
695                         # Cache at least partially incomplete, set up new cache
696                         self._state = copy.deepcopy(self.EMPTY_STATE)
697                 except ValueError:
698                     # Cache file empty, set up new cache
699                     self._state = copy.deepcopy(self.EMPTY_STATE)
700             else:
701                 self.logger.info("No cache usage requested for this run.")
702                 # No cache file, set empty state
703                 self._state = copy.deepcopy(self.EMPTY_STATE)
704             # Load the previous manifest so we can check if files were modified remotely.
705             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
706
707     def collection_file_paths(self, col, path_prefix='.'):
708         """Return a list of file paths by recursively go through the entire collection `col`"""
709         file_paths = []
710         for name, item in col.items():
711             if isinstance(item, arvados.arvfile.ArvadosFile):
712                 file_paths.append(os.path.join(path_prefix, name))
713             elif isinstance(item, arvados.collection.Subcollection):
714                 new_prefix = os.path.join(path_prefix, name)
715                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
716         return file_paths
717
718     def _lock_file(self, fileobj):
719         try:
720             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
721         except IOError:
722             raise ResumeCacheConflict("{} locked".format(fileobj.name))
723
724     def _save_state(self):
725         """
726         Atomically save current state into cache.
727         """
728         with self._state_lock:
729             # We're not using copy.deepcopy() here because it's a lot slower
730             # than json.dumps(), and we're already needing JSON format to be
731             # saved on disk.
732             state = json.dumps(self._state)
733         try:
734             new_cache = tempfile.NamedTemporaryFile(
735                 dir=os.path.dirname(self._cache_filename), delete=False)
736             self._lock_file(new_cache)
737             new_cache.write(state)
738             new_cache.flush()
739             os.fsync(new_cache)
740             os.rename(new_cache.name, self._cache_filename)
741         except (IOError, OSError, ResumeCacheConflict) as error:
742             self.logger.error("There was a problem while saving the cache file: {}".format(error))
743             try:
744                 os.unlink(new_cache_name)
745             except NameError:  # mkstemp failed.
746                 pass
747         else:
748             self._cache_file.close()
749             self._cache_file = new_cache
750
751     def collection_name(self):
752         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
753
754     def manifest_locator(self):
755         return self._my_collection().manifest_locator()
756
757     def portable_data_hash(self):
758         pdh = self._my_collection().portable_data_hash()
759         m = self._my_collection().stripped_manifest()
760         local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
761         if pdh != local_pdh:
762             logger.warning("\n".join([
763                 "arv-put: API server provided PDH differs from local manifest.",
764                 "         This should not happen; showing API server version."]))
765         return pdh
766
767     def manifest_text(self, stream_name=".", strip=False, normalize=False):
768         return self._my_collection().manifest_text(stream_name, strip, normalize)
769
770     def _datablocks_on_item(self, item):
771         """
772         Return a list of datablock locators, recursively navigating
773         through subcollections
774         """
775         if isinstance(item, arvados.arvfile.ArvadosFile):
776             if item.size() == 0:
777                 # Empty file locator
778                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
779             else:
780                 locators = []
781                 for segment in item.segments():
782                     loc = segment.locator
783                     locators.append(loc)
784                 return locators
785         elif isinstance(item, arvados.collection.Collection):
786             l = [self._datablocks_on_item(x) for x in item.values()]
787             # Fast list flattener method taken from:
788             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
789             return [loc for sublist in l for loc in sublist]
790         else:
791             return None
792
793     def data_locators(self):
794         with self._collection_lock:
795             # Make sure all datablocks are flushed before getting the locators
796             self._my_collection().manifest_text()
797             datablocks = self._datablocks_on_item(self._my_collection())
798         return datablocks
799
800
801 def expected_bytes_for(pathlist):
802     # Walk the given directory trees and stat files, adding up file sizes,
803     # so we can display progress as percent
804     bytesum = 0
805     for path in pathlist:
806         if os.path.isdir(path):
807             for filename in arvados.util.listdir_recursive(path):
808                 bytesum += os.path.getsize(os.path.join(path, filename))
809         elif not os.path.isfile(path):
810             return None
811         else:
812             bytesum += os.path.getsize(path)
813     return bytesum
814
815 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
816                                                             os.getpid())
817 def machine_progress(bytes_written, bytes_expected):
818     return _machine_format.format(
819         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
820
821 def human_progress(bytes_written, bytes_expected):
822     if bytes_expected:
823         return "\r{}M / {}M {:.1%} ".format(
824             bytes_written >> 20, bytes_expected >> 20,
825             float(bytes_written) / bytes_expected)
826     else:
827         return "\r{} ".format(bytes_written)
828
829 def progress_writer(progress_func, outfile=sys.stderr):
830     def write_progress(bytes_written, bytes_expected):
831         outfile.write(progress_func(bytes_written, bytes_expected))
832     return write_progress
833
834 def exit_signal_handler(sigcode, frame):
835     sys.exit(-sigcode)
836
837 def desired_project_uuid(api_client, project_uuid, num_retries):
838     if not project_uuid:
839         query = api_client.users().current()
840     elif arvados.util.user_uuid_pattern.match(project_uuid):
841         query = api_client.users().get(uuid=project_uuid)
842     elif arvados.util.group_uuid_pattern.match(project_uuid):
843         query = api_client.groups().get(uuid=project_uuid)
844     else:
845         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
846     return query.execute(num_retries=num_retries)['uuid']
847
848 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
849     global api_client
850
851     logger = logging.getLogger('arvados.arv_put')
852     logger.setLevel(logging.INFO)
853     args = parse_arguments(arguments)
854     status = 0
855     if api_client is None:
856         api_client = arvados.api('v1')
857
858     # Determine the name to use
859     if args.name:
860         if args.stream or args.raw:
861             logger.error("Cannot use --name with --stream or --raw")
862             sys.exit(1)
863         elif args.update_collection:
864             logger.error("Cannot use --name with --update-collection")
865             sys.exit(1)
866         collection_name = args.name
867     else:
868         collection_name = "Saved at {} by {}@{}".format(
869             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
870             pwd.getpwuid(os.getuid()).pw_name,
871             socket.gethostname())
872
873     if args.project_uuid and (args.stream or args.raw):
874         logger.error("Cannot use --project-uuid with --stream or --raw")
875         sys.exit(1)
876
877     # Determine the parent project
878     try:
879         project_uuid = desired_project_uuid(api_client, args.project_uuid,
880                                             args.retries)
881     except (apiclient_errors.Error, ValueError) as error:
882         logger.error(error)
883         sys.exit(1)
884
885     if args.progress:
886         reporter = progress_writer(human_progress)
887     elif args.batch_progress:
888         reporter = progress_writer(machine_progress)
889     else:
890         reporter = None
891
892     # If this is used by a human, and there's at least one directory to be
893     # uploaded, the expected bytes calculation can take a moment.
894     if args.progress and any([os.path.isdir(f) for f in args.paths]):
895         logger.info("Calculating upload size, this could take some time...")
896     bytes_expected = expected_bytes_for(args.paths)
897
898     try:
899         writer = ArvPutUploadJob(paths = args.paths,
900                                  resume = args.resume,
901                                  use_cache = args.use_cache,
902                                  filename = args.filename,
903                                  reporter = reporter,
904                                  bytes_expected = bytes_expected,
905                                  num_retries = args.retries,
906                                  replication_desired = args.replication,
907                                  put_threads = args.threads,
908                                  name = collection_name,
909                                  owner_uuid = project_uuid,
910                                  ensure_unique_name = True,
911                                  update_collection = args.update_collection,
912                                  logger=logger,
913                                  dry_run=args.dry_run)
914     except ResumeCacheConflict:
915         logger.error("\n".join([
916             "arv-put: Another process is already uploading this data.",
917             "         Use --no-cache if this is really what you want."]))
918         sys.exit(1)
919     except CollectionUpdateError as error:
920         logger.error("\n".join([
921             "arv-put: %s" % str(error)]))
922         sys.exit(1)
923     except ArvPutUploadIsPending:
924         # Dry run check successful, return proper exit code.
925         sys.exit(2)
926     except ArvPutUploadNotPending:
927         # No files pending for upload
928         sys.exit(0)
929
930     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
931     # the originals.
932     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
933                             for sigcode in CAUGHT_SIGNALS}
934
935     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
936         logger.warning("\n".join([
937             "arv-put: Resuming previous upload from last checkpoint.",
938             "         Use the --no-resume option to start over."]))
939
940     if not args.dry_run:
941         writer.report_progress()
942     output = None
943     try:
944         writer.start(save_collection=not(args.stream or args.raw))
945     except arvados.errors.ApiError as error:
946         logger.error("\n".join([
947             "arv-put: %s" % str(error)]))
948         sys.exit(1)
949     except ArvPutUploadIsPending:
950         # Dry run check successful, return proper exit code.
951         sys.exit(2)
952     except ArvPutUploadNotPending:
953         # No files pending for upload
954         sys.exit(0)
955
956     if args.progress:  # Print newline to split stderr from stdout for humans.
957         logger.info("\n")
958
959     if args.stream:
960         if args.normalize:
961             output = writer.manifest_text(normalize=True)
962         else:
963             output = writer.manifest_text()
964     elif args.raw:
965         output = ','.join(writer.data_locators())
966     else:
967         try:
968             if args.update_collection:
969                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
970             else:
971                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
972             if args.portable_data_hash:
973                 output = writer.portable_data_hash()
974             else:
975                 output = writer.manifest_locator()
976         except apiclient_errors.Error as error:
977             logger.error(
978                 "arv-put: Error creating Collection on project: {}.".format(
979                     error))
980             status = 1
981
982     # Print the locator (uuid) of the new collection.
983     if output is None:
984         status = status or 1
985     else:
986         stdout.write(output)
987         if not output.endswith('\n'):
988             stdout.write('\n')
989
990     for sigcode, orig_handler in orig_signal_handlers.items():
991         signal.signal(sigcode, orig_handler)
992
993     if status != 0:
994         sys.exit(status)
995
996     # Success!
997     return output
998
999
1000 if __name__ == '__main__':
1001     main()