Make sure that the libpam-arvados package puts its various files in the place
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import copy
14 import datetime
15 import errno
16 import fcntl
17 import fnmatch
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34 from arvados.util import keep_locator_pattern
35
36 import arvados.commands._util as arv_cmd
37
# Module-level API client handle, initialised to None here.
# NOTE(review): presumably assigned by the command's entry point later in the
# file -- confirm against the code that uses it.
api_client = None

# Parser holding the options that describe *what* gets uploaded and how the
# result is stored. It is built with add_help=False because it is only used as
# a parent of the real command-line parser (arg_parser).
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
# Zero or more positional paths; an empty list is turned into stdin ('-') by
# parse_arguments().
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth, --normalize and --dry-run cannot be combined.
_group = upload_opts.add_mutually_exclusive_group()

# Still accepted on the command line but hidden from --help output.
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: --stream, --manifest and --raw (and their synonyms, which share
# the same dest) are mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# parse_arguments() rejects this option when the input is stdin.
upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# --use-filename and --filename both store into args.filename.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

# Received as a single string; splitting on commas is not done here.
upload_opts.add_argument('--storage-classes', help="""
Specify comma separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

# May be given several times; every occurrence is appended to args.exclude.
upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Both flags write args.follow_links, which defaults to True.
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")
178
179
# Parser holding the options that control *how* the command runs (destination
# project, naming, progress reporting, resume/cache behaviour). Like
# upload_opts it is only used as a parent of arg_parser, hence add_help=False.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# Progress reporting style: at most one of these may be given.
# parse_arguments() turns --progress on by default when stderr is a tty.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

# Both flags write args.resume, which defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# Both flags write args.use_cache, which defaults to True. parse_arguments()
# forces resume off whenever the cache is disabled.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The actual command-line parser: combines the upload options, the run options
# and the shared retry option provided by arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
240
def parse_arguments(arguments):
    """Parse the arv-put command line and normalise the resulting namespace.

    ``arguments`` is the list of command-line strings handed to
    ``arg_parser``. On top of plain parsing this function:

    * reads from stdin ('-') when no path is given, and treats
      '/dev/stdin' as an alias of '-';
    * rejects --filename when more than one path, or a directory, is given;
    * enables --progress when stderr is a tty and no other progress/silence
      option was requested;
    * disables resuming when the cache is disabled;
    * for stdin input, rejects --update-collection, disables resume and
      cache, and defaults the stored filename to 'stdin';
    * removes duplicated --exclude patterns.

    Argument conflicts are reported through ``arg_parser.error()``.
    Returns the adjusted namespace.
    """
    args = arg_parser.parse_args(arguments)

    # No explicit path means stdin; '/dev/stdin' is spelled '-' internally.
    args.paths = ["-" if requested == "/dev/stdin" else requested
                  for requested in (args.paths or ['-'])]

    single_file = (len(args.paths) == 1
                   and not os.path.isdir(args.paths[0]))
    if args.filename and not single_file:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    progress_overridden = (args.batch_progress or args.no_progress
                           or args.silent)
    if not progress_overridden and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    reading_stdin = (args.paths == ['-'])
    if reading_stdin:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # stdin cannot be re-read, so there is nothing to cache or resume.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if args.exclude:
        args.exclude = list(set(args.exclude))

    return args
280
281
class PathDoesNotExistError(Exception):
    """A path requested for upload does not exist on the local filesystem."""
284
285
class CollectionUpdateError(Exception):
    """Error related to updating an existing collection.

    NOTE(review): the raising code is outside the reviewed section; confirm
    the exact conditions there.
    """
288
289
class ResumeCacheConflict(Exception):
    """The resume cache file could not be locked for exclusive use."""
292
293
class ResumeCacheInvalidError(Exception):
    """Error related to an unusable resume cache.

    NOTE(review): the raising code is outside the reviewed section; confirm
    the exact conditions there.
    """
296
class ArvPutArgumentConflict(Exception):
    """Incompatible options were given (e.g. resume without a cache)."""
299
300
class ArvPutUploadIsPending(Exception):
    """Dry-run outcome: at least one file would have to be uploaded."""
303
304
class ArvPutUploadNotPending(Exception):
    """Dry-run outcome: the scan finished and nothing needs uploading."""
307
308
class FileUploadList(list):
    """List of files queued for upload.

    When created with ``dry_run=True`` the list refuses to grow: the first
    ``append()`` raises ArvPutUploadIsPending, which is how a dry run finds
    out that there is something to upload.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Queue ``other``, or raise ArvPutUploadIsPending on a dry run."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
318
319
# Appends the X-Request-Id to the first log message emitted at ERROR or DEBUG
# level; every other message uses the standard Arvados log format.
class ArvPutLogFormatter(logging.Formatter):
    """Log formatter that reports the request id once.

    The first record whose level is DEBUG or ERROR is rendered with
    ' (X-Request-Id: <id>)' appended to the standard format. All other
    records, including later DEBUG/ERROR ones, use the standard format.
    """
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        """Build the formatter for the given ``request_id``."""
        # Initialise the base class too, so that inherited helpers
        # (formatTime, usesTime, formatException, ...) work on this instance
        # instead of failing on missing attributes.
        logging.Formatter.__init__(
            self, arvados.log_format, arvados.log_date_format)
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        """Return ``record`` as text, adding the request id at most once."""
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            # Instance attribute shadows the class-level default from now on.
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
336
337
class ResumeCache(object):
    """JSON state file guarded by an exclusive, non-blocking flock().

    The file is opened (and created if missing) when the object is built,
    and stays open and locked until close() or destroy() is called.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open and lock the cache file at ``file_spec``.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leak the file object when somebody else holds the lock.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the parsed arguments ``args``.

        The file name is an MD5 digest of the configured API host, the
        sorted real paths being uploaded and, depending on the input,
        either a directory marker or the --filename value. MD5 is only
        used here to derive a file name.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on ``fileobj``.

        ``fileobj`` may be a file object or a raw file descriptor.
        Raises ResumeCacheConflict when the lock cannot be obtained.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; report the descriptor itself.
            raise ResumeCacheConflict(
                u"{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the cached state parsed from JSON.

        Raises ValueError when the file does not contain valid JSON
        (for instance, when it is empty).
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Start over with an empty cache if the cached state looks stale.

        One block locator is taken from the cached state and requested
        from Keep with a HEAD call; any failure while doing so restarts
        the cache. A cache that cannot be parsed is left alone.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: whatever went wrong (malformed state,
                # Keep error), the cached data can't be trusted.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache contents with ``data`` (as JSON).

        The data is written to a locked temporary file in the same
        directory, flushed, and renamed over the cache file. I/O and
        locking failures are swallowed: saving is best effort and the
        previous cache file stays in place.
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Push the data out of the userspace buffer before the file
            # becomes the cache; otherwise the checkpoint on disk may be
            # empty until the next save.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary file's descriptor...
            try:
                if new_cache is not None:
                    new_cache.close()
                elif new_cache_fd is not None:
                    os.close(new_cache_fd)
            except (IOError, OSError):
                pass
            # ...and remove the temporary file, if mkstemp got that far.
            if new_cache_name is not None:
                try:
                    os.unlink(new_cache_name)
                except OSError as error:
                    if error.errno != errno.ENOENT:
                        raise
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file, which also releases its lock."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw the current cache away and start with a fresh, empty one."""
        self.destroy()
        self.__init__(self.filename)
418
419
class ArvPutUploadJob(object):
    """Upload job for a set of local paths.

    Building the job scans the requested paths and works out which files
    need uploading; start() then uploads them while a background thread
    periodically checkpoints progress.
    """
    # Relative directory for arv-put cache files.
    # NOTE(review): presumably resolved under the user's home directory by
    # the cache setup code, which is outside this section -- confirm.
    CACHE_DIR = '.cache/arvados/arv-put'
    # Layout of the cached state when there is no previous run to resume.
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
426
427     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
428                  name=None, owner_uuid=None, api_client=None,
429                  ensure_unique_name=False, num_retries=None,
430                  put_threads=None, replication_desired=None, filename=None,
431                  update_time=60.0, update_collection=None, storage_classes=None,
432                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
433                  follow_links=True, exclude_paths=[], exclude_names=None):
434         self.paths = paths
435         self.resume = resume
436         self.use_cache = use_cache
437         self.update = False
438         self.reporter = reporter
439         # This will set to 0 before start counting, if no special files are going
440         # to be read.
441         self.bytes_expected = None
442         self.bytes_written = 0
443         self.bytes_skipped = 0
444         self.name = name
445         self.owner_uuid = owner_uuid
446         self.ensure_unique_name = ensure_unique_name
447         self.num_retries = num_retries
448         self.replication_desired = replication_desired
449         self.put_threads = put_threads
450         self.filename = filename
451         self.storage_classes = storage_classes
452         self._api_client = api_client
453         self._state_lock = threading.Lock()
454         self._state = None # Previous run state (file list & manifest)
455         self._current_files = [] # Current run file list
456         self._cache_file = None
457         self._collection_lock = threading.Lock()
458         self._remote_collection = None # Collection being updated (if asked)
459         self._local_collection = None # Collection from previous run manifest
460         self._file_paths = set() # Files to be updated in remote collection
461         self._stop_checkpointer = threading.Event()
462         self._checkpointer = threading.Thread(target=self._update_task)
463         self._checkpointer.daemon = True
464         self._update_task_time = update_time  # How many seconds wait between update runs
465         self._files_to_upload = FileUploadList(dry_run=dry_run)
466         self._upload_started = False
467         self.logger = logger
468         self.dry_run = dry_run
469         self._checkpoint_before_quit = True
470         self.follow_links = follow_links
471         self.exclude_paths = exclude_paths
472         self.exclude_names = exclude_names
473
474         if not self.use_cache and self.resume:
475             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
476
477         # Check for obvious dry-run responses
478         if self.dry_run and (not self.use_cache or not self.resume):
479             raise ArvPutUploadIsPending()
480
481         # Load cached data if any and if needed
482         self._setup_state(update_collection)
483
484         # Build the upload file list, excluding requested files and counting the
485         # bytes expected to be uploaded.
486         self._build_upload_list()
487
488     def _build_upload_list(self):
489         """
490         Scan the requested paths to count file sizes, excluding requested files
491         and dirs and building the upload file list.
492         """
493         # If there aren't special files to be read, reset total bytes count to zero
494         # to start counting.
495         if not any([p for p in self.paths
496                     if not (os.path.isfile(p) or os.path.isdir(p))]):
497             self.bytes_expected = 0
498
499         for path in self.paths:
500             # Test for stdin first, in case some file named '-' exist
501             if path == '-':
502                 if self.dry_run:
503                     raise ArvPutUploadIsPending()
504                 self._write_stdin(self.filename or 'stdin')
505             elif not os.path.exists(path):
506                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
507             elif os.path.isdir(path):
508                 # Use absolute paths on cache index so CWD doesn't interfere
509                 # with the caching logic.
510                 orig_path = path
511                 path = os.path.abspath(path)
512                 if orig_path[-1:] == os.sep:
513                     # When passing a directory reference with a trailing slash,
514                     # its contents should be uploaded directly to the
515                     # collection's root.
516                     prefixdir = path
517                 else:
518                     # When passing a directory reference with no trailing slash,
519                     # upload the directory to the collection's root.
520                     prefixdir = os.path.dirname(path)
521                 prefixdir += os.sep
522                 for root, dirs, files in os.walk(path,
523                                                  followlinks=self.follow_links):
524                     root_relpath = os.path.relpath(root, path)
525                     if root_relpath == '.':
526                         root_relpath = ''
527                     # Exclude files/dirs by full path matching pattern
528                     if self.exclude_paths:
529                         dirs[:] = [d for d in dirs
530                                    if not any(pathname_match(
531                                            os.path.join(root_relpath, d), pat)
532                                               for pat in self.exclude_paths)]
533                         files = [f for f in files
534                                  if not any(pathname_match(
535                                          os.path.join(root_relpath, f), pat)
536                                             for pat in self.exclude_paths)]
537                     # Exclude files/dirs by name matching pattern
538                     if self.exclude_names is not None:
539                         dirs[:] = [d for d in dirs
540                                    if not self.exclude_names.match(d)]
541                         files = [f for f in files
542                                  if not self.exclude_names.match(f)]
543                     # Make os.walk()'s dir traversing order deterministic
544                     dirs.sort()
545                     files.sort()
546                     for f in files:
547                         filepath = os.path.join(root, f)
548                         # Add its size to the total bytes count (if applicable)
549                         if self.follow_links or (not os.path.islink(filepath)):
550                             if self.bytes_expected is not None:
551                                 self.bytes_expected += os.path.getsize(filepath)
552                         self._check_file(filepath,
553                                          os.path.join(root[len(prefixdir):], f))
554             else:
555                 filepath = os.path.abspath(path)
556                 # Add its size to the total bytes count (if applicable)
557                 if self.follow_links or (not os.path.islink(filepath)):
558                     if self.bytes_expected is not None:
559                         self.bytes_expected += os.path.getsize(filepath)
560                 self._check_file(filepath,
561                                  self.filename or os.path.basename(path))
562         # If dry-mode is on, and got up to this point, then we should notify that
563         # there aren't any file to upload.
564         if self.dry_run:
565             raise ArvPutUploadNotPending()
566         # Remove local_collection's files that don't exist locally anymore, so the
567         # bytes_written count is correct.
568         for f in self.collection_file_paths(self._local_collection,
569                                             path_prefix=""):
570             if f != 'stdin' and f != self.filename and not f in self._file_paths:
571                 self._local_collection.remove(f)
572
573     def start(self, save_collection):
574         """
575         Start supporting thread & file uploading
576         """
577         self._checkpointer.start()
578         try:
579             # Update bytes_written from current local collection and
580             # report initial progress.
581             self._update()
582             # Actual file upload
583             self._upload_started = True # Used by the update thread to start checkpointing
584             self._upload_files()
585         except (SystemExit, Exception) as e:
586             self._checkpoint_before_quit = False
587             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
588             # Note: We're expecting SystemExit instead of
589             # KeyboardInterrupt because we have a custom signal
590             # handler in place that raises SystemExit with the catched
591             # signal's code.
592             if isinstance(e, PathDoesNotExistError):
593                 # We aren't interested in the traceback for this case
594                 pass
595             elif not isinstance(e, SystemExit) or e.code != -2:
596                 self.logger.warning("Abnormal termination:\n{}".format(
597                     traceback.format_exc()))
598             raise
599         finally:
600             if not self.dry_run:
601                 # Stop the thread before doing anything else
602                 self._stop_checkpointer.set()
603                 self._checkpointer.join()
604                 if self._checkpoint_before_quit:
605                     # Commit all pending blocks & one last _update()
606                     self._local_collection.manifest_text()
607                     self._update(final=True)
608                     if save_collection:
609                         self.save_collection()
610             if self.use_cache:
611                 self._cache_file.close()
612
613     def save_collection(self):
614         if self.update:
615             # Check if files should be updated on the remote collection.
616             for fp in self._file_paths:
617                 remote_file = self._remote_collection.find(fp)
618                 if not remote_file:
619                     # File don't exist on remote collection, copy it.
620                     self._remote_collection.copy(fp, fp, self._local_collection)
621                 elif remote_file != self._local_collection.find(fp):
622                     # A different file exist on remote collection, overwrite it.
623                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
624                 else:
625                     # The file already exist on remote collection, skip it.
626                     pass
627             self._remote_collection.save(storage_classes=self.storage_classes,
628                                          num_retries=self.num_retries)
629         else:
630             if self.storage_classes is None:
631                 self.storage_classes = ['default']
632             self._local_collection.save_new(
633                 name=self.name, owner_uuid=self.owner_uuid,
634                 storage_classes=self.storage_classes,
635                 ensure_unique_name=self.ensure_unique_name,
636                 num_retries=self.num_retries)
637
638     def destroy_cache(self):
639         if self.use_cache:
640             try:
641                 os.unlink(self._cache_filename)
642             except OSError as error:
643                 # That's what we wanted anyway.
644                 if error.errno != errno.ENOENT:
645                     raise
646             self._cache_file.close()
647
648     def _collection_size(self, collection):
649         """
650         Recursively get the total size of the collection
651         """
652         size = 0
653         for item in listvalues(collection):
654             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
655                 size += self._collection_size(item)
656             else:
657                 size += item.size()
658         return size
659
660     def _update_task(self):
661         """
662         Periodically called support task. File uploading is
663         asynchronous so we poll status from the collection.
664         """
665         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
666             self._update()
667
668     def _update(self, final=False):
669         """
670         Update cached manifest text and report progress.
671         """
672         if self._upload_started:
673             with self._collection_lock:
674                 self.bytes_written = self._collection_size(self._local_collection)
675                 if self.use_cache:
676                     if final:
677                         manifest = self._local_collection.manifest_text()
678                     else:
679                         # Get the manifest text without comitting pending blocks
680                         manifest = self._local_collection.manifest_text(strip=False,
681                                                                         normalize=False,
682                                                                         only_committed=True)
683                     # Update cache
684                     with self._state_lock:
685                         self._state['manifest'] = manifest
686             if self.use_cache:
687                 try:
688                     self._save_state()
689                 except Exception as e:
690                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
691         else:
692             self.bytes_written = self.bytes_skipped
693         # Call the reporter, if any
694         self.report_progress()
695
696     def report_progress(self):
697         if self.reporter is not None:
698             self.reporter(self.bytes_written, self.bytes_expected)
699
700     def _write_stdin(self, filename):
701         output = self._local_collection.open(filename, 'wb')
702         self._write(sys.stdin, output)
703         output.close()
704
705     def _check_file(self, source, filename):
706         """
707         Check if this file needs to be uploaded
708         """
709         # Ignore symlinks when requested
710         if (not self.follow_links) and os.path.islink(source):
711             return
712         resume_offset = 0
713         should_upload = False
714         new_file_in_cache = False
715         # Record file path for updating the remote collection before exiting
716         self._file_paths.add(filename)
717
718         with self._state_lock:
719             # If no previous cached data on this file, store it for an eventual
720             # repeated run.
721             if source not in self._state['files']:
722                 self._state['files'][source] = {
723                     'mtime': os.path.getmtime(source),
724                     'size' : os.path.getsize(source)
725                 }
726                 new_file_in_cache = True
727             cached_file_data = self._state['files'][source]
728
729         # Check if file was already uploaded (at least partially)
730         file_in_local_collection = self._local_collection.find(filename)
731
732         # If not resuming, upload the full file.
733         if not self.resume:
734             should_upload = True
735         # New file detected from last run, upload it.
736         elif new_file_in_cache:
737             should_upload = True
738         # Local file didn't change from last run.
739         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
740             if not file_in_local_collection:
741                 # File not uploaded yet, upload it completely
742                 should_upload = True
743             elif file_in_local_collection.permission_expired():
744                 # Permission token expired, re-upload file. This will change whenever
745                 # we have a API for refreshing tokens.
746                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
747                 should_upload = True
748                 self._local_collection.remove(filename)
749             elif cached_file_data['size'] == file_in_local_collection.size():
750                 # File already there, skip it.
751                 self.bytes_skipped += cached_file_data['size']
752             elif cached_file_data['size'] > file_in_local_collection.size():
753                 # File partially uploaded, resume!
754                 resume_offset = file_in_local_collection.size()
755                 self.bytes_skipped += resume_offset
756                 should_upload = True
757             else:
758                 # Inconsistent cache, re-upload the file
759                 should_upload = True
760                 self._local_collection.remove(filename)
761                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
762         # Local file differs from cached data, re-upload it.
763         else:
764             if file_in_local_collection:
765                 self._local_collection.remove(filename)
766             should_upload = True
767
768         if should_upload:
769             try:
770                 self._files_to_upload.append((source, resume_offset, filename))
771             except ArvPutUploadIsPending:
772                 # This could happen when running on dry-mode, close cache file to
773                 # avoid locking issues.
774                 self._cache_file.close()
775                 raise
776
777     def _upload_files(self):
778         for source, resume_offset, filename in self._files_to_upload:
779             with open(source, 'rb') as source_fd:
780                 with self._state_lock:
781                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
782                     self._state['files'][source]['size'] = os.path.getsize(source)
783                 if resume_offset > 0:
784                     # Start upload where we left off
785                     output = self._local_collection.open(filename, 'ab')
786                     source_fd.seek(resume_offset)
787                 else:
788                     # Start from scratch
789                     output = self._local_collection.open(filename, 'wb')
790                 self._write(source_fd, output)
791                 output.close(flush=False)
792
793     def _write(self, source_fd, output):
794         while True:
795             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
796             if not data:
797                 break
798             output.write(data)
799
800     def _my_collection(self):
801         return self._remote_collection if self.update else self._local_collection
802
803     def _get_cache_filepath(self):
804         # Set up cache file name from input paths.
805         md5 = hashlib.md5()
806         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
807         realpaths = sorted(os.path.realpath(path) for path in self.paths)
808         md5.update(b'\0'.join([p.encode() for p in realpaths]))
809         if self.filename:
810             md5.update(self.filename.encode())
811         cache_filename = md5.hexdigest()
812         cache_filepath = os.path.join(
813             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
814             cache_filename)
815         return cache_filepath
816
817     def _setup_state(self, update_collection):
818         """
819         Create a new cache file or load a previously existing one.
820         """
821         # Load an already existing collection for update
822         if update_collection and re.match(arvados.util.collection_uuid_pattern,
823                                           update_collection):
824             try:
825                 self._remote_collection = arvados.collection.Collection(
826                     update_collection, api_client=self._api_client)
827             except arvados.errors.ApiError as error:
828                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
829             else:
830                 self.update = True
831         elif update_collection:
832             # Collection locator provided, but unknown format
833             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
834
835         if self.use_cache:
836             cache_filepath = self._get_cache_filepath()
837             if self.resume and os.path.exists(cache_filepath):
838                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
839                 self._cache_file = open(cache_filepath, 'a+')
840             else:
841                 # --no-resume means start with a empty cache file.
842                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
843                 self._cache_file = open(cache_filepath, 'w+')
844             self._cache_filename = self._cache_file.name
845             self._lock_file(self._cache_file)
846             self._cache_file.seek(0)
847
848         with self._state_lock:
849             if self.use_cache:
850                 try:
851                     self._state = json.load(self._cache_file)
852                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
853                         # Cache at least partially incomplete, set up new cache
854                         self._state = copy.deepcopy(self.EMPTY_STATE)
855                 except ValueError:
856                     # Cache file empty, set up new cache
857                     self._state = copy.deepcopy(self.EMPTY_STATE)
858             else:
859                 self.logger.info("No cache usage requested for this run.")
860                 # No cache file, set empty state
861                 self._state = copy.deepcopy(self.EMPTY_STATE)
862             if not self._cached_manifest_valid():
863                 raise ResumeCacheInvalidError()
864             # Load the previous manifest so we can check if files were modified remotely.
865             self._local_collection = arvados.collection.Collection(
866                 self._state['manifest'],
867                 replication_desired=self.replication_desired,
868                 put_threads=self.put_threads,
869                 api_client=self._api_client)
870
871     def _cached_manifest_valid(self):
872         """
873         Validate the oldest non-expired block signature to check if cached manifest
874         is usable: checking if the cached manifest was not created with a different
875         arvados account.
876         """
877         if self._state.get('manifest', None) is None:
878             # No cached manifest yet, all good.
879             return True
880         now = datetime.datetime.utcnow()
881         oldest_exp = None
882         oldest_loc = None
883         block_found = False
884         for m in keep_locator_pattern.finditer(self._state['manifest']):
885             loc = m.group(0)
886             try:
887                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
888             except IndexError:
889                 # Locator without signature
890                 continue
891             block_found = True
892             if exp > now and (oldest_exp is None or exp < oldest_exp):
893                 oldest_exp = exp
894                 oldest_loc = loc
895         if not block_found:
896             # No block signatures found => no invalid block signatures.
897             return True
898         if oldest_loc is None:
899             # Locator signatures found, but all have expired.
900             # Reset the cache and move on.
901             self.logger.info('Cache expired, starting from scratch.')
902             self._state['manifest'] = ''
903             return True
904         kc = arvados.KeepClient(api_client=self._api_client,
905                                 num_retries=self.num_retries)
906         try:
907             kc.head(oldest_loc)
908         except arvados.errors.KeepRequestError:
909             # Something is wrong, cached manifest is not valid.
910             return False
911         return True
912
913     def collection_file_paths(self, col, path_prefix='.'):
914         """Return a list of file paths by recursively go through the entire collection `col`"""
915         file_paths = []
916         for name, item in listitems(col):
917             if isinstance(item, arvados.arvfile.ArvadosFile):
918                 file_paths.append(os.path.join(path_prefix, name))
919             elif isinstance(item, arvados.collection.Subcollection):
920                 new_prefix = os.path.join(path_prefix, name)
921                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
922         return file_paths
923
924     def _lock_file(self, fileobj):
925         try:
926             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
927         except IOError:
928             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
929
930     def _save_state(self):
931         """
932         Atomically save current state into cache.
933         """
934         with self._state_lock:
935             # We're not using copy.deepcopy() here because it's a lot slower
936             # than json.dumps(), and we're already needing JSON format to be
937             # saved on disk.
938             state = json.dumps(self._state)
939         try:
940             new_cache = tempfile.NamedTemporaryFile(
941                 mode='w+',
942                 dir=os.path.dirname(self._cache_filename), delete=False)
943             self._lock_file(new_cache)
944             new_cache.write(state)
945             new_cache.flush()
946             os.fsync(new_cache)
947             os.rename(new_cache.name, self._cache_filename)
948         except (IOError, OSError, ResumeCacheConflict) as error:
949             self.logger.error("There was a problem while saving the cache file: {}".format(error))
950             try:
951                 os.unlink(new_cache_name)
952             except NameError:  # mkstemp failed.
953                 pass
954         else:
955             self._cache_file.close()
956             self._cache_file = new_cache
957
958     def collection_name(self):
959         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
960
961     def manifest_locator(self):
962         return self._my_collection().manifest_locator()
963
964     def portable_data_hash(self):
965         pdh = self._my_collection().portable_data_hash()
966         m = self._my_collection().stripped_manifest().encode()
967         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
968         if pdh != local_pdh:
969             self.logger.warning("\n".join([
970                 "arv-put: API server provided PDH differs from local manifest.",
971                 "         This should not happen; showing API server version."]))
972         return pdh
973
974     def manifest_text(self, stream_name=".", strip=False, normalize=False):
975         return self._my_collection().manifest_text(stream_name, strip, normalize)
976
977     def _datablocks_on_item(self, item):
978         """
979         Return a list of datablock locators, recursively navigating
980         through subcollections
981         """
982         if isinstance(item, arvados.arvfile.ArvadosFile):
983             if item.size() == 0:
984                 # Empty file locator
985                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
986             else:
987                 locators = []
988                 for segment in item.segments():
989                     loc = segment.locator
990                     locators.append(loc)
991                 return locators
992         elif isinstance(item, arvados.collection.Collection):
993             l = [self._datablocks_on_item(x) for x in listvalues(item)]
994             # Fast list flattener method taken from:
995             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
996             return [loc for sublist in l for loc in sublist]
997         else:
998             return None
999
1000     def data_locators(self):
1001         with self._collection_lock:
1002             # Make sure all datablocks are flushed before getting the locators
1003             self._my_collection().manifest_text()
1004             datablocks = self._datablocks_on_item(self._my_collection())
1005         return datablocks
1006
# Template for machine-readable progress lines. The program name and PID are
# filled in once, when this module is loaded; the doubled braces come out of
# that first format() call as plain "{}" placeholders, which
# machine_progress() fills with the written and total byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
1009
1010 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1011 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1012 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1013 # so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """
    Return True when `pathname` matches the glob `pattern`, comparing one
    path component at a time so a wildcard never spans a path separator.

    Empty and '.' components of the pattern are ignored; the components of
    `pathname` are used as given. Paths and patterns with a different
    number of components never match.
    """
    path_parts = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pattern_parts = [part for part in pattern.split(os.sep)
                     if part not in ('', '.')]
    if len(path_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(component, glob)
               for component, glob in zip(path_parts, pattern_parts))
1024
def machine_progress(bytes_written, bytes_expected):
    """
    Format a machine-readable progress line from _machine_format.
    An unknown total (None) is reported as -1.
    """
    if bytes_expected is None:
        bytes_expected = -1
    return _machine_format.format(bytes_written, bytes_expected)
1028
def human_progress(bytes_written, bytes_expected):
    """
    Format a progress line for a terminal, starting with a carriage return.

    With a known, non-zero total the line shows written and total sizes in
    whole MiB plus a percentage; otherwise only the byte count written.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, fraction)
