8937: Added cache validation method to arv-put. For now it only
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import copy
14 import datetime
15 import errno
16 import fcntl
17 import fnmatch
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34
35 import arvados.commands._util as arv_cmd
36
# Termination signals of interest to this command.
# NOTE(review): the code that consumes this list is not visible in this part
# of the file -- confirm where the handlers are installed.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client handle; unset at import time.
api_client = None

# Options describing what gets uploaded and how the result is stored/printed.
# Built with add_help=False so it can be used as a parent of arg_parser below.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
# Zero or more positional inputs; parse_arguments() substitutes '-' (stdin)
# when none are given.
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth, --normalize and --dry-run are mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

# Hidden from --help output (help=argparse.SUPPRESS).
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream / manifest / raw (and their synonyms) are mutually
# exclusive. Synonyms share the canonical option's destination via dest=.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# --use-filename writes to the same destination ('filename') as --filename.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
152
# Options controlling how the upload run behaves (destination project,
# exclusions, progress reporting, resuming, symlinks and caching).
# Built with add_help=False so it can be used as a parent of arg_parser below.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# action='append' collects every occurrence of --exclude into one list;
# parse_arguments() removes duplicated patterns afterwards.
run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Progress reporting flavours are mutually exclusive. When none of
# --batch-progress/--no-progress/--silent is given and stderr is a tty,
# parse_arguments() turns --progress on.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group.add_argument('--silent', action='store_true',
                    help="""
Do not print any debug messages to console. (Any error messages will still be displayed.)
""")

# --resume / --no-resume both write args.resume (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --follow-links / --no-follow-links both write args.follow_links
# (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

# --cache / --no-cache both write args.use_cache (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The actual command line parser: combines the upload options, the run
# options and the shared retry option from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
234
def parse_arguments(arguments):
    """
    Parse the command line and normalize the resulting namespace.

    arguments -- list of command line tokens handed to arg_parser.

    Returns the argparse namespace after these adjustments:
    * no paths, or '/dev/stdin', become the stdin marker '-';
    * --filename is rejected (via arg_parser.error) unless exactly one
      non-directory path was given;
    * --progress is enabled when stderr is a tty and no other progress
      option was requested;
    * --no-cache disables resuming; reading from stdin disables both the
      cache and resuming, rejects --update-collection and defaults the
      filename to 'stdin';
    * duplicated --exclude patterns are removed.
    """
    args = arg_parser.parse_args(arguments)

    # An empty path list means "read from standard input".
    if not args.paths:
        args.paths = ['-']

    args.paths = ['-' if p == '/dev/stdin' else p for p in args.paths]

    single_file = len(args.paths) == 1 and not os.path.isdir(args.paths[0])
    if not single_file and args.filename:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    quiet_requested = args.batch_progress or args.no_progress or args.silent
    if not quiet_requested and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Resuming needs the cache, so --no-cache switches --resume off too.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Data coming from stdin can be neither cached nor resumed.
        args.resume = False
        args.use_cache = False
        args.filename = args.filename or 'stdin'

    # Remove possible duplicated patterns
    if args.exclude:
        args.exclude = list(set(args.exclude))

    return args
274
275
class PathDoesNotExistError(Exception):
    """Raised when a requested input path does not exist on the local filesystem."""
278
279
class CollectionUpdateError(Exception):
    """Raised when the collection requested for update cannot be read or identified."""
282
283
class ResumeCacheConflict(Exception):
    """Raised when a resume cache file cannot be locked."""
286
287
class ArvPutArgumentConflict(Exception):
    """Raised when an upload job is created with incompatible options."""
290
291
class ArvPutUploadIsPending(Exception):
    """Raised in dry-run mode as soon as something is found that would be uploaded."""
294
295
class ArvPutUploadNotPending(Exception):
    """Raised in dry-run mode when the whole scan finds nothing to upload."""
