#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

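# arv-put copies data from the local filesystem into Keep, optionally saving
# the result as an Arvados Collection. Illustrative invocations (paths and
# UUIDs below are placeholders, not real values):
#
#   arv-put file1.dat dir2/                # upload and save a new Collection
#   arv-put --stream file.dat              # upload, print manifest, skip save
#   arv-put --project-uuid <uuid> dir/     # save into a specific project
#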
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

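# Signals that are trapped and converted to SystemExit by exit_signal_handler
# (installed in main()), so cleanup in finally: blocks still runs on e.g. ^C.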
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)
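# --max-manifest-depth above is hidden from --help (argparse.SUPPRESS); it is
# presumably still accepted only so that existing command lines don't break.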

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases RAM usage. Default is to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


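# During --dry-run, queuing the first file for upload aborts the scan by
# raising ArvPutUploadIsPending: one pending file is enough to answer the
# "is anything pending?" question that --dry-run asks.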
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


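# Resume cache used by older arv-put implementations. The current upload path
# (ArvPutUploadJob below) manages its own cache file; this class appears to be
# retained for backward compatibility.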
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


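# Drives a single upload run: scans the given paths, decides per file whether
# to upload, resume or skip based on cached state, writes the data through a
# local Collection object, and checkpoints progress from a background thread.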
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # Seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case a file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got this far, there aren't any
            # files pending upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            #   we have a custom signal handler in place that raises SystemExit with
            #   the caught signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data for this file, store it for a
            # possible repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
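                # flush=False leaves the final partial block buffered, so data
                # from several small files can be packed into full-size blocks.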
                output.close(flush=False)

    def _write(self, source_fd, output):
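        # Copy in Keep-block-sized chunks (KEEP_BLOCK_SIZE, 64 MiB) to bound
        # memory use while streaming the file.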
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
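        # Write the new state to a temp file in the cache directory, then
        # rename() it over the old cache file: rename within one filesystem is
        # atomic on POSIX, so a crash can't leave a half-written cache behind.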
        try:
            new_cache = tempfile.NamedTemporaryFile(
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
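            # Neither a directory nor a regular file (e.g. '-' for stdin):
            # the total upload size is unknown.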
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
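        # bytes >> 20 converts to MiB for display.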
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
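    # Exit with the negative signal number; e.g. SIGINT exits with -2, which
    # ArvPutUploadJob.start() checks in order to skip the traceback on Ctrl-C.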
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()