1036
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Return a callback taking (bytes_written, bytes_expected) that formats
    them with `progress_func` and writes the result to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
1041
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Return the 'uuid' of the owner to use for the upload.

    Without a `project_uuid` the current user is queried. Otherwise the
    value must match the user or group UUID pattern and the corresponding
    record is fetched. Raises ValueError for any other value.
    """
    if not project_uuid:
        request = api_client.users().current()
    else:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            resource = api_client.users()
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            resource = api_client.groups()
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
        request = resource.get(uuid=project_uuid)
    response = request.execute(num_retries=num_retries)
    return response['uuid']
1052
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    """
    Command-line entry point: upload the requested paths and print the
    resulting identifier.

    `arguments` is handed to parse_arguments(). Unless --silent is given,
    the result is written to `stdout`: the manifest text (--stream), a
    comma-separated list of data locators (--raw), or otherwise the
    collection's portable data hash or manifest locator. `stderr` is not
    used by this function. When `install_sig_handlers` is true, signal
    handlers are installed before the upload and restored after the output
    has been written.

    Returns the output string on success. Exits through sys.exit() with
    status 1 on invalid option combinations and on upload or API errors,
    with status 2 when ArvPutUploadIsPending is raised (dry run found
    pending uploads) and with status 0 when ArvPutUploadNotPending is
    raised.
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    # The module-level client is created once and reused by later calls.
    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    #  Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)


    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # Path patterns containing a '..' component are not supported
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # With --stream or --raw the collection itself is not saved.
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        # No output means failure, but keep an earlier non-zero status.
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1267
1268
# Run the uploader only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()