298
299
class FileUploadList(list):
    """
    List of files queued for upload.

    When created with dry_run=True nothing can be queued: the first
    append() raises ArvPutUploadIsPending instead of storing the item.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Queue `other`, or raise ArvPutUploadIsPending on dry-run mode."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
309
310
class ResumeCache(object):
    """
    JSON state file protected by an exclusive, non-blocking flock().

    The file is opened and locked on construction; ResumeCacheConflict is
    raised when the lock cannot be obtained.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """
        Open (creating it if needed) and lock the cache file `file_spec`.

        Raises ResumeCacheConflict when the file cannot be locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't keep a handle on a file we are not allowed to use.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """
        Build the cache file path for the given parsed arguments.

        The file name is the MD5 hex digest of the configured API host, the
        sorted real paths of the inputs and, depending on the inputs, either
        a '-1' marker (some input is a directory) or the --filename value.
        The file lives in the CACHE_DIR directory returned by
        arv_cmd.make_home_conf_dir().
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """
        Take an exclusive, non-blocking lock on `fileobj`.

        `fileobj` may be a file object or a raw file descriptor (save()
        passes the latter). Raises ResumeCacheConflict if locking fails.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name attribute; report the descriptor
            # itself instead of failing with AttributeError.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON-decoded content of the cache file."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """
        Validate the cached state, restarting the cache if it looks unusable.

        The first block locator found in the state ('_finished_streams' or,
        failing that, '_current_stream_locators') is checked with a Keep
        HEAD request. Any error while picking or checking the locator makes
        the cache start over. A cache file that cannot be decoded as JSON is
        left alone.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Unusable state or unreachable block: discard the cache.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """
        Replace the cache content with `data` (serialized as JSON).

        The data is written to a locked temporary file in the same directory
        which is then renamed over the cache file. This is best effort: on
        I/O or locking errors the temporary file is discarded and the
        previous cache stays in place.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary descriptor so a failed save doesn't
            # leak it; errors while closing are irrelevant at this point.
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            # new_cache_name stays None when mkstemp itself failed.
            if new_cache_name is not None:
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (this also releases its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw away the current cache and start with a fresh, empty one."""
        self.destroy()
        self.__init__(self.filename)
