12260: Merge branch 'master' into 12260-system-health
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import copy
14 import datetime
15 import errno
16 import fcntl
17 import fnmatch
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34
35 import arvados.commands._util as arv_cmd
36
# Signals this command singles out for special treatment.
# NOTE(review): presumably these are the ones routed to a custom handler that
# raises SystemExit (installed outside this view) -- confirm.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client slot, unset at import time.
# NOTE(review): presumably assigned by the command entry point -- confirm.
api_client = None
39
# Options describing WHAT gets uploaded and how the result is stored/printed.
# Built with add_help=False so it can be used as a parent parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    '--version',
    action='version',
    version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')

upload_opts.add_argument(
    'paths',
    metavar='path', nargs='*', type=str,
    help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth (hidden), --normalize and --dry-run exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    metavar='N', type=int, default=-1,
    help=argparse.SUPPRESS)

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument(
    '--dry-run',
    action='store_true', default=False,
    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output flavour: stream / manifest / raw (plus their synonyms) exclude
# each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    dest='stream', action='store_true',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    dest='manifest', action='store_true',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    dest='manifest', action='store_true',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    dest='raw', action='store_true',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument(
    '--update-collection',
    dest='update_collection', metavar="UUID", type=str, default=None,
    help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument(
    '--use-filename',
    dest='filename', type=str, default=None,
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str, default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    metavar='N', type=int, default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument(
    '--threads',
    metavar='N', type=int, default=None,
    help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
152
# Options describing HOW the upload runs: destination, exclusions, progress
# reporting, resuming, symlink handling and caching.
# Built with add_help=False so it can be used as a parent parser.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

run_opts.add_argument(
    '--exclude',
    action='append', default=[], metavar='PATTERN',
    help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Progress reporting modes exclude each other.
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group.add_argument(
    '--silent',
    action='store_true',
    help="""
Do not print any debug messages to console. (Any error messages will still be displayed.)
""")

# --resume / --no-resume share the 'resume' destination (default: True).
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--resume',
    action='store_true', default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")

_group.add_argument(
    '--no-resume',
    dest='resume', action='store_false',
    help="""
Do not continue interrupted uploads from cached state.
""")

# --follow-links / --no-follow-links share 'follow_links' (default: True).
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--follow-links',
    dest='follow_links', action='store_true', default=True,
    help="""
Follow file and directory symlinks (default).
""")

_group.add_argument(
    '--no-follow-links',
    dest='follow_links', action='store_false',
    help="""
Do not follow file and directory symlinks.
""")

# --cache / --no-cache share 'use_cache' (default: True).
_group = run_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--cache',
    dest='use_cache', action='store_true', default=True,
    help="""
Save upload state in a cache file for resuming (default).
""")

_group.add_argument(
    '--no-cache',
    dest='use_cache', action='store_false',
    help="""
Do not save upload state in a cache file for resuming.
""")
230
# Full command-line parser: upload options + run options + the shared retry
# option taken from arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
    description='Copy data from the local filesystem to Keep.',
)
234
def parse_arguments(arguments):
    """
    Parse the command line and normalize interdependent options.

    Besides plain parsing with ``arg_parser`` this:
      * defaults ``paths`` to ['-'] (stdin) and maps '/dev/stdin' to '-';
      * rejects --filename unless exactly one non-directory path is given;
      * enables --progress when stderr is a tty and no other progress mode
        (or --silent) was requested;
      * disables resuming when caching is off;
      * for stdin input, rejects --update-collection, turns off
        resume/cache and defaults the filename to 'stdin';
      * removes duplicated --exclude patterns.

    Invalid combinations are reported through ``arg_parser.error()``.
    Returns the resulting argparse namespace.
    """
    args = arg_parser.parse_args(arguments)

    # No path at all means "read standard input"; '/dev/stdin' is an alias.
    requested = args.paths if len(args.paths) != 0 else ['-']
    args.paths = ['-' if p == '/dev/stdin' else p for p in requested]

    single_plain_path = (len(args.paths) == 1
                         and not os.path.isdir(args.paths[0]))
    if args.filename and not single_plain_path:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    explicit_mode = args.batch_progress or args.no_progress or args.silent
    if not explicit_mode and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    reading_stdin = (args.paths == ['-'])
    if reading_stdin:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Neither resuming nor caching is used for stdin input.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if args.exclude:
        args.exclude = list(set(args.exclude))

    return args
274
275
class PathDoesNotExistError(Exception):
    """Raised when a requested input file or directory does not exist."""
278
279
class CollectionUpdateError(Exception):
    """Raised when the collection requested for update cannot be used."""
282
283
class ResumeCacheConflict(Exception):
    """Raised when a resume cache file is already locked."""
286
287
class ArvPutArgumentConflict(Exception):
    """Raised when incompatible upload job arguments are combined."""
290
291
class ArvPutUploadIsPending(Exception):
    """Raised in dry-run mode to signal that something would be uploaded."""
294
295
class ArvPutUploadNotPending(Exception):
    """Raised in dry-run mode to signal that nothing needs uploading."""
298
299
class FileUploadList(list):
    """
    List of pending uploads that refuses new entries in dry-run mode.

    With ``dry_run`` set, the first attempt to append an item raises
    ArvPutUploadIsPending instead of storing it.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Append ``other``, or raise ArvPutUploadIsPending on dry-run."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
309
310
class ResumeCache(object):
    """
    JSON state file guarded by an exclusive, non-blocking flock().

    The file is opened (and created if needed) on construction and stays
    locked for as long as ``cache_file`` is open. ``save()`` replaces the
    file atomically by writing a locked temporary file in the same
    directory and renaming it over ``filename``.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """
        Open ``file_spec`` in append/read mode and lock it.

        Raises ResumeCacheConflict when the file is already locked; in that
        case the just-opened handle is closed before re-raising.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leave a useless open handle behind.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """
        Build the cache file path for the given parsed arguments.

        The file name is the MD5 hex digest of: the configured
        ARVADOS_API_HOST, the sorted real paths of ``args.paths`` joined by
        NUL, and then either b'-1' when any path is a directory or, failing
        that, ``args.filename`` when set. The directory comes from
        ``arv_cmd.make_home_conf_dir(CACHE_DIR, 0o700, 'raise')``.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """
        Take an exclusive non-blocking lock on ``fileobj`` (a file object).

        Raises ResumeCacheConflict if the lock cannot be obtained.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        """Return the JSON document stored in the cache file."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """
        Validate the cached state and restart the cache if it looks stale.

        The first block locator found in the cached state (from
        "_finished_streams" or "_current_stream_locators") is checked with a
        Keep HEAD request. Any failure while doing so restarts the cache.
        An unparseable (e.g. empty) cache file is silently accepted.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: any problem means the cache is unusable.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """
        Atomically replace the cache contents with ``data`` (JSON-encoded).

        Best effort: I/O errors and lock conflicts on the temporary file are
        swallowed, the temporary file is removed and the previous cache file
        is kept.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            # Wrap the descriptor first so _lock_file() gets an object with
            # a .name and so there is a single handle to close on failure.
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            # The handle stays open after the rename; without a flush the
            # data could sit in the userspace buffer and never reach disk
            # if the process dies.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            if new_cache is not None:
                new_cache.close()
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # None means mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (which also releases its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk (if present) and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Destroy the current cache file and start over with an empty one."""
        self.destroy()
        self.__init__(self.filename)
391
392
393 class ArvPutUploadJob(object):
394     CACHE_DIR = '.cache/arvados/arv-put'
395     EMPTY_STATE = {
396         'manifest' : None, # Last saved manifest checkpoint
397         'files' : {} # Previous run file list: {path : {size, mtime}}
398     }
399
400     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
401                  name=None, owner_uuid=None,
402                  ensure_unique_name=False, num_retries=None,
403                  put_threads=None, replication_desired=None,
404                  filename=None, update_time=60.0, update_collection=None,
405                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
406                  follow_links=True, exclude_paths=[], exclude_names=None):
407         self.paths = paths
408         self.resume = resume
409         self.use_cache = use_cache
410         self.update = False
411         self.reporter = reporter
412         # This will set to 0 before start counting, if no special files are going
413         # to be read.
414         self.bytes_expected = None
415         self.bytes_written = 0
416         self.bytes_skipped = 0
417         self.name = name
418         self.owner_uuid = owner_uuid
419         self.ensure_unique_name = ensure_unique_name
420         self.num_retries = num_retries
421         self.replication_desired = replication_desired
422         self.put_threads = put_threads
423         self.filename = filename
424         self._state_lock = threading.Lock()
425         self._state = None # Previous run state (file list & manifest)
426         self._current_files = [] # Current run file list
427         self._cache_file = None
428         self._collection_lock = threading.Lock()
429         self._remote_collection = None # Collection being updated (if asked)
430         self._local_collection = None # Collection from previous run manifest
431         self._file_paths = set() # Files to be updated in remote collection
432         self._stop_checkpointer = threading.Event()
433         self._checkpointer = threading.Thread(target=self._update_task)
434         self._checkpointer.daemon = True
435         self._update_task_time = update_time  # How many seconds wait between update runs
436         self._files_to_upload = FileUploadList(dry_run=dry_run)
437         self._upload_started = False
438         self.logger = logger
439         self.dry_run = dry_run
440         self._checkpoint_before_quit = True
441         self.follow_links = follow_links
442         self.exclude_paths = exclude_paths
443         self.exclude_names = exclude_names
444
445         if not self.use_cache and self.resume:
446             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
447
448         # Check for obvious dry-run responses
449         if self.dry_run and (not self.use_cache or not self.resume):
450             raise ArvPutUploadIsPending()
451
452         # Load cached data if any and if needed
453         self._setup_state(update_collection)
454
455         # Build the upload file list, excluding requested files and counting the
456         # bytes expected to be uploaded.
457         self._build_upload_list()
458
459     def _build_upload_list(self):
460         """
461         Scan the requested paths to count file sizes, excluding files & dirs if requested
462         and building the upload file list.
463         """
464         # If there aren't special files to be read, reset total bytes count to zero
465         # to start counting.
466         if not any([p for p in self.paths
467                     if not (os.path.isfile(p) or os.path.isdir(p))]):
468             self.bytes_expected = 0
469
470         for path in self.paths:
471             # Test for stdin first, in case some file named '-' exist
472             if path == '-':
473                 if self.dry_run:
474                     raise ArvPutUploadIsPending()
475                 self._write_stdin(self.filename or 'stdin')
476             elif not os.path.exists(path):
477                  raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
478             elif os.path.isdir(path):
479                 # Use absolute paths on cache index so CWD doesn't interfere
480                 # with the caching logic.
481                 orig_path = path
482                 path = os.path.abspath(path)
483                 if orig_path[-1:] == os.sep:
484                     # When passing a directory reference with a trailing slash,
485                     # its contents should be uploaded directly to the
486                     # collection's root.
487                     prefixdir = path
488                 else:
489                     # When passing a directory reference with no trailing slash,
490                     # upload the directory to the collection's root.
491                     prefixdir = os.path.dirname(path)
492                 prefixdir += os.sep
493                 for root, dirs, files in os.walk(path,
494                                                  followlinks=self.follow_links):
495                     root_relpath = os.path.relpath(root, path)
496                     if root_relpath == '.':
497                         root_relpath = ''
498                     # Exclude files/dirs by full path matching pattern
499                     if self.exclude_paths:
500                         dirs[:] = [d for d in dirs
501                                    if not any(pathname_match(
502                                            os.path.join(root_relpath, d), pat)
503                                               for pat in self.exclude_paths)]
504                         files = [f for f in files
505                                  if not any(pathname_match(
506                                          os.path.join(root_relpath, f), pat)
507                                             for pat in self.exclude_paths)]
508                     # Exclude files/dirs by name matching pattern
509                     if self.exclude_names is not None:
510                         dirs[:] = [d for d in dirs
511                                    if not self.exclude_names.match(d)]
512                         files = [f for f in files
513                                  if not self.exclude_names.match(f)]
514                     # Make os.walk()'s dir traversing order deterministic
515                     dirs.sort()
516                     files.sort()
517                     for f in files:
518                         filepath = os.path.join(root, f)
519                         # Add its size to the total bytes count (if applicable)
520                         if self.follow_links or (not os.path.islink(filepath)):
521                             if self.bytes_expected is not None:
522                                 self.bytes_expected += os.path.getsize(filepath)
523                         self._check_file(filepath,
524                                          os.path.join(root[len(prefixdir):], f))
525             else:
526                 filepath = os.path.abspath(path)
527                 # Add its size to the total bytes count (if applicable)
528                 if self.follow_links or (not os.path.islink(filepath)):
529                     if self.bytes_expected is not None:
530                         self.bytes_expected += os.path.getsize(filepath)
531                 self._check_file(filepath,
532                                  self.filename or os.path.basename(path))
533         # If dry-mode is on, and got up to this point, then we should notify that
534         # there aren't any file to upload.
535         if self.dry_run:
536             raise ArvPutUploadNotPending()
537         # Remove local_collection's files that don't exist locally anymore, so the
538         # bytes_written count is correct.
539         for f in self.collection_file_paths(self._local_collection,
540                                             path_prefix=""):
541             if f != 'stdin' and f != self.filename and not f in self._file_paths:
542                 self._local_collection.remove(f)
543
    def start(self, save_collection):
        """
        Start supporting thread & file uploading

        Runs the checkpointer thread, reports initial progress and uploads
        every file in the upload list. Any exception (including SystemExit)
        is re-raised after disabling the final checkpoint.

        On the way out, unless running in dry-run mode, the checkpointer
        thread is stopped and, if no error happened, pending blocks are
        committed, a final update is done and the collection is saved when
        save_collection is true. The cache file is closed when caching is
        in use.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            # Something went wrong: skip the final checkpoint/save below.
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
583
584     def save_collection(self):
585         if self.update:
586             # Check if files should be updated on the remote collection.
587             for fp in self._file_paths:
588                 remote_file = self._remote_collection.find(fp)
589                 if not remote_file:
590                     # File don't exist on remote collection, copy it.
591                     self._remote_collection.copy(fp, fp, self._local_collection)
592                 elif remote_file != self._local_collection.find(fp):
593                     # A different file exist on remote collection, overwrite it.
594                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
595                 else:
596                     # The file already exist on remote collection, skip it.
597                     pass
598             self._remote_collection.save(num_retries=self.num_retries)
599         else:
600             self._local_collection.save_new(
601                 name=self.name, owner_uuid=self.owner_uuid,
602                 ensure_unique_name=self.ensure_unique_name,
603                 num_retries=self.num_retries)
604
605     def destroy_cache(self):
606         if self.use_cache:
607             try:
608                 os.unlink(self._cache_filename)
609             except OSError as error:
610                 # That's what we wanted anyway.
611                 if error.errno != errno.ENOENT:
612                     raise
613             self._cache_file.close()
614
615     def _collection_size(self, collection):
616         """
617         Recursively get the total size of the collection
618         """
619         size = 0
620         for item in listvalues(collection):
621             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
622                 size += self._collection_size(item)
623             else:
624                 size += item.size()
625         return size
626
627     def _update_task(self):
628         """
629         Periodically called support task. File uploading is
630         asynchronous so we poll status from the collection.
631         """
632         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
633             self._update()
634
635     def _update(self, final=False):
636         """
637         Update cached manifest text and report progress.
638         """
639         if self._upload_started:
640             with self._collection_lock:
641                 self.bytes_written = self._collection_size(self._local_collection)
642                 if self.use_cache:
643                     if final:
644                         manifest = self._local_collection.manifest_text()
645                     else:
646                         # Get the manifest text without comitting pending blocks
647                         manifest = self._local_collection.manifest_text(strip=False,
648                                                                         normalize=False,
649                                                                         only_committed=True)
650                     # Update cache
651                     with self._state_lock:
652                         self._state['manifest'] = manifest
653             if self.use_cache:
654                 try:
655                     self._save_state()
656                 except Exception as e:
657                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
658         else:
659             self.bytes_written = self.bytes_skipped
660         # Call the reporter, if any
661         self.report_progress()
662
663     def report_progress(self):
664         if self.reporter is not None:
665             self.reporter(self.bytes_written, self.bytes_expected)
666
667     def _write_stdin(self, filename):
668         output = self._local_collection.open(filename, 'wb')
669         self._write(sys.stdin, output)
670         output.close()
671
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded

        ``source`` is the local file path and ``filename`` the path it gets
        inside the collection. The file's mtime/size are compared against
        the cached state and against what is already present in the local
        collection; when an upload (full or resumed) is needed, a
        (source, resume_offset, filename) tuple is queued in
        _files_to_upload. Bytes that don't need to be sent again are added
        to bytes_skipped.

        Raises ArvPutUploadIsPending when running in dry-run mode and the
        file would have to be uploaded.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                # In dry-run mode the list itself raises on append.
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running on dry-mode, close cache file to
                # avoid locking issues.
                self._cache_file.close()
                raise
743
744     def _upload_files(self):
745         for source, resume_offset, filename in self._files_to_upload:
746             with open(source, 'rb') as source_fd:
747                 with self._state_lock:
748                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
749                     self._state['files'][source]['size'] = os.path.getsize(source)
750                 if resume_offset > 0:
751                     # Start upload where we left off
752                     output = self._local_collection.open(filename, 'ab')
753                     source_fd.seek(resume_offset)
754                 else:
755                     # Start from scratch
756                     output = self._local_collection.open(filename, 'wb')
757                 self._write(source_fd, output)
758                 output.close(flush=False)
759
760     def _write(self, source_fd, output):
761         while True:
762             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
763             if not data:
764                 break
765             output.write(data)
766
767     def _my_collection(self):
768         return self._remote_collection if self.update else self._local_collection
769
770     def _setup_state(self, update_collection):
771         """
772         Create a new cache file or load a previously existing one.
773         """
774         # Load an already existing collection for update
775         if update_collection and re.match(arvados.util.collection_uuid_pattern,
776                                           update_collection):
777             try:
778                 self._remote_collection = arvados.collection.Collection(update_collection)
779             except arvados.errors.ApiError as error:
780                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
781             else:
782                 self.update = True
783         elif update_collection:
784             # Collection locator provided, but unknown format
785             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
786
787         if self.use_cache:
788             # Set up cache file name from input paths.
789             md5 = hashlib.md5()
790             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
791             realpaths = sorted(os.path.realpath(path) for path in self.paths)
792             md5.update(b'\0'.join([p.encode() for p in realpaths]))
793             if self.filename:
794                 md5.update(self.filename.encode())
795             cache_filename = md5.hexdigest()
796             cache_filepath = os.path.join(
797                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
798                 cache_filename)
799             if self.resume and os.path.exists(cache_filepath):
800                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
801                 self._cache_file = open(cache_filepath, 'a+')
802             else:
803                 # --no-resume means start with a empty cache file.
804                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
805                 self._cache_file = open(cache_filepath, 'w+')
806             self._cache_filename = self._cache_file.name
807             self._lock_file(self._cache_file)
808             self._cache_file.seek(0)
809
810         with self._state_lock:
811             if self.use_cache:
812                 try:
813                     self._state = json.load(self._cache_file)
814                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
815                         # Cache at least partially incomplete, set up new cache
816                         self._state = copy.deepcopy(self.EMPTY_STATE)
817                 except ValueError:
818                     # Cache file empty, set up new cache
819                     self._state = copy.deepcopy(self.EMPTY_STATE)
820             else:
821                 self.logger.info("No cache usage requested for this run.")
822                 # No cache file, set empty state
823                 self._state = copy.deepcopy(self.EMPTY_STATE)
824             # Load the previous manifest so we can check if files were modified remotely.
825             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
826
827     def collection_file_paths(self, col, path_prefix='.'):
828         """Return a list of file paths by recursively go through the entire collection `col`"""
829         file_paths = []
830         for name, item in listitems(col):
831             if isinstance(item, arvados.arvfile.ArvadosFile):
832                 file_paths.append(os.path.join(path_prefix, name))
833             elif isinstance(item, arvados.collection.Subcollection):
834                 new_prefix = os.path.join(path_prefix, name)
835                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
836         return file_paths
837
838     def _lock_file(self, fileobj):
839         try:
840             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
841         except IOError:
842             raise ResumeCacheConflict("{} locked".format(fileobj.name))
843
844     def _save_state(self):
845         """
846         Atomically save current state into cache.
847         """
848         with self._state_lock:
849             # We're not using copy.deepcopy() here because it's a lot slower
850             # than json.dumps(), and we're already needing JSON format to be
851             # saved on disk.
852             state = json.dumps(self._state)
853         try:
854             new_cache = tempfile.NamedTemporaryFile(
855                 mode='w+',
856                 dir=os.path.dirname(self._cache_filename), delete=False)
857             self._lock_file(new_cache)
858             new_cache.write(state)
859             new_cache.flush()
860             os.fsync(new_cache)
861             os.rename(new_cache.name, self._cache_filename)
862         except (IOError, OSError, ResumeCacheConflict) as error:
863             self.logger.error("There was a problem while saving the cache file: {}".format(error))
864             try:
865                 os.unlink(new_cache_name)
866             except NameError:  # mkstemp failed.
867                 pass
868         else:
869             self._cache_file.close()
870             self._cache_file = new_cache
871
872     def collection_name(self):
873         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
874
875     def manifest_locator(self):
876         return self._my_collection().manifest_locator()
877
878     def portable_data_hash(self):
879         pdh = self._my_collection().portable_data_hash()
880         m = self._my_collection().stripped_manifest().encode()
881         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
882         if pdh != local_pdh:
883             logger.warning("\n".join([
884                 "arv-put: API server provided PDH differs from local manifest.",
885                 "         This should not happen; showing API server version."]))
886         return pdh
887
888     def manifest_text(self, stream_name=".", strip=False, normalize=False):
889         return self._my_collection().manifest_text(stream_name, strip, normalize)
890
891     def _datablocks_on_item(self, item):
892         """
893         Return a list of datablock locators, recursively navigating
894         through subcollections
895         """
896         if isinstance(item, arvados.arvfile.ArvadosFile):
897             if item.size() == 0:
898                 # Empty file locator
899                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
900             else:
901                 locators = []
902                 for segment in item.segments():
903                     loc = segment.locator
904                     locators.append(loc)
905                 return locators
906         elif isinstance(item, arvados.collection.Collection):
907             l = [self._datablocks_on_item(x) for x in listvalues(item)]
908             # Fast list flattener method taken from:
909             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
910             return [loc for sublist in l for loc in sublist]
911         else:
912             return None
913
914     def data_locators(self):
915         with self._collection_lock:
916             # Make sure all datablocks are flushed before getting the locators
917             self._my_collection().manifest_text()
918             datablocks = self._datablocks_on_item(self._my_collection())
919         return datablocks
920
921 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
922                                                             os.getpid())
923
924 # Simulate glob.glob() matching behavior without the need to scan the filesystem
925 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
926 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
927 # so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """
    Tell whether `pathname` matches `pattern`, comparing them one path
    component at a time with fnmatch. Both must have the same number of
    components once empty and '.' components are dropped from the pattern.
    """
    name_parts = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pattern_parts = [part for part in pattern.split(os.sep)
                     if part not in ('', '.')]
    if len(name_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(name_part, pattern_part)
               for name_part, pattern_part in zip(name_parts, pattern_parts))
938
def machine_progress(bytes_written, bytes_expected):
    """
    Format a machine-readable progress line from _machine_format, reporting
    -1 as the total when bytes_expected is None.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
