Merge branch '12167-arvput-log-on-signals'
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import copy
14 import datetime
15 import errno
16 import fcntl
17 import fnmatch
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34
35 import arvados.commands._util as arv_cmd
36
37 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
38 api_client = None
39
40 upload_opts = argparse.ArgumentParser(add_help=False)
41
42 upload_opts.add_argument('--version', action='version',
43                          version="%s %s" % (sys.argv[0], __version__),
44                          help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
46                          help="""
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
50 """)
51
52 _group = upload_opts.add_mutually_exclusive_group()
53
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55                     default=-1, help=argparse.SUPPRESS)
56
57 _group.add_argument('--normalize', action='store_true',
58                     help="""
59 Normalize the manifest by re-ordering files and streams after writing
60 data.
61 """)
62
63 _group.add_argument('--dry-run', action='store_true', default=False,
64                     help="""
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
67 """)
68
69 _group = upload_opts.add_mutually_exclusive_group()
70
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
72                     help="""
73 Synonym for --stream.
74 """)
75
76 _group.add_argument('--stream', action='store_true',
77                     help="""
78 Store the file content and display the resulting manifest on
79 stdout. Do not write the manifest to Keep or save a Collection object
80 in Arvados.
81 """)
82
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
84                     help="""
85 Synonym for --manifest.
86 """)
87
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
89                     help="""
90 Synonym for --manifest.
91 """)
92
93 _group.add_argument('--manifest', action='store_true',
94                     help="""
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
98 """)
99
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
101                     help="""
102 Synonym for --raw.
103 """)
104
105 _group.add_argument('--raw', action='store_true',
106                     help="""
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
109 manifest.
110 """)
111
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113                          dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
116 """)
117
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119                          dest='filename', help="""
120 Synonym for --filename.
121 """)
122
123 upload_opts.add_argument('--filename', type=str, default=None,
124                          help="""
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
129 """)
130
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
132                          help="""
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
135 """)
136
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
138                          help="""
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
142 """)
143
144 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
145                          help="""
146 Set the number of upload threads to be used. Take into account that
147 using lots of threads will increase the RAM requirements. Default is
148 to use 2 threads.
149 On high latency installations, using a greater number will improve
150 overall throughput.
151 """)
152
153 run_opts = argparse.ArgumentParser(add_help=False)
154
155 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
156 Store the collection in the specified project, instead of your Home
157 project.
158 """)
159
160 run_opts.add_argument('--name', help="""
161 Save the collection with the specified name.
162 """)
163
164 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
165                       action='append', help="""
166 Exclude files and directories whose names match the given glob pattern. When
167 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
168 directory, relative to the provided input dirs will be excluded.
169 When using a filename pattern like '*.txt', any text file will be excluded
170 no matter where is placed.
171 For the special case of needing to exclude only files or dirs directly below
172 the given input directory, you can use a pattern like './exclude_this.gif'.
173 You can specify multiple patterns by using this argument more than once.
174 """)
175
176 _group = run_opts.add_mutually_exclusive_group()
177 _group.add_argument('--progress', action='store_true',
178                     help="""
179 Display human-readable progress on stderr (bytes and, if possible,
180 percentage of total data size). This is the default behavior when
181 stderr is a tty.
182 """)
183
184 _group.add_argument('--no-progress', action='store_true',
185                     help="""
186 Do not display human-readable progress on stderr, even if stderr is a
187 tty.
188 """)
189
190 _group.add_argument('--batch-progress', action='store_true',
191                     help="""
192 Display machine-readable progress on stderr (bytes and, if known,
193 total data size).
194 """)
195
196 run_opts.add_argument('--silent', action='store_true',
197                       help="""
198 Do not print any debug messages to console. (Any error messages will
199 still be displayed.)
200 """)
201
202 _group = run_opts.add_mutually_exclusive_group()
203 _group.add_argument('--resume', action='store_true', default=True,
204                     help="""
205 Continue interrupted uploads from cached state (default).
206 """)
207 _group.add_argument('--no-resume', action='store_false', dest='resume',
208                     help="""
209 Do not continue interrupted uploads from cached state.
210 """)
211
212 _group = run_opts.add_mutually_exclusive_group()
213 _group.add_argument('--follow-links', action='store_true', default=True,
214                     dest='follow_links', help="""
215 Follow file and directory symlinks (default).
216 """)
217 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
218                     help="""
219 Do not follow file and directory symlinks.
220 """)
221
222 _group = run_opts.add_mutually_exclusive_group()
223 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
224                     help="""
225 Save upload state in a cache file for resuming (default).
226 """)
227 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
228                     help="""
229 Do not save upload state in a cache file for resuming.
230 """)
231
232 arg_parser = argparse.ArgumentParser(
233     description='Copy data from the local filesystem to Keep.',
234     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
235
236 def parse_arguments(arguments):
237     args = arg_parser.parse_args(arguments)
238
239     if len(args.paths) == 0:
240         args.paths = ['-']
241
242     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
243
244     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
245         if args.filename:
246             arg_parser.error("""
247     --filename argument cannot be used when storing a directory or
248     multiple files.
249     """)
250
251     # Turn on --progress by default if stderr is a tty.
252     if (not (args.batch_progress or args.no_progress or args.silent)
253         and os.isatty(sys.stderr.fileno())):
254         args.progress = True
255
256     # Turn off --resume (default) if --no-cache is used.
257     if not args.use_cache:
258         args.resume = False
259
260     if args.paths == ['-']:
261         if args.update_collection:
262             arg_parser.error("""
263     --update-collection cannot be used when reading from stdin.
264     """)
265         args.resume = False
266         args.use_cache = False
267         if not args.filename:
268             args.filename = 'stdin'
269
270     # Remove possible duplicated patterns
271     if len(args.exclude) > 0:
272         args.exclude = list(set(args.exclude))
273
274     return args
275
276
277 class PathDoesNotExistError(Exception):
278     pass
279
280
281 class CollectionUpdateError(Exception):
282     pass
283
284
285 class ResumeCacheConflict(Exception):
286     pass
287
288
289 class ArvPutArgumentConflict(Exception):
290     pass
291
292
293 class ArvPutUploadIsPending(Exception):
294     pass
295
296
297 class ArvPutUploadNotPending(Exception):
298     pass
299
300
301 class FileUploadList(list):
302     def __init__(self, dry_run=False):
303         list.__init__(self)
304         self.dry_run = dry_run
305
306     def append(self, other):
307         if self.dry_run:
308             raise ArvPutUploadIsPending()
309         super(FileUploadList, self).append(other)
310
311
312 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
313 class ArvPutLogFormatter(logging.Formatter):
314     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
315     err_fmtr = None
316     request_id_informed = False
317
318     def __init__(self, request_id):
319         self.err_fmtr = logging.Formatter(
320             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
321             arvados.log_date_format)
322
323     def format(self, record):
324         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
325             self.request_id_informed = True
326             return self.err_fmtr.format(record)
327         return self.std_fmtr.format(record)
328
329
330 class ResumeCache(object):
331     CACHE_DIR = '.cache/arvados/arv-put'
332
333     def __init__(self, file_spec):
334         self.cache_file = open(file_spec, 'a+')
335         self._lock_file(self.cache_file)
336         self.filename = self.cache_file.name
337
338     @classmethod
339     def make_path(cls, args):
340         md5 = hashlib.md5()
341         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
342         realpaths = sorted(os.path.realpath(path) for path in args.paths)
343         md5.update(b'\0'.join([p.encode() for p in realpaths]))
344         if any(os.path.isdir(path) for path in realpaths):
345             md5.update(b'-1')
346         elif args.filename:
347             md5.update(args.filename.encode())
348         return os.path.join(
349             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
350             md5.hexdigest())
351
352     def _lock_file(self, fileobj):
353         try:
354             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
355         except IOError:
356             raise ResumeCacheConflict("{} locked".format(fileobj.name))
357
358     def load(self):
359         self.cache_file.seek(0)
360         return json.load(self.cache_file)
361
362     def check_cache(self, api_client=None, num_retries=0):
363         try:
364             state = self.load()
365             locator = None
366             try:
367                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
368                     locator = state["_finished_streams"][0][1][0]
369                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
370                     locator = state["_current_stream_locators"][0]
371                 if locator is not None:
372                     kc = arvados.keep.KeepClient(api_client=api_client)
373                     kc.head(locator, num_retries=num_retries)
374             except Exception as e:
375                 self.restart()
376         except (ValueError):
377             pass
378
379     def save(self, data):
380         try:
381             new_cache_fd, new_cache_name = tempfile.mkstemp(
382                 dir=os.path.dirname(self.filename))
383             self._lock_file(new_cache_fd)
384             new_cache = os.fdopen(new_cache_fd, 'r+')
385             json.dump(data, new_cache)
386             os.rename(new_cache_name, self.filename)
387         except (IOError, OSError, ResumeCacheConflict) as error:
388             try:
389                 os.unlink(new_cache_name)
390             except NameError:  # mkstemp failed.
391                 pass
392         else:
393             self.cache_file.close()
394             self.cache_file = new_cache
395
396     def close(self):
397         self.cache_file.close()
398
399     def destroy(self):
400         try:
401             os.unlink(self.filename)
402         except OSError as error:
403             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
404                 raise
405         self.close()
406
407     def restart(self):
408         self.destroy()
409         self.__init__(self.filename)
410
411
412 class ArvPutUploadJob(object):
413     CACHE_DIR = '.cache/arvados/arv-put'
414     EMPTY_STATE = {
415         'manifest' : None, # Last saved manifest checkpoint
416         'files' : {} # Previous run file list: {path : {size, mtime}}
417     }
418
419     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
420                  name=None, owner_uuid=None, api_client=None,
421                  ensure_unique_name=False, num_retries=None,
422                  put_threads=None, replication_desired=None,
423                  filename=None, update_time=60.0, update_collection=None,
424                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
425                  follow_links=True, exclude_paths=[], exclude_names=None):
426         self.paths = paths
427         self.resume = resume
428         self.use_cache = use_cache
429         self.update = False
430         self.reporter = reporter
431         # This will set to 0 before start counting, if no special files are going
432         # to be read.
433         self.bytes_expected = None
434         self.bytes_written = 0
435         self.bytes_skipped = 0
436         self.name = name
437         self.owner_uuid = owner_uuid
438         self.ensure_unique_name = ensure_unique_name
439         self.num_retries = num_retries
440         self.replication_desired = replication_desired
441         self.put_threads = put_threads
442         self.filename = filename
443         self._api_client = api_client
444         self._state_lock = threading.Lock()
445         self._state = None # Previous run state (file list & manifest)
446         self._current_files = [] # Current run file list
447         self._cache_file = None
448         self._collection_lock = threading.Lock()
449         self._remote_collection = None # Collection being updated (if asked)
450         self._local_collection = None # Collection from previous run manifest
451         self._file_paths = set() # Files to be updated in remote collection
452         self._stop_checkpointer = threading.Event()
453         self._checkpointer = threading.Thread(target=self._update_task)
454         self._checkpointer.daemon = True
455         self._update_task_time = update_time  # How many seconds wait between update runs
456         self._files_to_upload = FileUploadList(dry_run=dry_run)
457         self._upload_started = False
458         self.logger = logger
459         self.dry_run = dry_run
460         self._checkpoint_before_quit = True
461         self.follow_links = follow_links
462         self.exclude_paths = exclude_paths
463         self.exclude_names = exclude_names
464
465         if not self.use_cache and self.resume:
466             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
467
468         # Check for obvious dry-run responses
469         if self.dry_run and (not self.use_cache or not self.resume):
470             raise ArvPutUploadIsPending()
471
472         # Load cached data if any and if needed
473         self._setup_state(update_collection)
474
475         # Build the upload file list, excluding requested files and counting the
476         # bytes expected to be uploaded.
477         self._build_upload_list()
478
479     def _build_upload_list(self):
480         """
481         Scan the requested paths to count file sizes, excluding files & dirs if requested
482         and building the upload file list.
483         """
484         # If there aren't special files to be read, reset total bytes count to zero
485         # to start counting.
486         if not any([p for p in self.paths
487                     if not (os.path.isfile(p) or os.path.isdir(p))]):
488             self.bytes_expected = 0
489
490         for path in self.paths:
491             # Test for stdin first, in case some file named '-' exist
492             if path == '-':
493                 if self.dry_run:
494                     raise ArvPutUploadIsPending()
495                 self._write_stdin(self.filename or 'stdin')
496             elif not os.path.exists(path):
497                  raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
498             elif os.path.isdir(path):
499                 # Use absolute paths on cache index so CWD doesn't interfere
500                 # with the caching logic.
501                 orig_path = path
502                 path = os.path.abspath(path)
503                 if orig_path[-1:] == os.sep:
504                     # When passing a directory reference with a trailing slash,
505                     # its contents should be uploaded directly to the
506                     # collection's root.
507                     prefixdir = path
508                 else:
509                     # When passing a directory reference with no trailing slash,
510                     # upload the directory to the collection's root.
511                     prefixdir = os.path.dirname(path)
512                 prefixdir += os.sep
513                 for root, dirs, files in os.walk(path,
514                                                  followlinks=self.follow_links):
515                     root_relpath = os.path.relpath(root, path)
516                     if root_relpath == '.':
517                         root_relpath = ''
518                     # Exclude files/dirs by full path matching pattern
519                     if self.exclude_paths:
520                         dirs[:] = [d for d in dirs
521                                    if not any(pathname_match(
522                                            os.path.join(root_relpath, d), pat)
523                                               for pat in self.exclude_paths)]
524                         files = [f for f in files
525                                  if not any(pathname_match(
526                                          os.path.join(root_relpath, f), pat)
527                                             for pat in self.exclude_paths)]
528                     # Exclude files/dirs by name matching pattern
529                     if self.exclude_names is not None:
530                         dirs[:] = [d for d in dirs
531                                    if not self.exclude_names.match(d)]
532                         files = [f for f in files
533                                  if not self.exclude_names.match(f)]
534                     # Make os.walk()'s dir traversing order deterministic
535                     dirs.sort()
536                     files.sort()
537                     for f in files:
538                         filepath = os.path.join(root, f)
539                         # Add its size to the total bytes count (if applicable)
540                         if self.follow_links or (not os.path.islink(filepath)):
541                             if self.bytes_expected is not None:
542                                 self.bytes_expected += os.path.getsize(filepath)
543                         self._check_file(filepath,
544                                          os.path.join(root[len(prefixdir):], f))
545             else:
546                 filepath = os.path.abspath(path)
547                 # Add its size to the total bytes count (if applicable)
548                 if self.follow_links or (not os.path.islink(filepath)):
549                     if self.bytes_expected is not None:
550                         self.bytes_expected += os.path.getsize(filepath)
551                 self._check_file(filepath,
552                                  self.filename or os.path.basename(path))
553         # If dry-mode is on, and got up to this point, then we should notify that
554         # there aren't any file to upload.
555         if self.dry_run:
556             raise ArvPutUploadNotPending()
557         # Remove local_collection's files that don't exist locally anymore, so the
558         # bytes_written count is correct.
559         for f in self.collection_file_paths(self._local_collection,
560                                             path_prefix=""):
561             if f != 'stdin' and f != self.filename and not f in self._file_paths:
562                 self._local_collection.remove(f)
563
564     def start(self, save_collection):
565         """
566         Start supporting thread & file uploading
567         """
568         self._checkpointer.start()
569         try:
570             # Update bytes_written from current local collection and
571             # report initial progress.
572             self._update()
573             # Actual file upload
574             self._upload_started = True # Used by the update thread to start checkpointing
575             self._upload_files()
576         except (SystemExit, Exception) as e:
577             self._checkpoint_before_quit = False
578             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
579             # Note: We're expecting SystemExit instead of
580             # KeyboardInterrupt because we have a custom signal
581             # handler in place that raises SystemExit with the catched
582             # signal's code.
583             if isinstance(e, PathDoesNotExistError):
584                 # We aren't interested in the traceback for this case
585                 pass
586             elif not isinstance(e, SystemExit) or e.code != -2:
587                 self.logger.warning("Abnormal termination:\n{}".format(
588                     traceback.format_exc()))
589             raise
590         finally:
591             if not self.dry_run:
592                 # Stop the thread before doing anything else
593                 self._stop_checkpointer.set()
594                 self._checkpointer.join()
595                 if self._checkpoint_before_quit:
596                     # Commit all pending blocks & one last _update()
597                     self._local_collection.manifest_text()
598                     self._update(final=True)
599                     if save_collection:
600                         self.save_collection()
601             if self.use_cache:
602                 self._cache_file.close()
603
604     def save_collection(self):
605         if self.update:
606             # Check if files should be updated on the remote collection.
607             for fp in self._file_paths:
608                 remote_file = self._remote_collection.find(fp)
609                 if not remote_file:
610                     # File don't exist on remote collection, copy it.
611                     self._remote_collection.copy(fp, fp, self._local_collection)
612                 elif remote_file != self._local_collection.find(fp):
613                     # A different file exist on remote collection, overwrite it.
614                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
615                 else:
616                     # The file already exist on remote collection, skip it.
617                     pass
618             self._remote_collection.save(num_retries=self.num_retries)
619         else:
620             self._local_collection.save_new(
621                 name=self.name, owner_uuid=self.owner_uuid,
622                 ensure_unique_name=self.ensure_unique_name,
623                 num_retries=self.num_retries)
624
625     def destroy_cache(self):
626         if self.use_cache:
627             try:
628                 os.unlink(self._cache_filename)
629             except OSError as error:
630                 # That's what we wanted anyway.
631                 if error.errno != errno.ENOENT:
632                     raise
633             self._cache_file.close()
634
635     def _collection_size(self, collection):
636         """
637         Recursively get the total size of the collection
638         """
639         size = 0
640         for item in listvalues(collection):
641             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
642                 size += self._collection_size(item)
643             else:
644                 size += item.size()
645         return size
646
647     def _update_task(self):
648         """
649         Periodically called support task. File uploading is
650         asynchronous so we poll status from the collection.
651         """
652         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
653             self._update()
654
655     def _update(self, final=False):
656         """
657         Update cached manifest text and report progress.
658         """
659         if self._upload_started:
660             with self._collection_lock:
661                 self.bytes_written = self._collection_size(self._local_collection)
662                 if self.use_cache:
663                     if final:
664                         manifest = self._local_collection.manifest_text()
665                     else:
666                         # Get the manifest text without comitting pending blocks
667                         manifest = self._local_collection.manifest_text(strip=False,
668                                                                         normalize=False,
669                                                                         only_committed=True)
670                     # Update cache
671                     with self._state_lock:
672                         self._state['manifest'] = manifest
673             if self.use_cache:
674                 try:
675                     self._save_state()
676                 except Exception as e:
677                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
678         else:
679             self.bytes_written = self.bytes_skipped
680         # Call the reporter, if any
681         self.report_progress()
682
683     def report_progress(self):
684         if self.reporter is not None:
685             self.reporter(self.bytes_written, self.bytes_expected)
686
687     def _write_stdin(self, filename):
688         output = self._local_collection.open(filename, 'wb')
689         self._write(sys.stdin, output)
690         output.close()
691
692     def _check_file(self, source, filename):
693         """
694         Check if this file needs to be uploaded
695         """
696         # Ignore symlinks when requested
697         if (not self.follow_links) and os.path.islink(source):
698             return
699         resume_offset = 0
700         should_upload = False
701         new_file_in_cache = False
702         # Record file path for updating the remote collection before exiting
703         self._file_paths.add(filename)
704
705         with self._state_lock:
706             # If no previous cached data on this file, store it for an eventual
707             # repeated run.
708             if source not in self._state['files']:
709                 self._state['files'][source] = {
710                     'mtime': os.path.getmtime(source),
711                     'size' : os.path.getsize(source)
712                 }
713                 new_file_in_cache = True
714             cached_file_data = self._state['files'][source]
715
716         # Check if file was already uploaded (at least partially)
717         file_in_local_collection = self._local_collection.find(filename)
718
719         # If not resuming, upload the full file.
720         if not self.resume:
721             should_upload = True
722         # New file detected from last run, upload it.
723         elif new_file_in_cache:
724             should_upload = True
725         # Local file didn't change from last run.
726         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
727             if not file_in_local_collection:
728                 # File not uploaded yet, upload it completely
729                 should_upload = True
730             elif file_in_local_collection.permission_expired():
731                 # Permission token expired, re-upload file. This will change whenever
732                 # we have a API for refreshing tokens.
733                 self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
734                 should_upload = True
735                 self._local_collection.remove(filename)
736             elif cached_file_data['size'] == file_in_local_collection.size():
737                 # File already there, skip it.
738                 self.bytes_skipped += cached_file_data['size']
739             elif cached_file_data['size'] > file_in_local_collection.size():
740                 # File partially uploaded, resume!
741                 resume_offset = file_in_local_collection.size()
742                 self.bytes_skipped += resume_offset
743                 should_upload = True
744             else:
745                 # Inconsistent cache, re-upload the file
746                 should_upload = True
747                 self._local_collection.remove(filename)
748                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
749         # Local file differs from cached data, re-upload it.
750         else:
751             if file_in_local_collection:
752                 self._local_collection.remove(filename)
753             should_upload = True
754
755         if should_upload:
756             try:
757                 self._files_to_upload.append((source, resume_offset, filename))
758             except ArvPutUploadIsPending:
759                 # This could happen when running on dry-mode, close cache file to
760                 # avoid locking issues.
761                 self._cache_file.close()
762                 raise
763
764     def _upload_files(self):
765         for source, resume_offset, filename in self._files_to_upload:
766             with open(source, 'rb') as source_fd:
767                 with self._state_lock:
768                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
769                     self._state['files'][source]['size'] = os.path.getsize(source)
770                 if resume_offset > 0:
771                     # Start upload where we left off
772                     output = self._local_collection.open(filename, 'ab')
773                     source_fd.seek(resume_offset)
774                 else:
775                     # Start from scratch
776                     output = self._local_collection.open(filename, 'wb')
777                 self._write(source_fd, output)
778                 output.close(flush=False)
779
780     def _write(self, source_fd, output):
781         while True:
782             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
783             if not data:
784                 break
785             output.write(data)
786
787     def _my_collection(self):
788         return self._remote_collection if self.update else self._local_collection
789
790     def _setup_state(self, update_collection):
791         """
792         Create a new cache file or load a previously existing one.
793         """
794         # Load an already existing collection for update
795         if update_collection and re.match(arvados.util.collection_uuid_pattern,
796                                           update_collection):
797             try:
798                 self._remote_collection = arvados.collection.Collection(
799                     update_collection, api_client=self._api_client)
800             except arvados.errors.ApiError as error:
801                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
802             else:
803                 self.update = True
804         elif update_collection:
805             # Collection locator provided, but unknown format
806             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
807
808         if self.use_cache:
809             # Set up cache file name from input paths.
810             md5 = hashlib.md5()
811             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
812             realpaths = sorted(os.path.realpath(path) for path in self.paths)
813             md5.update(b'\0'.join([p.encode() for p in realpaths]))
814             if self.filename:
815                 md5.update(self.filename.encode())
816             cache_filename = md5.hexdigest()
817             cache_filepath = os.path.join(
818                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
819                 cache_filename)
820             if self.resume and os.path.exists(cache_filepath):
821                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
822                 self._cache_file = open(cache_filepath, 'a+')
823             else:
824                 # --no-resume means start with a empty cache file.
825                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
826                 self._cache_file = open(cache_filepath, 'w+')
827             self._cache_filename = self._cache_file.name
828             self._lock_file(self._cache_file)
829             self._cache_file.seek(0)
830
831         with self._state_lock:
832             if self.use_cache:
833                 try:
834                     self._state = json.load(self._cache_file)
835                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
836                         # Cache at least partially incomplete, set up new cache
837                         self._state = copy.deepcopy(self.EMPTY_STATE)
838                 except ValueError:
839                     # Cache file empty, set up new cache
840                     self._state = copy.deepcopy(self.EMPTY_STATE)
841             else:
842                 self.logger.info("No cache usage requested for this run.")
843                 # No cache file, set empty state
844                 self._state = copy.deepcopy(self.EMPTY_STATE)
845             # Load the previous manifest so we can check if files were modified remotely.
846             self._local_collection = arvados.collection.Collection(
847                 self._state['manifest'],
848                 replication_desired=self.replication_desired,
849                 put_threads=self.put_threads,
850                 api_client=self._api_client)
851
852     def collection_file_paths(self, col, path_prefix='.'):
853         """Return a list of file paths by recursively go through the entire collection `col`"""
854         file_paths = []
855         for name, item in listitems(col):
856             if isinstance(item, arvados.arvfile.ArvadosFile):
857                 file_paths.append(os.path.join(path_prefix, name))
858             elif isinstance(item, arvados.collection.Subcollection):
859                 new_prefix = os.path.join(path_prefix, name)
860                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
861         return file_paths
862
863     def _lock_file(self, fileobj):
864         try:
865             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
866         except IOError:
867             raise ResumeCacheConflict("{} locked".format(fileobj.name))
868
869     def _save_state(self):
870         """
871         Atomically save current state into cache.
872         """
873         with self._state_lock:
874             # We're not using copy.deepcopy() here because it's a lot slower
875             # than json.dumps(), and we're already needing JSON format to be
876             # saved on disk.
877             state = json.dumps(self._state)
878         try:
879             new_cache = tempfile.NamedTemporaryFile(
880                 mode='w+',
881                 dir=os.path.dirname(self._cache_filename), delete=False)
882             self._lock_file(new_cache)
883             new_cache.write(state)
884             new_cache.flush()
885             os.fsync(new_cache)
886             os.rename(new_cache.name, self._cache_filename)
887         except (IOError, OSError, ResumeCacheConflict) as error:
888             self.logger.error("There was a problem while saving the cache file: {}".format(error))
889             try:
890                 os.unlink(new_cache_name)
891             except NameError:  # mkstemp failed.
892                 pass
893         else:
894             self._cache_file.close()
895             self._cache_file = new_cache
896
897     def collection_name(self):
898         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
899
900     def manifest_locator(self):
901         return self._my_collection().manifest_locator()
902
903     def portable_data_hash(self):
904         pdh = self._my_collection().portable_data_hash()
905         m = self._my_collection().stripped_manifest().encode()
906         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
907         if pdh != local_pdh:
908             self.logger.warning("\n".join([
909                 "arv-put: API server provided PDH differs from local manifest.",
910                 "         This should not happen; showing API server version."]))
911         return pdh
912
913     def manifest_text(self, stream_name=".", strip=False, normalize=False):
914         return self._my_collection().manifest_text(stream_name, strip, normalize)
915
916     def _datablocks_on_item(self, item):
917         """
918         Return a list of datablock locators, recursively navigating
919         through subcollections
920         """
921         if isinstance(item, arvados.arvfile.ArvadosFile):
922             if item.size() == 0:
923                 # Empty file locator
924                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
925             else:
926                 locators = []
927                 for segment in item.segments():
928                     loc = segment.locator
929                     locators.append(loc)
930                 return locators
931         elif isinstance(item, arvados.collection.Collection):
932             l = [self._datablocks_on_item(x) for x in listvalues(item)]
933             # Fast list flattener method taken from:
934             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
935             return [loc for sublist in l for loc in sublist]
936         else:
937             return None
938
939     def data_locators(self):
940         with self._collection_lock:
941             # Make sure all datablocks are flushed before getting the locators
942             self._my_collection().manifest_text()
943             datablocks = self._datablocks_on_item(self._my_collection())
944         return datablocks
945
946 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
947                                                             os.getpid())
948
949 # Simulate glob.glob() matching behavior without the need to scan the filesystem
950 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
951 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
952 # so instead we're using it on every path component.
953 def pathname_match(pathname, pattern):
954     name = pathname.split(os.sep)
955     # Fix patterns like 'some/subdir/' or 'some//subdir'
956     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
957     if len(name) != len(pat):
958         return False
959     for i in range(len(name)):
960         if not fnmatch.fnmatch(name[i], pat[i]):
961             return False
962     return True
963
964 def machine_progress(bytes_written, bytes_expected):
965     return _machine_format.format(
966         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
967
968 def human_progress(bytes_written, bytes_expected):
969     if bytes_expected:
970         return "\r{}M / {}M {:.1%} ".format(
971             bytes_written >> 20, bytes_expected >> 20,
972             float(bytes_written) / bytes_expected)
973     else:
974         return "\r{} ".format(bytes_written)
975
976 def progress_writer(progress_func, outfile=sys.stderr):
977     def write_progress(bytes_written, bytes_expected):
978         outfile.write(progress_func(bytes_written, bytes_expected))
979     return write_progress
980
981 def exit_signal_handler(sigcode, frame):
982     logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
983     sys.exit(-sigcode)
984
985 def desired_project_uuid(api_client, project_uuid, num_retries):
986     if not project_uuid:
987         query = api_client.users().current()
988     elif arvados.util.user_uuid_pattern.match(project_uuid):
989         query = api_client.users().get(uuid=project_uuid)
990     elif arvados.util.group_uuid_pattern.match(project_uuid):
991         query = api_client.groups().get(uuid=project_uuid)
992     else:
993         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
994     return query.execute(num_retries=num_retries)['uuid']
995
996 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
997     global api_client
998
999     args = parse_arguments(arguments)
1000     logger = logging.getLogger('arvados.arv_put')
1001     if args.silent:
1002         logger.setLevel(logging.WARNING)
1003     else:
1004         logger.setLevel(logging.INFO)
1005     status = 0
1006
1007     request_id = arvados.util.new_request_id()
1008
1009     formatter = ArvPutLogFormatter(request_id)
1010     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1011
1012     if api_client is None:
1013         api_client = arvados.api('v1', request_id=request_id)
1014
1015     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1016     # the originals.
1017     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1018                             for sigcode in CAUGHT_SIGNALS}
1019
1020     # Determine the name to use
1021     if args.name:
1022         if args.stream or args.raw:
1023             logger.error("Cannot use --name with --stream or --raw")
1024             sys.exit(1)
1025         elif args.update_collection:
1026             logger.error("Cannot use --name with --update-collection")
1027             sys.exit(1)
1028         collection_name = args.name
1029     else:
1030         collection_name = "Saved at {} by {}@{}".format(
1031             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1032             pwd.getpwuid(os.getuid()).pw_name,
1033             socket.gethostname())
1034
1035     if args.project_uuid and (args.stream or args.raw):
1036         logger.error("Cannot use --project-uuid with --stream or --raw")
1037         sys.exit(1)
1038
1039     # Determine the parent project
1040     try:
1041         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1042                                             args.retries)
1043     except (apiclient_errors.Error, ValueError) as error:
1044         logger.error(error)
1045         sys.exit(1)
1046
1047     if args.progress:
1048         reporter = progress_writer(human_progress)
1049     elif args.batch_progress:
1050         reporter = progress_writer(machine_progress)
1051     else:
1052         reporter = None
1053
1054     # Setup exclude regex from all the --exclude arguments provided
1055     name_patterns = []
1056     exclude_paths = []
1057     exclude_names = None
1058     if len(args.exclude) > 0:
1059         # We're supporting 2 kinds of exclusion patterns:
1060         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1061         #                            the name, wherever the file is on the tree)
1062         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1063         #                            entire path, and should be relative to
1064         #                            any input dir argument)
1065         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1066         #                            placed directly underneath the input dir)
1067         for p in args.exclude:
1068             # Only relative paths patterns allowed
1069             if p.startswith(os.sep):
1070                 logger.error("Cannot use absolute paths with --exclude")
1071                 sys.exit(1)
1072             if os.path.dirname(p):
1073                 # We don't support of path patterns with '..'
1074                 p_parts = p.split(os.sep)
1075                 if '..' in p_parts:
1076                     logger.error(
1077                         "Cannot use path patterns that include or '..'")
1078                     sys.exit(1)
1079                 # Path search pattern
1080                 exclude_paths.append(p)
1081             else:
1082                 # Name-only search pattern
1083                 name_patterns.append(p)
1084         # For name only matching, we can combine all patterns into a single
1085         # regexp, for better performance.
1086         exclude_names = re.compile('|'.join(
1087             [fnmatch.translate(p) for p in name_patterns]
1088         )) if len(name_patterns) > 0 else None
1089         # Show the user the patterns to be used, just in case they weren't
1090         # specified inside quotes and got changed by the shell expansion.
1091         logger.info("Exclude patterns: {}".format(args.exclude))
1092
1093     # If this is used by a human, and there's at least one directory to be
1094     # uploaded, the expected bytes calculation can take a moment.
1095     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1096         logger.info("Calculating upload size, this could take some time...")
1097     try:
1098         writer = ArvPutUploadJob(paths = args.paths,
1099                                  resume = args.resume,
1100                                  use_cache = args.use_cache,
1101                                  filename = args.filename,
1102                                  reporter = reporter,
1103                                  api_client = api_client,
1104                                  num_retries = args.retries,
1105                                  replication_desired = args.replication,
1106                                  put_threads = args.threads,
1107                                  name = collection_name,
1108                                  owner_uuid = project_uuid,
1109                                  ensure_unique_name = True,
1110                                  update_collection = args.update_collection,
1111                                  logger=logger,
1112                                  dry_run=args.dry_run,
1113                                  follow_links=args.follow_links,
1114                                  exclude_paths=exclude_paths,
1115                                  exclude_names=exclude_names)
1116     except ResumeCacheConflict:
1117         logger.error("\n".join([
1118             "arv-put: Another process is already uploading this data.",
1119             "         Use --no-cache if this is really what you want."]))
1120         sys.exit(1)
1121     except CollectionUpdateError as error:
1122         logger.error("\n".join([
1123             "arv-put: %s" % str(error)]))
1124         sys.exit(1)
1125     except ArvPutUploadIsPending:
1126         # Dry run check successful, return proper exit code.
1127         sys.exit(2)
1128     except ArvPutUploadNotPending:
1129         # No files pending for upload
1130         sys.exit(0)
1131     except PathDoesNotExistError as error:
1132         logger.error("\n".join([
1133             "arv-put: %s" % str(error)]))
1134         sys.exit(1)
1135
1136     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1137         logger.warning("\n".join([
1138             "arv-put: Resuming previous upload from last checkpoint.",
1139             "         Use the --no-resume option to start over."]))
1140
1141     if not args.dry_run:
1142         writer.report_progress()
1143     output = None
1144     try:
1145         writer.start(save_collection=not(args.stream or args.raw))
1146     except arvados.errors.ApiError as error:
1147         logger.error("\n".join([
1148             "arv-put: %s" % str(error)]))
1149         sys.exit(1)
1150
1151     if args.progress:  # Print newline to split stderr from stdout for humans.
1152         logger.info("\n")
1153
1154     if args.stream:
1155         if args.normalize:
1156             output = writer.manifest_text(normalize=True)
1157         else:
1158             output = writer.manifest_text()
1159     elif args.raw:
1160         output = ','.join(writer.data_locators())
1161     else:
1162         try:
1163             if args.update_collection:
1164                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1165             else:
1166                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1167             if args.portable_data_hash:
1168                 output = writer.portable_data_hash()
1169             else:
1170                 output = writer.manifest_locator()
1171         except apiclient_errors.Error as error:
1172             logger.error(
1173                 "arv-put: Error creating Collection on project: {}.".format(
1174                     error))
1175             status = 1
1176
1177     # Print the locator (uuid) of the new collection.
1178     if output is None:
1179         status = status or 1
1180     elif not args.silent:
1181         stdout.write(output)
1182         if not output.endswith('\n'):
1183             stdout.write('\n')
1184
1185     for sigcode, orig_handler in listitems(orig_signal_handlers):
1186         signal.signal(sigcode, orig_handler)
1187
1188     if status != 0:
1189         sys.exit(status)
1190
1191     # Success!
1192     return output
1193
1194
1195 if __name__ == '__main__':
1196     main()