391
392
393 class ArvPutUploadJob(object):
394     CACHE_DIR = '.cache/arvados/arv-put'
395     EMPTY_STATE = {
396         'manifest' : None, # Last saved manifest checkpoint
397         'files' : {} # Previous run file list: {path : {size, mtime}}
398     }
399
400     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
401                  name=None, owner_uuid=None,
402                  ensure_unique_name=False, num_retries=None,
403                  put_threads=None, replication_desired=None,
404                  filename=None, update_time=60.0, update_collection=None,
405                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
406                  follow_links=True, exclude_paths=[], exclude_names=None):
407         self.paths = paths
408         self.resume = resume
409         self.use_cache = use_cache
410         self.update = False
411         self.reporter = reporter
412         # This will set to 0 before start counting, if no special files are going
413         # to be read.
414         self.bytes_expected = None
415         self.bytes_written = 0
416         self.bytes_skipped = 0
417         self.name = name
418         self.owner_uuid = owner_uuid
419         self.ensure_unique_name = ensure_unique_name
420         self.num_retries = num_retries
421         self.replication_desired = replication_desired
422         self.put_threads = put_threads
423         self.filename = filename
424         self._state_lock = threading.Lock()
425         self._state = None # Previous run state (file list & manifest)
426         self._current_files = [] # Current run file list
427         self._cache_file = None
428         self._collection_lock = threading.Lock()
429         self._remote_collection = None # Collection being updated (if asked)
430         self._local_collection = None # Collection from previous run manifest
431         self._file_paths = set() # Files to be updated in remote collection
432         self._stop_checkpointer = threading.Event()
433         self._checkpointer = threading.Thread(target=self._update_task)
434         self._checkpointer.daemon = True
435         self._update_task_time = update_time  # How many seconds wait between update runs
436         self._files_to_upload = FileUploadList(dry_run=dry_run)
437         self._upload_started = False
438         self.logger = logger
439         self.dry_run = dry_run
440         self._checkpoint_before_quit = True
441         self.follow_links = follow_links
442         self.exclude_paths = exclude_paths
443         self.exclude_names = exclude_names
444
445         if not self.use_cache and self.resume:
446             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
447
448         # Check for obvious dry-run responses
449         if self.dry_run and (not self.use_cache or not self.resume):
450             raise ArvPutUploadIsPending()
451
452         # Load cached data if any and if needed
453         self._setup_state(update_collection)
454
455         # Build the upload file list, excluding requested files and counting the
456         # bytes expected to be uploaded.
457         self._build_upload_list()
458
459     def _build_upload_list(self):
460         """
461         Scan the requested paths to count file sizes, excluding files & dirs if requested
462         and building the upload file list.
463         """
464         # If there aren't special files to be read, reset total bytes count to zero
465         # to start counting.
466         if not any([p for p in self.paths
467                     if not (os.path.isfile(p) or os.path.isdir(p))]):
468             self.bytes_expected = 0
469
470         for path in self.paths:
471             # Test for stdin first, in case some file named '-' exist
472             if path == '-':
473                 if self.dry_run:
474                     raise ArvPutUploadIsPending()
475                 self._write_stdin(self.filename or 'stdin')
476             elif not os.path.exists(path):
477                  raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
478             elif os.path.isdir(path):
479                 # Use absolute paths on cache index so CWD doesn't interfere
480                 # with the caching logic.
481                 orig_path = path
482                 path = os.path.abspath(path)
483                 if orig_path[-1:] == os.sep:
484                     # When passing a directory reference with a trailing slash,
485                     # its contents should be uploaded directly to the
486                     # collection's root.
487                     prefixdir = path
488                 else:
489                     # When passing a directory reference with no trailing slash,
490                     # upload the directory to the collection's root.
491                     prefixdir = os.path.dirname(path)
492                 prefixdir += os.sep
493                 for root, dirs, files in os.walk(path,
494                                                  followlinks=self.follow_links):
495                     root_relpath = os.path.relpath(root, path)
496                     if root_relpath == '.':
497                         root_relpath = ''
498                     # Exclude files/dirs by full path matching pattern
499                     if self.exclude_paths:
500                         dirs[:] = [d for d in dirs
501                                    if not any(pathname_match(
502                                            os.path.join(root_relpath, d), pat)
503                                               for pat in self.exclude_paths)]
504                         files = [f for f in files
505                                  if not any(pathname_match(
506                                          os.path.join(root_relpath, f), pat)
507                                             for pat in self.exclude_paths)]
508                     # Exclude files/dirs by name matching pattern
509                     if self.exclude_names is not None:
510                         dirs[:] = [d for d in dirs
511                                    if not self.exclude_names.match(d)]
512                         files = [f for f in files
513                                  if not self.exclude_names.match(f)]
514                     # Make os.walk()'s dir traversing order deterministic
515                     dirs.sort()
516                     files.sort()
517                     for f in files:
518                         filepath = os.path.join(root, f)
519                         # Add its size to the total bytes count (if applicable)
520                         if self.follow_links or (not os.path.islink(filepath)):
521                             if self.bytes_expected is not None:
522                                 self.bytes_expected += os.path.getsize(filepath)
523                         self._check_file(filepath,
524                                          os.path.join(root[len(prefixdir):], f))
525             else:
526                 filepath = os.path.abspath(path)
527                 # Add its size to the total bytes count (if applicable)
528                 if self.follow_links or (not os.path.islink(filepath)):
529                     if self.bytes_expected is not None:
530                         self.bytes_expected += os.path.getsize(filepath)
531                 self._check_file(filepath,
532                                  self.filename or os.path.basename(path))
533         # If dry-mode is on, and got up to this point, then we should notify that
534         # there aren't any file to upload.
535         if self.dry_run:
536             raise ArvPutUploadNotPending()
537         # Remove local_collection's files that don't exist locally anymore, so the
538         # bytes_written count is correct.
539         for f in self.collection_file_paths(self._local_collection,
540                                             path_prefix=""):
541             if f != 'stdin' and f != self.filename and not f in self._file_paths:
542                 self._local_collection.remove(f)
543
544     def start(self, save_collection):
545         """
546         Start supporting thread & file uploading
547         """
548         self._checkpointer.start()
549         try:
550             # Update bytes_written from current local collection and
551             # report initial progress.
552             self._update()
553             # Actual file upload
554             self._upload_started = True # Used by the update thread to start checkpointing
555             self._upload_files()
556         except (SystemExit, Exception) as e:
557             self._checkpoint_before_quit = False
558             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
559             # Note: We're expecting SystemExit instead of
560             # KeyboardInterrupt because we have a custom signal
561             # handler in place that raises SystemExit with the catched
562             # signal's code.
563             if isinstance(e, PathDoesNotExistError):
564                 # We aren't interested in the traceback for this case
565                 pass
566             elif not isinstance(e, SystemExit) or e.code != -2:
567                 self.logger.warning("Abnormal termination:\n{}".format(
568                     traceback.format_exc()))
569             raise
570         finally:
571             if not self.dry_run:
572                 # Stop the thread before doing anything else
573                 self._stop_checkpointer.set()
574                 self._checkpointer.join()
575                 if self._checkpoint_before_quit:
576                     # Commit all pending blocks & one last _update()
577                     self._local_collection.manifest_text()
578                     self._update(final=True)
579                     if save_collection:
580                         self.save_collection()
581             if self.use_cache:
582                 self._cache_file.close()
583
584     def save_collection(self):
585         if self.update:
586             # Check if files should be updated on the remote collection.
587             for fp in self._file_paths:
588                 remote_file = self._remote_collection.find(fp)
589                 if not remote_file:
590                     # File don't exist on remote collection, copy it.
591                     self._remote_collection.copy(fp, fp, self._local_collection)
592                 elif remote_file != self._local_collection.find(fp):
593                     # A different file exist on remote collection, overwrite it.
594                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
595                 else:
596                     # The file already exist on remote collection, skip it.
597                     pass
598             self._remote_collection.save(num_retries=self.num_retries)
599         else:
600             self._local_collection.save_new(
601                 name=self.name, owner_uuid=self.owner_uuid,
602                 ensure_unique_name=self.ensure_unique_name,
603                 num_retries=self.num_retries)
604
605     def destroy_cache(self):
606         if self.use_cache:
607             try:
608                 os.unlink(self._cache_filename)
609             except OSError as error:
610                 # That's what we wanted anyway.
611                 if error.errno != errno.ENOENT:
612                     raise
613             self._cache_file.close()
614
615     def _collection_size(self, collection):
616         """
617         Recursively get the total size of the collection
618         """
619         size = 0
620         for item in listvalues(collection):
621             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
622                 size += self._collection_size(item)
623             else:
624                 size += item.size()
625         return size
626
627     def _update_task(self):
628         """
629         Periodically called support task. File uploading is
630         asynchronous so we poll status from the collection.
631         """
632         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
633             self._update()
634
635     def _update(self, final=False):
636         """
637         Update cached manifest text and report progress.
638         """
639         if self._upload_started:
640             with self._collection_lock:
641                 self.bytes_written = self._collection_size(self._local_collection)
642                 if self.use_cache:
643                     if final:
644                         manifest = self._local_collection.manifest_text()
645                     else:
646                         # Get the manifest text without comitting pending blocks
647                         manifest = self._local_collection.manifest_text(strip=False,
648                                                                         normalize=False,
649                                                                         only_committed=True)
650                     # Update cache
651                     with self._state_lock:
652                         self._state['manifest'] = manifest
653             if self.use_cache:
654                 try:
655                     self._save_state()
656                 except Exception as e:
657                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
658         else:
659             self.bytes_written = self.bytes_skipped
660         # Call the reporter, if any
661         self.report_progress()
662
663     def report_progress(self):
664         if self.reporter is not None:
665             self.reporter(self.bytes_written, self.bytes_expected)
666
667     def _write_stdin(self, filename):
668         output = self._local_collection.open(filename, 'wb')
669         self._write(sys.stdin, output)
670         output.close()
671
672     def _check_file(self, source, filename):
673         """
674         Check if this file needs to be uploaded
675         """
676         # Ignore symlinks when requested
677         if (not self.follow_links) and os.path.islink(source):
678             return
679         resume_offset = 0
680         should_upload = False
681         new_file_in_cache = False
682         # Record file path for updating the remote collection before exiting
683         self._file_paths.add(filename)
684
685         with self._state_lock:
686             # If no previous cached data on this file, store it for an eventual
687             # repeated run.
688             if source not in self._state['files']:
689                 self._state['files'][source] = {
690                     'mtime': os.path.getmtime(source),
691                     'size' : os.path.getsize(source)
692                 }
693                 new_file_in_cache = True
694             cached_file_data = self._state['files'][source]
695
696         # Check if file was already uploaded (at least partially)
697         file_in_local_collection = self._local_collection.find(filename)
698
699         # If not resuming, upload the full file.
700         if not self.resume:
701             should_upload = True
702         # New file detected from last run, upload it.
703         elif new_file_in_cache:
704             should_upload = True
705         # Local file didn't change from last run.
706         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
707             if not file_in_local_collection:
708                 # File not uploaded yet, upload it completely
709                 should_upload = True
710             elif file_in_local_collection.permission_expired():
711                 # Permission token expired, re-upload file. This will change whenever
712                 # we have a API for refreshing tokens.
713                 should_upload = True
714                 self._local_collection.remove(filename)
715             elif cached_file_data['size'] == file_in_local_collection.size():
716                 # File already there, skip it.
717                 self.bytes_skipped += cached_file_data['size']
718             elif cached_file_data['size'] > file_in_local_collection.size():
719                 # File partially uploaded, resume!
720                 resume_offset = file_in_local_collection.size()
721                 self.bytes_skipped += resume_offset
722                 should_upload = True
723             else:
724                 # Inconsistent cache, re-upload the file
725                 should_upload = True
726                 self._local_collection.remove(filename)
727                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
728         # Local file differs from cached data, re-upload it.
729         else:
730             if file_in_local_collection:
731                 self._local_collection.remove(filename)
732             should_upload = True
733
734         if should_upload:
735             try:
736                 self._files_to_upload.append((source, resume_offset, filename))
737             except ArvPutUploadIsPending:
738                 # This could happen when running on dry-mode, close cache file to
739                 # avoid locking issues.
740                 self._cache_file.close()
741                 raise
742
743     def _upload_files(self):
744         for source, resume_offset, filename in self._files_to_upload:
745             with open(source, 'rb') as source_fd:
746                 with self._state_lock:
747                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
748                     self._state['files'][source]['size'] = os.path.getsize(source)
749                 if resume_offset > 0:
750                     # Start upload where we left off
751                     output = self._local_collection.open(filename, 'ab')
752                     source_fd.seek(resume_offset)
753                 else:
754                     # Start from scratch
755                     output = self._local_collection.open(filename, 'wb')
756                 self._write(source_fd, output)
757                 output.close(flush=False)
758
759     def _write(self, source_fd, output):
760         while True:
761             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
762             if not data:
763                 break
764             output.write(data)
765
766     def _my_collection(self):
767         return self._remote_collection if self.update else self._local_collection
768
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        When update_collection is given it must match
        arvados.util.collection_uuid_pattern; the collection is then loaded
        into self._remote_collection and self.update is set.

        When self.use_cache is set, the cache file is opened and locked, and
        its JSON content (if usable) becomes self._state; otherwise
        self._state starts as a copy of self.EMPTY_STATE. Finally,
        self._local_collection is built from the manifest kept in the state.

        Raises CollectionUpdateError if update_collection has an unknown
        format or cannot be read, and ResumeCacheConflict (via _lock_file)
        if the cache file is already locked.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths: an MD5 over the API
            # host, the sorted real paths and (when set) the filename.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                if self._cache_is_valid(cache_filepath):
                    self.logger.info(
                        "Resuming upload from cache file {}".format(
                            cache_filepath))
                    # 'a+' keeps the existing content so it can be loaded below.
                    self._cache_file = open(cache_filepath, 'a+')
                else:
                    self.logger.info(
                        "Cache file {} is not valid, starting from scratch".format(
                            cache_filepath))
                    # 'w+' truncates the invalid cache.
                    self._cache_file = open(cache_filepath, 'w+')
            else:
                # Resume not requested, or no cache file exists yet: start
                # with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            # Rewind so that json.load() below reads from the beginning.
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty (or not valid JSON), set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
833
834     def _cache_is_valid(self, filepath):
835         try:
836             with open(filepath, 'r') as cache_file:
837                 manifest = json.load(cache_file)['manifest']
838             kc = arvados.keep.KeepClient(api_client=api_client)
839             # Check that the first block's token (oldest) is valid
840             for line in manifest.split('\n'):
841                 match = arvados.util.signed_locator_pattern.search(line)
842                 if match is not None:
843                     loc = match.group(0)
844                     return kc.head(loc, num_retries=self.num_retries)
845             # No signed locator found, all ok.
846             return True
847         except Exception as e:
848             self.logger.info("Something wrong happened when checking cache file: {}".format(e))
849             return False
850
851     def collection_file_paths(self, col, path_prefix='.'):
852         """Return a list of file paths by recursively go through the entire collection `col`"""
853         file_paths = []
854         for name, item in listitems(col):
855             if isinstance(item, arvados.arvfile.ArvadosFile):
856                 file_paths.append(os.path.join(path_prefix, name))
857             elif isinstance(item, arvados.collection.Subcollection):
858                 new_prefix = os.path.join(path_prefix, name)
859                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
860         return file_paths
861
862     def _lock_file(self, fileobj):
863         try:
864             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
865         except IOError:
866             raise ResumeCacheConflict("{} locked".format(fileobj.name))
867
868     def _save_state(self):
869         """
870         Atomically save current state into cache.
871         """
872         with self._state_lock:
873             # We're not using copy.deepcopy() here because it's a lot slower
874             # than json.dumps(), and we're already needing JSON format to be
875             # saved on disk.
876             state = json.dumps(self._state)
877         try:
878             new_cache = tempfile.NamedTemporaryFile(
879                 mode='w+',
880                 dir=os.path.dirname(self._cache_filename), delete=False)
881             self._lock_file(new_cache)
882             new_cache.write(state)
883             new_cache.flush()
884             os.fsync(new_cache)
885             os.rename(new_cache.name, self._cache_filename)
886         except (IOError, OSError, ResumeCacheConflict) as error:
887             self.logger.error("There was a problem while saving the cache file: {}".format(error))
888             try:
889                 os.unlink(new_cache_name)
890             except NameError:  # mkstemp failed.
891                 pass
892         else:
893             self._cache_file.close()
894             self._cache_file = new_cache
895
896     def collection_name(self):
897         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
898
899     def manifest_locator(self):
900         return self._my_collection().manifest_locator()
901
902     def portable_data_hash(self):
903         pdh = self._my_collection().portable_data_hash()
904         m = self._my_collection().stripped_manifest().encode()
905         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
906         if pdh != local_pdh:
907             logger.warning("\n".join([
908                 "arv-put: API server provided PDH differs from local manifest.",
909                 "         This should not happen; showing API server version."]))
910         return pdh
911
912     def manifest_text(self, stream_name=".", strip=False, normalize=False):
913         return self._my_collection().manifest_text(stream_name, strip, normalize)
914
915     def _datablocks_on_item(self, item):
916         """
917         Return a list of datablock locators, recursively navigating
918         through subcollections
919         """
920         if isinstance(item, arvados.arvfile.ArvadosFile):
921             if item.size() == 0:
922                 # Empty file locator
923                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
924             else:
925                 locators = []
926                 for segment in item.segments():
927                     loc = segment.locator
928                     locators.append(loc)
929                 return locators
930         elif isinstance(item, arvados.collection.Collection):
931             l = [self._datablocks_on_item(x) for x in listvalues(item)]
932             # Fast list flattener method taken from:
933             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
934             return [loc for sublist in l for loc in sublist]
935         else:
936             return None
937
938     def data_locators(self):
939         with self._collection_lock:
940             # Make sure all datablocks are flushed before getting the locators
941             self._my_collection().manifest_text()
942             datablocks = self._datablocks_on_item(self._my_collection())
943         return datablocks
944
# Template for machine-readable progress lines. The program name (sys.argv[0])
# and the PID are filled in once, here; the doubled braces leave two '{}'
# placeholders in the resulting string, which machine_progress() fills in.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
947
948 # Simulate glob.glob() matching behavior without the need to scan the filesystem
949 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
950 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
951 # so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """
    Tell whether `pathname` matches `pattern`, comparing them one path
    component at a time with fnmatch.

    Empty and '.' components are dropped from the pattern (not from the
    pathname) before comparing; both must then have the same number of
    components.
    """
    path_parts = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pattern_parts = [part for part in pattern.split(os.sep)
                     if part not in ('', '.')]
    if len(path_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(part, glob)
               for part, glob in zip(path_parts, pattern_parts))
962
def machine_progress(bytes_written, bytes_expected):
    """
    Format a progress line from the _machine_format template, reporting
    -1 as the total when bytes_expected is None.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
