# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__
from arvados.util import keep_locator_pattern

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high-latency installations, using a greater number can improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ResumeCacheInvalidError(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

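# Illustrative sketch (not part of the module's flow): with dry_run=True, the
# first append() raises ArvPutUploadIsPending, which is how --dry-run learns
# that at least one file would be uploaded:
#
#     pending = FileUploadList(dry_run=True)
#     try:
#         pending.append(('file.txt', 0, 'file.txt'))
#     except ArvPutUploadIsPending:
#         pass  # arv-put reports this case with exit code 2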

# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
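
# For example (illustrative; the exact layout comes from arvados.log_format
# and request IDs vary per run), the first DEBUG or ERROR record is rendered
# once with the extended suffix:
#     ... ERROR: something failed (X-Request-Id: req-xxxxxxxxxxxxxxxxxxx)
# while all other records use the standard formatter.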


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
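
    # Note (descriptive): the resulting cache path is an MD5 digest of the
    # API host plus the NUL-joined sorted real paths (and, for single-file
    # uploads, the --filename override), so repeating the same invocation
    # maps to the same cache file under ~/.cache/arvados/arv-put.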

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # The cached state is unusable (e.g., a block signature
                # expired); start over with a fresh cache.
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
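
    # Illustrative cache-state contents (hypothetical paths and values):
    #
    #     {
    #       "manifest": ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt\n",
    #       "files": {
    #         "/home/user/file.txt": {"mtime": 1546300800.0, "size": 10}
    #       }
    #     }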

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before starting to count, if no special
        # files are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested files
        and dirs and building the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero and start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, then we should
        # notify that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries)
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This can happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
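
    # Note (descriptive): _write() streams the source in KEEP_BLOCK_SIZE
    # chunks (64 MiB in the Python SDK's arvados.config), so a large file
    # never needs to be held in memory all at once.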

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                raise ResumeCacheInvalidError()
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check whether the
        cached manifest is usable: that is, detect whether it was created
        with a different Arvados account's credentials.
        """
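        # A signed block locator looks like (illustrative values):
        #     d41d8cd98f00b204e9800998ecf8427e+0+A1f2e3d...@5e4d3c2b
        # where the hex string after '@' is the signature's expiry timestamp;
        # locators without an '@' part carry no signature and are skipped below.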
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
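
# For example (illustrative), machine_progress(1024, 2048) below yields
#     "arv-put 12345: 1024 written 2048 total\n"
# with the program name and PID varying per run; bytes_expected is reported
# as -1 when unknown.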

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
# so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
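
# For example (illustrative):
#     pathname_match('subdir/file.txt', 'subdir/*.txt')    -> True
#     pathname_match('subdir/a/file.txt', 'subdir/*.txt')  -> False
# because the component counts must match, unlike plain fnmatch on the
# whole pathname.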

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
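
# For example (illustrative): human_progress(10 << 20, 100 << 20) returns
#     "\r10M / 100M 10.0% "
# and falls back to a raw byte count when the expected total is unknown.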

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
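
# Note (descriptive): Arvados UUIDs encode the object type in their middle
# segment, e.g. users look like 'zzzzz-tpzed-xxxxxxxxxxxxxxx' and projects
# (groups) like 'zzzzz-j7d0g-xxxxxxxxxxxxxxx'; that is what the pattern
# checks above distinguish.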

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split the storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()