# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
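# Each of these signals is later rebound to exit_signal_handler(), which
# raises SystemExit(-signum); see main() and start(), which use the negated
# code to tell a SIGINT apart from other exits.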
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Keep in mind that using
many threads increases the RAM requirements. Default is to use 2
threads.
On high-latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input directories) will be
excluded. When using a filename pattern like '*.txt', any text file will be
excluded no matter where it is placed.
For the special case of excluding only files or directories directly below
the given input directory, use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
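    """
    A list that understands dry-run mode: when dry_run=True, the first
    attempt to queue a file raises ArvPutUploadIsPending, signalling that
    at least one file would be uploaded.
    """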
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
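
    # Note (explanatory, not from the original source): the key above hashes
    # the API host and the resolved input paths (plus --filename, if given),
    # so repeating the same invocation maps back to the same cache file and
    # lets an interrupted upload resume.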

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
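        """
        Sanity-check the cached state: take one block locator from the
        cached manifest and HEAD it in Keep. If the block is unavailable
        (e.g. it was garbage-collected), restart with a fresh cache.
        """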
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
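    # Illustrative example of a populated state (hypothetical values):
    #   {'manifest': '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:data.bin\n',
    #    'files': {'/tmp/data.bin': {'mtime': 1514300000.0, 'size': 10}}}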

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, as long as no
        # special files are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero and start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory itself (by name) to the collection's
                    # root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
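                # Illustrative: for `arv-put /data/dir/` prefixdir is now
                # '/data/dir/' and files are stored at the collection root;
                # for `arv-put /data/dir` it is '/data/' and files are stored
                # under 'dir/'.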
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, then we should
        # notify that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, (arvados.collection.Collection,
                                 arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
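        # Stream in fixed-size chunks (KEEP_BLOCK_SIZE) so memory usage stays
        # bounded no matter how large the source file is.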
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state in JSON format
            # to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' would match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we apply it to every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
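
# Illustrative examples (not from the original source):
#   pathname_match('subdir/file.txt', 'subdir/*.txt')      -> True
#   pathname_match('subdir/more/file.txt', 'subdir/*.txt') -> False
#     (each pattern piece must match its own path component, unlike a plain
#     fnmatch over the whole path)
#   pathname_match('file.gif', './file.gif')               -> True
#     ('.' components are dropped when the pattern is split)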

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
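
# Illustrative output (program name and pid vary):
#   machine_progress(1024, 2048) -> 'arv-put 12345: 1024 written 2048 total\n'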

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
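
# Illustrative: human_progress(52428800, 104857600) -> '\r50M / 100M 50.0% '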

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
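
# Note (assumption based on Arvados UUID conventions, not stated in this
# file): user UUIDs carry the 'tpzed' type infix and group/project UUIDs the
# 'j7d0g' infix, which is what the two patterns above distinguish.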

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
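        # Illustrative: --exclude '*.jpg' --exclude 'tmp*' compiles into one
        # regexp equivalent to
        # fnmatch.translate('*.jpg') + '|' + fnmatch.translate('tmp*').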
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()