966
def human_progress(bytes_written, bytes_expected):
    """
    Format a human-oriented progress string, starting with a carriage return.

    With a truthy bytes_expected, both amounts are shown shifted right by
    20 bits (suffixed 'M') followed by the completed percentage; otherwise
    only the raw bytes_written value is shown.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_m = bytes_written >> 20
    expected_m = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_m, expected_m, fraction)
974
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a reporter callable taking (bytes_written, bytes_expected) that
    writes whatever progress_func returns for them to outfile.
    """
    def write_progress(bytes_written, bytes_expected):
        emit = outfile.write
        emit(progress_func(bytes_written, bytes_expected))
    return write_progress
979
def exit_signal_handler(sigcode, frame):
    """
    Signal handler that terminates the program by raising SystemExit with
    the negated signal number as exit code. `frame` is ignored.
    """
    exit_code = -sigcode
    sys.exit(exit_code)
982
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Return the 'uuid' field of the API record selected by project_uuid.

    A falsy project_uuid selects the current user; otherwise the value must
    match arvados.util.user_uuid_pattern (looked up as a user) or
    arvados.util.group_uuid_pattern (looked up as a group).

    Raises ValueError when project_uuid matches neither pattern.
    """
    if not project_uuid:
        request = api_client.users().current()
    else:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            request = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            request = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    response = request.execute(num_retries=num_retries)
    return response['uuid']
993
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """
    Command line entry point: upload the requested paths and print the result.

    arguments is handed to parse_arguments(). The resulting output (manifest
    text, comma-separated data locators, portable data hash or manifest
    locator, depending on the options) is written to stdout unless --silent
    was given, and is also returned. stderr is not referenced in this
    function body.

    Terminates through sys.exit() instead of returning when:
    - incompatible options or invalid exclude patterns are given, the parent
      project cannot be determined, the upload job cannot be set up, or the
      upload raises arvados.errors.ApiError (status 1);
    - the dry run finds pending uploads (status 2) or none (status 0);
    - no output was produced or the collection could not be reported
      (non-zero status).
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0
    # Reuse the module-level API client if one was already set up.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    # Pick the progress reporter, if any was requested.
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # The collection is only saved when neither --stream nor --raw is used.
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    # Select what gets printed: manifest text, raw locators, or a
    # collection identifier (portable data hash or manifest locator).
    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        # Nothing to print counts as a failure, keeping any earlier status.
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Put the original signal handlers back in place.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1184
1185
# Run the command line entry point only when executed as a script.
if __name__ == '__main__':
    main()