# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many threads
increases the RAM requirements. The default is to use 2 threads.
On high-latency installations, a greater number of threads will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
a path-like pattern such as 'subdir/*.txt' is used, all text files inside the
'subdir' directory (relative to any of the provided input directories) will be
excluded. When a filename pattern like '*.txt' is used, any text file will be
excluded no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

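# FileUploadList turns "is there anything left to upload?" into control flow:
# in dry-run mode the first append() raises ArvPutUploadIsPending, which
# callers catch to exit early with the right status code.  A minimal sketch:
#
#   files = FileUploadList(dry_run=True)
#   try:
#       files.append(('example.txt', 0, 'example.txt'))  # hypothetical entry
#   except ArvPutUploadIsPending:
#       pass  # at least one file is pending upload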

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
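    # make_path() derives a deterministic cache file name, so repeated runs
    # over the same API host and input paths resume from the same state file.
    # Sketch with hypothetical values:
    #
    #   ARVADOS_API_HOST=zzzzz.example.org, paths=['/data/reads']
    #   -> ~/.cache/arvados/arv-put/<md5 of host + NUL-joined realpaths>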

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special
        # files are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero and start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got this far, notify the caller that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so
        # the bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change
                # whenever we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode: close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

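    # _write() copies data in KEEP_BLOCK_SIZE chunks, so memory use stays
    # bounded no matter how large the source file is.  For a rough sense of
    # scale (illustrative): with a 64 MiB block size, a 1 GiB file is read
    # and written as sixteen chunks.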
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state in JSON format
            # to be saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

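    # _save_state() uses the classic write-to-temp-then-rename pattern: the
    # new state is written and fsync'ed to a NamedTemporaryFile in the same
    # directory, then os.rename() atomically replaces the old cache, so a
    # crash mid-save can't leave a truncated cache file behind.  A minimal
    # sketch of the same idea (hypothetical names):
    #
    #   tmp = tempfile.NamedTemporaryFile(dir=dirname, delete=False)
    #   tmp.write(data); tmp.flush(); os.fsync(tmp)
    #   os.rename(tmp.name, final_path)  # atomic on POSIX filesystems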
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the
# filesystem. Note: fnmatch() doesn't work correctly when used with pathnames.
# For example, the pattern 'tests/*.py' will match 'tests/run_test.py' and
# also 'tests/subdir/run_test.py', so instead we use it on every path
# component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True

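# A few worked examples of the matching rules above (illustrative only):
#
#   pathname_match('subdir/file.txt', 'subdir/*.txt')    -> True
#   pathname_match('subdir/a/file.txt', 'subdir/*.txt')  -> False (depth differs)
#   pathname_match('file.txt', './file.txt')             -> True ('.' is dropped)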
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

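# Sample reporter output (hypothetical byte counts; argv[0] and pid vary):
#
#   human_progress(10 << 20, 20 << 20)  -> '\r10M / 20M 50.0% '
#   machine_progress(1024, None)        -> 'arv-put 1234: 1024 written -1 total\n'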
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

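# The handler raises SystemExit with the *negated* signal number, so callers
# can tell signals apart from ordinary exits. For example, Ctrl-C delivers
# SIGINT (2), which surfaces as SystemExit(-2) -- exactly the code that
# ArvPutUploadJob.start() checks before deciding whether to log a stack trace.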
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

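# Usage sketch (hypothetical UUIDs): a user UUID or group (project) UUID is
# validated against the API server and echoed back; None falls through to
# the calling user's Home project.
#
#   desired_project_uuid(api_client, 'zzzzz-j7d0g-0123456789abcde', 3)
#   desired_project_uuid(api_client, None, 3)  # -> current user's uuid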
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Set up the exclude regex from all the --exclude arguments provided.
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns that include '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

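    # Worked example (hypothetical): with --exclude '*.tmp' --exclude 'logs/*.log',
    # '*.tmp' has no directory part, so it joins the combined exclude_names
    # regexp, while 'logs/*.log' keeps its path and goes into exclude_paths
    # for full-path matching during the tree walk.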
    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()