# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads will increase the RAM requirements. Default is to use 2
threads.
On high-latency installations, using a greater number of threads will
improve overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args

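# Illustrative behavior sketch (assumptions: stderr is a tty and 'mydir/' is
# an existing directory; these calls are examples, not executed by arv-put):
#   parse_arguments(['-'])          -> paths=['-'], filename='stdin',
#                                      use_cache=False, resume=False
#   parse_arguments(['/dev/stdin']) -> same as above ('-' is substituted)
#   parse_arguments(['mydir/', '--filename', 'x'])
#                                   -> arg_parser.error(): --filename cannot be
#                                      used when storing a directory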

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

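
# Illustrative sketch (assumption: this helper is not called by arv-put
# itself): on --dry-run, the first file that still needs uploading aborts the
# scan by raising ArvPutUploadIsPending, which main() turns into exit code 2.
def _example_dry_run_list():
    files = FileUploadList(dry_run=True)
    try:
        # Same tuple shape appended by _check_file(): (source, offset, name)
        files.append(('foo.txt', 0, 'foo.txt'))
    except ArvPutUploadIsPending:
        return True  # At least one file is pending upload.
    return False
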

# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)

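
# Illustrative sketch (assumption: this helper is not part of arv-put): the
# formatter appends the X-Request-Id suffix only to the first DEBUG or ERROR
# record, so the id is shown once without cluttering every subsequent line.
def _example_formatter_usage():
    handler = logging.StreamHandler()
    handler.setFormatter(ArvPutLogFormatter('req-example-id'))
    log = logging.getLogger('arvados.arv_put.example')
    log.addHandler(handler)
    log.error("first error")   # includes '(X-Request-Id: req-example-id)'
    log.error("second error")  # formatted without the request id suffix
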

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before the count starts, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs if requested
        and building the upload file list.
        """
        # If there aren't special files to be read, reset total bytes count to zero
        # to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0
        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s directory traversal order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got to this point, notify that there
        # aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries)
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previously cached data on this file, store it for a
            # possible repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode: close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state in JSON format
            # to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we apply it to every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True

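# Illustrative examples of pathname_match() (not executed; results follow from
# the component-by-component matching above):
#   pathname_match('tests/run_test.py', 'tests/*.py')         -> True
#   pathname_match('tests/sub/run_test.py', 'tests/*.py')     -> False (extra component)
#   pathname_match('exclude_this.gif', './exclude_this.gif')  -> True ('.' parts are dropped)
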
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

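# Illustrative output sketch (assumptions: argv[0] == 'arv-put', pid 1234, and
# 512 MiB written out of 1 GiB expected):
#   machine_progress(512 << 20, 1 << 30)
#       -> 'arv-put 1234: 536870912 written 1073741824 total\n'
#   human_progress(512 << 20, 1 << 30)
#       -> '\r512M / 1024M 50.0% '
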
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

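# Illustrative behavior sketch (the UUID below is hypothetical):
# desired_project_uuid() resolves where the new collection will be owned,
# defaulting to the current user's Home project.
#   desired_project_uuid(api_client, None, 3)
#       -> the current user's uuid (Home project)
#   desired_project_uuid(api_client, 'zzzzz-j7d0g-xxxxxxxxxxxxxxx', 3)
#       -> that group (project) uuid, validated against the API server
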
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()