# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases the RAM requirements. Default is to use 2 threads.
On high-latency installations, a greater number may improve overall
throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove duplicate patterns, if any
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
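    # Supports --dry-run: in dry-run mode, appending the first pending upload
    # raises ArvPutUploadIsPending, which main() maps to exit code 2.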
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
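        # Cache key: the API host plus the sorted real paths (and --filename
        # when no directories are involved), so repeated invocations with the
        # same inputs map to the same cache file.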
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if none of the
        # paths to be read are special (non-regular) files.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
                          self.paths)):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
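                        # Note: assigning to dirs[:] in place prunes
                        # os.walk()'s traversal, so excluded directories are
                        # never visited.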
                        dirs[:] = list(filter(
                            lambda d: not any(
                                [pathname_match(os.path.join(root_relpath, d),
                                                pat)
                                 for pat in self.exclude_paths]),
                            dirs))
                        files = list(filter(
                            lambda f: not any(
                                [pathname_match(os.path.join(root_relpath, f),
                                                pat)
                                 for pat in self.exclude_paths]),
                            files))
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
                        files = list(filter(lambda f: not self.exclude_names.match(f), files))
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so
        # the bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode, close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

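    # Copy data in KEEP_BLOCK_SIZE chunks so reads line up with Keep's
    # block size (64 MiB at the time of writing).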
    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state serialized as
            # JSON to save it on disk.
            state = json.dumps(self._state)
        try:
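            # Atomic-save pattern: write the new state to a temp file in the
            # same directory, fsync it, then rename() it over the old cache
            # file so a concurrent reader never sees a partially written state.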
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we use it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True

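# A few illustrative cases (assuming os.sep == '/'):
#   pathname_match('tests/run_test.py', 'tests/*.py')       -> True
#   pathname_match('tests/sub/run_test.py', 'tests/*.py')   -> False (extra component)
#   pathname_match('tests/sub/run_test.py', 'tests/*/*.py') -> True
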
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

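# For example (illustrative): human_progress(100 << 20, 200 << 20)
# returns "\r100M / 200M 50.0% ".
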
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

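# Note: exiting with -sigcode means a caught SIGINT surfaces inside
# ArvPutUploadJob.start() as SystemExit with code -2, which is how the
# traceback logging is suppressed for Ctrl-C.
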
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
        #                            the name)
        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '.' or '..'
                p_parts = p.split(os.sep)
                if '.' in p_parts or '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '.' or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
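        # e.g. ['*.jpg', '*.tmp'] becomes one compiled regexp equivalent to
        # fnmatch.translate('*.jpg') + '|' + fnmatch.translate('*.tmp')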
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 use_cache=args.use_cache,
                                 filename=args.filename,
                                 reporter=reporter,
                                 num_retries=args.retries,
                                 replication_desired=args.replication,
                                 put_threads=args.threads,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True,
                                 update_collection=args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()