942
def human_progress(bytes_written, bytes_expected):
    """
    Format a human-readable progress string starting with a carriage return.

    With a known, non-zero total it shows written and expected sizes
    (shifted right by 20 bits, labelled 'M') plus a percentage; otherwise it
    shows just the number of bytes written.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_m = bytes_written >> 20
    expected_m = bytes_expected >> 20
    ratio = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_m, expected_m, ratio)
950
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a progress reporter: the returned callable formats its two
    arguments with progress_func and writes the result to outfile.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
955
def exit_signal_handler(sigcode, frame):
    """Signal handler that exits, using the negated signal number as status."""
    exit_status = -sigcode
    raise SystemExit(exit_status)
958
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Resolve the UUID of the owner for the upload.

    Without a project_uuid the current user is queried; otherwise the
    matching user or group is fetched. Returns the 'uuid' field of the API
    response. Raises ValueError when project_uuid matches neither the user
    nor the group UUID pattern.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            resource = api_client.users()
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            resource = api_client.groups()
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
        request = resource.get(uuid=project_uuid)
    else:
        request = api_client.users().current()
    response = request.execute(num_retries=num_retries)
    return response['uuid']
969
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """
    Command-line entry point for arv-put.

    arguments: argument list handed to parse_arguments().
    stdout: stream where the resulting output (manifest text, data
        locators, portable data hash or manifest locator) is written.
    stderr: not used by this function; diagnostics go through the
        'arvados.arv_put' logger.

    Returns the output string on success. Exits through sys.exit() with
    status 1 on errors, 2 when a dry run finds pending uploads and 0 when
    there is nothing pending to upload. The signal handlers installed for
    CAUGHT_SIGNALS are always restored before leaving, whether by return,
    sys.exit() or exception.
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if args.exclude:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    output = None
    try:
        if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        if not args.dry_run:
            writer.report_progress()
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("\n".join([
                "arv-put: %s" % str(error)]))
            sys.exit(1)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        elif not args.silent:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        # Put the original handlers back even when leaving through
        # sys.exit() or an unexpected exception, so callers that invoke
        # main() programmatically don't keep our handlers installed.
        for sigcode, orig_handler in listitems(orig_signal_handlers):
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1160
1161
1162 if __name__ == '__main__':
1163     main()