# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases RAM requirements. Default is to use 2 threads.
On high-latency installations, a higher number of threads will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to any of the provided input directories) will
be excluded. When using a filename pattern like '*.txt', any text file will
be excluded no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")
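
# Illustrative invocation for the exclude patterns above (hypothetical paths):
#
#   arv-put --exclude '*.tmp' --exclude './logs/*' ./project/
#
# This skips anything named '*.tmp' anywhere in the tree, plus the contents
# of the input's top-level 'logs' directory (matching subdirectories are
# pruned from the walk).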

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group.add_argument('--silent', action='store_true',
                    help="""
Do not print any debug messages to console. (Any error messages will still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
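
# A minimal sketch of the flag interactions above (hypothetical values):
#
#   args = parse_arguments(['--no-cache', 'data/'])
#   assert args.use_cache is False and args.resume is False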


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            # Lock the file object (not the raw fd) so a conflict error can
            # report the file name.
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


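# A minimal usage sketch for ResumeCache (not used by main() below, which
# relies on ArvPutUploadJob's own cache), assuming an `args` namespace as
# returned by parse_arguments():
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()   # raises ValueError if the file is empty/corrupt
#   cache.save(state)
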
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs if requested
        and building the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode, close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state serialized as
            # JSON to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

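# A minimal library-style usage sketch for ArvPutUploadJob (hypothetical
# paths; assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN are configured):
#
#   job = ArvPutUploadJob(paths=['./data/'], reporter=None)
#   job.start(save_collection=True)
#   print(job.manifest_locator())
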
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True

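# Illustrative matches for pathname_match() (not exercised at runtime):
#
#   pathname_match('tests/run_test.py', 'tests/*.py')         -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  -> False
#   pathname_match('subdir/file.gif', './subdir/file.gif')    -> True
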
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

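# For example, with sys.argv[0] == 'arv-put' and pid 1234 (hypothetical
# values), machine_progress(10, None) yields
# "arv-put 1234: 10 written -1 total\n".
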
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

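# For example, human_progress(2 << 20, 4 << 20) renders "\r2M / 4M 50.0% ",
# overwriting the same terminal line on each call via the leading '\r'.
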
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

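# For example (hypothetical client), desired_project_uuid(api_client, None, 3)
# returns the current user's UUID (the Home project), while passing a user or
# group UUID verifies it exists and returns it back.
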
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

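    # For example (hypothetical patterns), --exclude '*.jpg' --exclude '*.tmp'
    # makes exclude_names equivalent to:
    #   re.compile(fnmatch.translate('*.jpg') + '|' + fnmatch.translate('*.tmp'))
    # while path-like patterns are matched later via pathname_match().
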
    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()