14930: Moves the trash_at compute logic into its own method.
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import ciso8601
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import fnmatch
19 import hashlib
20 import json
21 import logging
22 import os
23 import pwd
24 import re
25 import signal
26 import socket
27 import sys
28 import tempfile
29 import threading
30 import time
31 import traceback
32
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
36
37 import arvados.commands._util as arv_cmd
38
39 api_client = None
40
41 upload_opts = argparse.ArgumentParser(add_help=False)
42
43 upload_opts.add_argument('--version', action='version',
44                          version="%s %s" % (sys.argv[0], __version__),
45                          help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47                          help="""
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
51 """)
52
53 _group = upload_opts.add_mutually_exclusive_group()
54
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56                     default=-1, help=argparse.SUPPRESS)
57
58 _group.add_argument('--normalize', action='store_true',
59                     help="""
60 Normalize the manifest by re-ordering files and streams after writing
61 data.
62 """)
63
64 _group.add_argument('--dry-run', action='store_true', default=False,
65                     help="""
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
68 """)
69
70 _group = upload_opts.add_mutually_exclusive_group()
71
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
73                     help="""
74 Synonym for --stream.
75 """)
76
77 _group.add_argument('--stream', action='store_true',
78                     help="""
79 Store the file content and display the resulting manifest on
80 stdout. Do not write the manifest to Keep or save a Collection object
81 in Arvados.
82 """)
83
84 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
85                     help="""
86 Synonym for --manifest.
87 """)
88
89 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
90                     help="""
91 Synonym for --manifest.
92 """)
93
94 _group.add_argument('--manifest', action='store_true',
95                     help="""
96 Store the file data and resulting manifest in Keep, save a Collection
97 object in Arvados, and display the manifest locator (Collection uuid)
98 on stdout. This is the default behavior.
99 """)
100
101 _group.add_argument('--as-raw', action='store_true', dest='raw',
102                     help="""
103 Synonym for --raw.
104 """)
105
106 _group.add_argument('--raw', action='store_true',
107                     help="""
108 Store the file content and display the data block locators on stdout,
109 separated by commas, with a trailing newline. Do not store a
110 manifest.
111 """)
112
113 upload_opts.add_argument('--update-collection', type=str, default=None,
114                          dest='update_collection', metavar="UUID", help="""
115 Update an existing collection identified by the given Arvados collection
116 UUID. All new local files will be uploaded.
117 """)
118
119 upload_opts.add_argument('--use-filename', type=str, default=None,
120                          dest='filename', help="""
121 Synonym for --filename.
122 """)
123
124 upload_opts.add_argument('--filename', type=str, default=None,
125                          help="""
126 Use the given filename in the manifest, instead of the name of the
127 local file. This is useful when "-" or "/dev/stdin" is given as an
128 input file. It can be used only if there is exactly one path given and
129 it is not a directory. Implies --manifest.
130 """)
131
132 upload_opts.add_argument('--portable-data-hash', action='store_true',
133                          help="""
134 Print the portable data hash instead of the Arvados UUID for the collection
135 created by the upload.
136 """)
137
138 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
139                          help="""
140 Set the replication level for the new collection: how many different
141 physical storage devices (e.g., disks) should have a copy of each data
142 block. Default is to use the server-provided default (if any) or 2.
143 """)
144
145 upload_opts.add_argument('--storage-classes', help="""
146 Specify a comma-separated list of storage classes to be used when saving data to Keep.
147 """)
148
149 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
150                          help="""
151 Set the number of upload threads to be used. Note that using many
152 threads will increase the RAM requirements. Default is to use 2
153 threads.
154 On high-latency installations, using a greater number may improve
155 overall throughput.
156 """)
157
158 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
159                       action='append', help="""
160 Exclude files and directories whose names match the given glob pattern. When
161 using a path-like pattern like 'subdir/*.txt', all text files inside a 'subdir'
162 directory (relative to the provided input directories) will be excluded.
163 When using a filename pattern like '*.txt', any text file will be excluded
164 no matter where it is placed.
165 For the special case of needing to exclude only files or dirs directly below
166 the given input directory, you can use a pattern like './exclude_this.gif'.
167 You can specify multiple patterns by using this argument more than once.
168 """)
169
170 _group = upload_opts.add_mutually_exclusive_group()
171 _group.add_argument('--follow-links', action='store_true', default=True,
172                     dest='follow_links', help="""
173 Follow file and directory symlinks (default).
174 """)
175 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
176                     help="""
177 Do not follow file and directory symlinks.
178 """)
179
180
181 run_opts = argparse.ArgumentParser(add_help=False)
182
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
185 project.
186 """)
187
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
190 """)
191
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
194                     help="""
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
197 stderr is a tty.
198 """)
199
200 _group.add_argument('--no-progress', action='store_true',
201                     help="""
202 Do not display human-readable progress on stderr, even if stderr is a
203 tty.
204 """)
205
206 _group.add_argument('--batch-progress', action='store_true',
207                     help="""
208 Display machine-readable progress on stderr (bytes and, if known,
209 total data size).
210 """)
211
212 run_opts.add_argument('--silent', action='store_true',
213                       help="""
214 Do not print any debug messages to console. (Any error messages will
215 still be displayed.)
216 """)
217
218 _group = run_opts.add_mutually_exclusive_group()
219 _group.add_argument('--resume', action='store_true', default=True,
220                     help="""
221 Continue interrupted uploads from cached state (default).
222 """)
223 _group.add_argument('--no-resume', action='store_false', dest='resume',
224                     help="""
225 Do not continue interrupted uploads from cached state.
226 """)
227
228 _group = run_opts.add_mutually_exclusive_group()
229 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
230                     help="""
231 Save upload state in a cache file for resuming (default).
232 """)
233 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
234                     help="""
235 Do not save upload state in a cache file for resuming.
236 """)
237
238 _group = upload_opts.add_mutually_exclusive_group()
239 _group.add_argument('--trash-at', metavar='YYYY-MM-DD HH:MM', default=None,
240                     help="""
241 Set the trash date of the resulting collection to an absolute date in the future.
242 The accepted format is defined by the ISO 8601 standard.
243 """)
244 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
245                     help="""
246 Set the trash date of the resulting collection to a number of days after the
247 date/time that the upload process finishes.
248 """)
249
250 arg_parser = argparse.ArgumentParser(
251     description='Copy data from the local filesystem to Keep.',
252     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
253
254 def parse_arguments(arguments):
255     args = arg_parser.parse_args(arguments)
256
257     if len(args.paths) == 0:
258         args.paths = ['-']
259
260     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
261
262     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
263         if args.filename:
264             arg_parser.error("""
265     --filename argument cannot be used when storing a directory or
266     multiple files.
267     """)
268
269     # Turn on --progress by default if stderr is a tty.
270     if (not (args.batch_progress or args.no_progress or args.silent)
271         and os.isatty(sys.stderr.fileno())):
272         args.progress = True
273
274     # Turn off --resume (default) if --no-cache is used.
275     if not args.use_cache:
276         args.resume = False
277
278     if args.paths == ['-']:
279         if args.update_collection:
280             arg_parser.error("""
281     --update-collection cannot be used when reading from stdin.
282     """)
283         args.resume = False
284         args.use_cache = False
285         if not args.filename:
286             args.filename = 'stdin'
287
288     # Remove possible duplicated patterns
289     if len(args.exclude) > 0:
290         args.exclude = list(set(args.exclude))
291
292     return args
293
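# Illustrative example (not part of the original file): with no paths given,
# parse_arguments() falls back to reading from standard input:
#   args = parse_arguments([])
#   args.paths                    # -> ['-']
#   args.filename                 # -> 'stdin'
#   args.resume, args.use_cache   # -> (False, False) when reading from stdin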
294
295 class PathDoesNotExistError(Exception):
296     pass
297
298
299 class CollectionUpdateError(Exception):
300     pass
301
302
303 class ResumeCacheConflict(Exception):
304     pass
305
306
307 class ResumeCacheInvalidError(Exception):
308     pass
309
310 class ArvPutArgumentConflict(Exception):
311     pass
312
313
314 class ArvPutUploadIsPending(Exception):
315     pass
316
317
318 class ArvPutUploadNotPending(Exception):
319     pass
320
321
322 class FileUploadList(list):
323     def __init__(self, dry_run=False):
324         list.__init__(self)
325         self.dry_run = dry_run
326
327     def append(self, other):
328         if self.dry_run:
329             raise ArvPutUploadIsPending()
330         super(FileUploadList, self).append(other)
331
332
333 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
334 class ArvPutLogFormatter(logging.Formatter):
335     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
336     err_fmtr = None
337     request_id_informed = False
338
339     def __init__(self, request_id):
340         self.err_fmtr = logging.Formatter(
341             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
342             arvados.log_date_format)
343
344     def format(self, record):
345         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
346             self.request_id_informed = True
347             return self.err_fmtr.format(record)
348         return self.std_fmtr.format(record)
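# Illustrative sketch (assumed log line shape, not part of the original file):
# with this formatter installed, the first DEBUG/ERROR record carries the
# request id, e.g. something like
#   2024-01-01 12:00:00 arvados.arv_put[1234] ERROR: upload failed (X-Request-Id: req-xxxxxxxxxxxxxxxxxxxx)
# while every other record is rendered with the standard arvados.log_format.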
349
350
351 class ResumeCache(object):
352     CACHE_DIR = '.cache/arvados/arv-put'
353
354     def __init__(self, file_spec):
355         self.cache_file = open(file_spec, 'a+')
356         self._lock_file(self.cache_file)
357         self.filename = self.cache_file.name
358
359     @classmethod
360     def make_path(cls, args):
361         md5 = hashlib.md5()
362         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
363         realpaths = sorted(os.path.realpath(path) for path in args.paths)
364         md5.update(b'\0'.join([p.encode() for p in realpaths]))
365         if any(os.path.isdir(path) for path in realpaths):
366             md5.update(b'-1')
367         elif args.filename:
368             md5.update(args.filename.encode())
369         return os.path.join(
370             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
371             md5.hexdigest())
372
373     def _lock_file(self, fileobj):
374         try:
375             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
376         except IOError:
377             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
378
379     def load(self):
380         self.cache_file.seek(0)
381         return json.load(self.cache_file)
382
383     def check_cache(self, api_client=None, num_retries=0):
384         try:
385             state = self.load()
386             locator = None
387             try:
388                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
389                     locator = state["_finished_streams"][0][1][0]
390                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
391                     locator = state["_current_stream_locators"][0]
392                 if locator is not None:
393                     kc = arvados.keep.KeepClient(api_client=api_client)
394                     kc.head(locator, num_retries=num_retries)
395             except Exception as e:
396                 self.restart()
397         except ValueError:
398             pass
399
400     def save(self, data):
401         try:
402             new_cache_fd, new_cache_name = tempfile.mkstemp(
403                 dir=os.path.dirname(self.filename))
404             self._lock_file(new_cache_fd)
405             new_cache = os.fdopen(new_cache_fd, 'r+')
406             json.dump(data, new_cache)
407             os.rename(new_cache_name, self.filename)
408         except (IOError, OSError, ResumeCacheConflict):
409             try:
410                 os.unlink(new_cache_name)
411             except NameError:  # mkstemp failed.
412                 pass
413         else:
414             self.cache_file.close()
415             self.cache_file = new_cache
416
417     def close(self):
418         self.cache_file.close()
419
420     def destroy(self):
421         try:
422             os.unlink(self.filename)
423         except OSError as error:
424             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
425                 raise
426         self.close()
427
428     def restart(self):
429         self.destroy()
430         self.__init__(self.filename)
431
432
433 class ArvPutUploadJob(object):
434     CACHE_DIR = '.cache/arvados/arv-put'
435     EMPTY_STATE = {
436         'manifest' : None, # Last saved manifest checkpoint
437         'files' : {} # Previous run file list: {path : {size, mtime}}
438     }
439
440     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
441                  name=None, owner_uuid=None, api_client=None,
442                  ensure_unique_name=False, num_retries=None,
443                  put_threads=None, replication_desired=None, filename=None,
444                  update_time=60.0, update_collection=None, storage_classes=None,
445                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
446                  follow_links=True, exclude_paths=[], exclude_names=None,
447                  trash_at=None):
448         self.paths = paths
449         self.resume = resume
450         self.use_cache = use_cache
451         self.update = False
452         self.reporter = reporter
453         # This will be set to 0 before we start counting, if no special files are
454         # going to be read.
455         self.bytes_expected = None
456         self.bytes_written = 0
457         self.bytes_skipped = 0
458         self.name = name
459         self.owner_uuid = owner_uuid
460         self.ensure_unique_name = ensure_unique_name
461         self.num_retries = num_retries
462         self.replication_desired = replication_desired
463         self.put_threads = put_threads
464         self.filename = filename
465         self.storage_classes = storage_classes
466         self._api_client = api_client
467         self._state_lock = threading.Lock()
468         self._state = None # Previous run state (file list & manifest)
469         self._current_files = [] # Current run file list
470         self._cache_file = None
471         self._collection_lock = threading.Lock()
472         self._remote_collection = None # Collection being updated (if asked)
473         self._local_collection = None # Collection from previous run manifest
474         self._file_paths = set() # Files to be updated in remote collection
475         self._stop_checkpointer = threading.Event()
476         self._checkpointer = threading.Thread(target=self._update_task)
477         self._checkpointer.daemon = True
478         self._update_task_time = update_time  # How many seconds to wait between update runs
479         self._files_to_upload = FileUploadList(dry_run=dry_run)
480         self._upload_started = False
481         self.logger = logger
482         self.dry_run = dry_run
483         self._checkpoint_before_quit = True
484         self.follow_links = follow_links
485         self.exclude_paths = exclude_paths
486         self.exclude_names = exclude_names
487         self._trash_at = trash_at
488
489         if self._trash_at is not None and type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
490             raise TypeError('trash_at should be None, datetime or timedelta')
491
492         if not self.use_cache and self.resume:
493             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
494
495         # Check for obvious dry-run responses
496         if self.dry_run and (not self.use_cache or not self.resume):
497             raise ArvPutUploadIsPending()
498
499         # Load cached data if any and if needed
500         self._setup_state(update_collection)
501
502         # Build the upload file list, excluding requested files and counting the
503         # bytes expected to be uploaded.
504         self._build_upload_list()
505
506     def _build_upload_list(self):
507         """
508         Scan the requested paths to count file sizes, excluding requested files
509         and dirs and building the upload file list.
510         """
511         # If there are no special files to be read, reset the total bytes count to
512         # zero before counting.
513         if not any([p for p in self.paths
514                     if not (os.path.isfile(p) or os.path.isdir(p))]):
515             self.bytes_expected = 0
516
517         for path in self.paths:
518             # Test for stdin first, in case some file named '-' exists
519             if path == '-':
520                 if self.dry_run:
521                     raise ArvPutUploadIsPending()
522                 self._write_stdin(self.filename or 'stdin')
523             elif not os.path.exists(path):
524                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
525             elif os.path.isdir(path):
526                 # Use absolute paths on cache index so CWD doesn't interfere
527                 # with the caching logic.
528                 orig_path = path
529                 path = os.path.abspath(path)
530                 if orig_path[-1:] == os.sep:
531                     # When passing a directory reference with a trailing slash,
532                     # its contents should be uploaded directly to the
533                     # collection's root.
534                     prefixdir = path
535                 else:
536                     # When passing a directory reference with no trailing slash,
537                     # upload the directory to the collection's root.
538                     prefixdir = os.path.dirname(path)
539                 prefixdir += os.sep
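                # Illustrative example (hypothetical paths, not in the original
                # file): for an argument 'data/' prefixdir ends up as
                # '/abs/path/data/', so its contents land at the collection
                # root; for 'data' prefixdir is '/abs/path/', so its files land
                # under 'data/' in the collection.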
540                 for root, dirs, files in os.walk(path,
541                                                  followlinks=self.follow_links):
542                     root_relpath = os.path.relpath(root, path)
543                     if root_relpath == '.':
544                         root_relpath = ''
545                     # Exclude files/dirs by full path matching pattern
546                     if self.exclude_paths:
547                         dirs[:] = [d for d in dirs
548                                    if not any(pathname_match(
549                                            os.path.join(root_relpath, d), pat)
550                                               for pat in self.exclude_paths)]
551                         files = [f for f in files
552                                  if not any(pathname_match(
553                                          os.path.join(root_relpath, f), pat)
554                                             for pat in self.exclude_paths)]
555                     # Exclude files/dirs by name matching pattern
556                     if self.exclude_names is not None:
557                         dirs[:] = [d for d in dirs
558                                    if not self.exclude_names.match(d)]
559                         files = [f for f in files
560                                  if not self.exclude_names.match(f)]
561                     # Make os.walk()'s directory traversal order deterministic
562                     dirs.sort()
563                     files.sort()
564                     for f in files:
565                         filepath = os.path.join(root, f)
566                         # Add its size to the total bytes count (if applicable)
567                         if self.follow_links or (not os.path.islink(filepath)):
568                             if self.bytes_expected is not None:
569                                 self.bytes_expected += os.path.getsize(filepath)
570                         self._check_file(filepath,
571                                          os.path.join(root[len(prefixdir):], f))
572             else:
573                 filepath = os.path.abspath(path)
574                 # Add its size to the total bytes count (if applicable)
575                 if self.follow_links or (not os.path.islink(filepath)):
576                     if self.bytes_expected is not None:
577                         self.bytes_expected += os.path.getsize(filepath)
578                 self._check_file(filepath,
579                                  self.filename or os.path.basename(path))
580         # If dry-run mode is on and we got up to this point, then we should notify
581         # that there aren't any files to upload.
582         if self.dry_run:
583             raise ArvPutUploadNotPending()
584         # Remove local_collection's files that don't exist locally anymore, so the
585         # bytes_written count is correct.
586         for f in self.collection_file_paths(self._local_collection,
587                                             path_prefix=""):
588             if f != 'stdin' and f != self.filename and f not in self._file_paths:
589                 self._local_collection.remove(f)
590
591     def start(self, save_collection):
592         """
593         Start supporting thread & file uploading
594         """
595         self._checkpointer.start()
596         try:
597             # Update bytes_written from current local collection and
598             # report initial progress.
599             self._update()
600             # Actual file upload
601             self._upload_started = True # Used by the update thread to start checkpointing
602             self._upload_files()
603         except (SystemExit, Exception) as e:
604             self._checkpoint_before_quit = False
605             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
606             # Note: We're expecting SystemExit instead of
607             # KeyboardInterrupt because we have a custom signal
608             # handler in place that raises SystemExit with the caught
609             # signal's code.
610             if isinstance(e, PathDoesNotExistError):
611                 # We aren't interested in the traceback for this case
612                 pass
613             elif not isinstance(e, SystemExit) or e.code != -2:
614                 self.logger.warning("Abnormal termination:\n{}".format(
615                     traceback.format_exc()))
616             raise
617         finally:
618             if not self.dry_run:
619                 # Stop the thread before doing anything else
620                 self._stop_checkpointer.set()
621                 self._checkpointer.join()
622                 if self._checkpoint_before_quit:
623                     # Commit all pending blocks & one last _update()
624                     self._local_collection.manifest_text()
625                     self._update(final=True)
626                     if save_collection:
627                         self.save_collection()
628             if self.use_cache:
629                 self._cache_file.close()
630
631     def _collection_trash_at(self):
632         """
633         Returns the trash date that the collection should use at save time.
634         Takes into account absolute/relative trash_at values requested
635         by the user.
636         """
637         if type(self._trash_at) == datetime.timedelta:
638             # Get an absolute datetime for trash_at
639             return datetime.datetime.utcnow() + self._trash_at
640         return self._trash_at
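    # Illustrative sketch (hypothetical values, not in the original file):
    #   with --trash-after 7, self._trash_at is datetime.timedelta(days=7) and
    #   _collection_trash_at() resolves it to utcnow() + 7 days at save time;
    #   with --trash-at, the absolute datetime is returned unchanged.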
641
642     def save_collection(self):
643         if self.update:
644             # Check if files should be updated on the remote collection.
645             for fp in self._file_paths:
646                 remote_file = self._remote_collection.find(fp)
647                 if not remote_file:
648                     # File doesn't exist on the remote collection, copy it.
649                     self._remote_collection.copy(fp, fp, self._local_collection)
650                 elif remote_file != self._local_collection.find(fp):
651                     # A different file exists on the remote collection, overwrite it.
652                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
653                 else:
654                     # The file already exists on the remote collection, skip it.
655                     pass
656             self._remote_collection.save(storage_classes=self.storage_classes,
657                                          num_retries=self.num_retries,
658                                          trash_at=self._collection_trash_at())
659         else:
660             if self.storage_classes is None:
661                 self.storage_classes = ['default']
662             self._local_collection.save_new(
663                 name=self.name, owner_uuid=self.owner_uuid,
664                 storage_classes=self.storage_classes,
665                 ensure_unique_name=self.ensure_unique_name,
666                 num_retries=self.num_retries,
667                 trash_at=self._collection_trash_at())
668
669     def destroy_cache(self):
670         if self.use_cache:
671             try:
672                 os.unlink(self._cache_filename)
673             except OSError as error:
674                 # That's what we wanted anyway.
675                 if error.errno != errno.ENOENT:
676                     raise
677             self._cache_file.close()
678
679     def _collection_size(self, collection):
680         """
681         Recursively get the total size of the collection
682         """
683         size = 0
684         for item in listvalues(collection):
685             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
686                 size += self._collection_size(item)
687             else:
688                 size += item.size()
689         return size
690
691     def _update_task(self):
692         """
693         Periodically called support task. File uploading is
694         asynchronous so we poll status from the collection.
695         """
696         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
697             self._update()
698
699     def _update(self, final=False):
700         """
701         Update cached manifest text and report progress.
702         """
703         if self._upload_started:
704             with self._collection_lock:
705                 self.bytes_written = self._collection_size(self._local_collection)
706                 if self.use_cache:
707                     if final:
708                         manifest = self._local_collection.manifest_text()
709                     else:
710                         # Get the manifest text without committing pending blocks
711                         manifest = self._local_collection.manifest_text(strip=False,
712                                                                         normalize=False,
713                                                                         only_committed=True)
714                     # Update cache
715                     with self._state_lock:
716                         self._state['manifest'] = manifest
717             if self.use_cache:
718                 try:
719                     self._save_state()
720                 except Exception as e:
721                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
722         else:
723             self.bytes_written = self.bytes_skipped
724         # Call the reporter, if any
725         self.report_progress()
726
727     def report_progress(self):
728         if self.reporter is not None:
729             self.reporter(self.bytes_written, self.bytes_expected)
730
731     def _write_stdin(self, filename):
732         output = self._local_collection.open(filename, 'wb')
733         self._write(sys.stdin, output)
734         output.close()
735
736     def _check_file(self, source, filename):
737         """
738         Check if this file needs to be uploaded
739         """
740         # Ignore symlinks when requested
741         if (not self.follow_links) and os.path.islink(source):
742             return
743         resume_offset = 0
744         should_upload = False
745         new_file_in_cache = False
746         # Record file path for updating the remote collection before exiting
747         self._file_paths.add(filename)
748
749         with self._state_lock:
750             # If there's no previous cached data for this file, store it for an
751             # eventual repeated run.
752             if source not in self._state['files']:
753                 self._state['files'][source] = {
754                     'mtime': os.path.getmtime(source),
755                     'size' : os.path.getsize(source)
756                 }
757                 new_file_in_cache = True
758             cached_file_data = self._state['files'][source]
759
760         # Check if file was already uploaded (at least partially)
761         file_in_local_collection = self._local_collection.find(filename)
762
763         # If not resuming, upload the full file.
764         if not self.resume:
765             should_upload = True
766         # New file since the last run, upload it.
767         elif new_file_in_cache:
768             should_upload = True
769         # Local file didn't change from last run.
770         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
771             if not file_in_local_collection:
772                 # File not uploaded yet, upload it completely
773                 should_upload = True
774             elif file_in_local_collection.permission_expired():
775                 # Permission token expired, re-upload file. This will change whenever
776                 # we have an API for refreshing tokens.
777                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
778                 should_upload = True
779                 self._local_collection.remove(filename)
780             elif cached_file_data['size'] == file_in_local_collection.size():
781                 # File already there, skip it.
782                 self.bytes_skipped += cached_file_data['size']
783             elif cached_file_data['size'] > file_in_local_collection.size():
784                 # File partially uploaded, resume!
785                 resume_offset = file_in_local_collection.size()
786                 self.bytes_skipped += resume_offset
787                 should_upload = True
788             else:
789                 # Inconsistent cache, re-upload the file
790                 should_upload = True
791                 self._local_collection.remove(filename)
792                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
793         # Local file differs from cached data, re-upload it.
794         else:
795             if file_in_local_collection:
796                 self._local_collection.remove(filename)
797             should_upload = True
798
799         if should_upload:
800             try:
801                 self._files_to_upload.append((source, resume_offset, filename))
802             except ArvPutUploadIsPending:
803                 # This could happen when running in dry-run mode; close the cache
804                 # file to avoid locking issues.
805                 self._cache_file.close()
806                 raise
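    # Illustrative summary (not in the original file) of the resume decisions
    # made above, given the cached {mtime, size} and the file's copy already in
    # the local collection:
    #   unchanged file, collection copy complete -> skip (adds to bytes_skipped)
    #   unchanged file, collection copy smaller  -> resume from its current size
    #   new, changed, expired or inconsistent    -> (re-)upload from scratch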
807
808     def _upload_files(self):
809         for source, resume_offset, filename in self._files_to_upload:
810             with open(source, 'rb') as source_fd:
811                 with self._state_lock:
812                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
813                     self._state['files'][source]['size'] = os.path.getsize(source)
814                 if resume_offset > 0:
815                     # Start upload where we left off
816                     output = self._local_collection.open(filename, 'ab')
817                     source_fd.seek(resume_offset)
818                 else:
819                     # Start from scratch
820                     output = self._local_collection.open(filename, 'wb')
821                 self._write(source_fd, output)
822                 output.close(flush=False)
823
824     def _write(self, source_fd, output):
825         while True:
826             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
827             if not data:
828                 break
829             output.write(data)
830
831     def _my_collection(self):
832         return self._remote_collection if self.update else self._local_collection
833
834     def _get_cache_filepath(self):
835         # Set up cache file name from input paths.
836         md5 = hashlib.md5()
837         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
838         realpaths = sorted(os.path.realpath(path) for path in self.paths)
839         md5.update(b'\0'.join([p.encode() for p in realpaths]))
840         if self.filename:
841             md5.update(self.filename.encode())
842         cache_filename = md5.hexdigest()
843         cache_filepath = os.path.join(
844             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
845             cache_filename)
846         return cache_filepath
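    # Illustrative sketch (hypothetical values, not in the original file):
    #   md5(api_host + '\0'-joined sorted real paths [+ filename])
    # becomes the cache file name under ~/.cache/arvados/arv-put/, so repeated
    # runs over the same inputs resume from the same cache file.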
847
848     def _setup_state(self, update_collection):
849         """
850         Create a new cache file or load a previously existing one.
851         """
852         # Load an already existing collection for update
853         if update_collection and re.match(arvados.util.collection_uuid_pattern,
854                                           update_collection):
855             try:
856                 self._remote_collection = arvados.collection.Collection(
857                     update_collection, api_client=self._api_client)
858             except arvados.errors.ApiError as error:
859                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
860             else:
861                 self.update = True
862         elif update_collection:
863             # Collection locator provided, but unknown format
864             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
865
866         if self.use_cache:
867             cache_filepath = self._get_cache_filepath()
868             if self.resume and os.path.exists(cache_filepath):
869                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
870                 self._cache_file = open(cache_filepath, 'a+')
871             else:
872                 # --no-resume means start with an empty cache file.
873                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
874                 self._cache_file = open(cache_filepath, 'w+')
875             self._cache_filename = self._cache_file.name
876             self._lock_file(self._cache_file)
877             self._cache_file.seek(0)
878
879         with self._state_lock:
880             if self.use_cache:
881                 try:
882                     self._state = json.load(self._cache_file)
883                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
884                         # Cache is at least partially incomplete, set up a new cache
885                         self._state = copy.deepcopy(self.EMPTY_STATE)
886                 except ValueError:
887                     # Cache file empty, set up new cache
888                     self._state = copy.deepcopy(self.EMPTY_STATE)
889             else:
890                 self.logger.info("No cache usage requested for this run.")
891                 # No cache file, set empty state
892                 self._state = copy.deepcopy(self.EMPTY_STATE)
893             if not self._cached_manifest_valid():
894                 raise ResumeCacheInvalidError()
895             # Load the previous manifest so we can check if files were modified remotely.
896             self._local_collection = arvados.collection.Collection(
897                 self._state['manifest'],
898                 replication_desired=self.replication_desired,
899                 put_threads=self.put_threads,
900                 api_client=self._api_client)
901
902     def _cached_manifest_valid(self):
903         """
904         Validate the oldest non-expired block signature to check if the cached
905         manifest is usable: that is, verify that it was not created with a
906         different Arvados account.
907         """
908         if self._state.get('manifest', None) is None:
909             # No cached manifest yet, all good.
910             return True
911         now = datetime.datetime.utcnow()
912         oldest_exp = None
913         oldest_loc = None
914         block_found = False
915         for m in keep_locator_pattern.finditer(self._state['manifest']):
916             loc = m.group(0)
917             try:
918                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
919             except IndexError:
920                 # Locator without signature
921                 continue
922             block_found = True
923             if exp > now and (oldest_exp is None or exp < oldest_exp):
924                 oldest_exp = exp
925                 oldest_loc = loc
926         if not block_found:
927             # No block signatures found => no invalid block signatures.
928             return True
929         if oldest_loc is None:
930             # Locator signatures found, but all have expired.
931             # Reset the cache and move on.
932             self.logger.info('Cache expired, starting from scratch.')
933             self._state['manifest'] = ''
934             return True
935         kc = arvados.KeepClient(api_client=self._api_client,
936                                 num_retries=self.num_retries)
937         try:
938             kc.head(oldest_loc)
939         except arvados.errors.KeepRequestError:
940             # Something is wrong, cached manifest is not valid.
941             return False
942         return True
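    # Illustrative note (assumed locator shape, not in the original file): a
    # signed Keep locator looks roughly like
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+Axxxxxxxx...@5f612ab3
    # and the hex timestamp after '@' is the signature expiry parsed above.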
943
944     def collection_file_paths(self, col, path_prefix='.'):
945         """Return a list of file paths by recursively go through the entire collection `col`"""
946         file_paths = []
947         for name, item in listitems(col):
948             if isinstance(item, arvados.arvfile.ArvadosFile):
949                 file_paths.append(os.path.join(path_prefix, name))
950             elif isinstance(item, arvados.collection.Subcollection):
951                 new_prefix = os.path.join(path_prefix, name)
952                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
953         return file_paths
954
955     def _lock_file(self, fileobj):
956         try:
957             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
958         except IOError:
959             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
960
961     def _save_state(self):
962         """
963         Atomically save current state into cache.
964         """
965         with self._state_lock:
966             # We're not using copy.deepcopy() here because it's a lot slower
967             # than json.dumps(), and we already need the JSON format to be
968             # saved on disk.
969             state = json.dumps(self._state)
970         try:
971             new_cache = tempfile.NamedTemporaryFile(
972                 mode='w+',
973                 dir=os.path.dirname(self._cache_filename), delete=False)
974             self._lock_file(new_cache)
975             new_cache.write(state)
976             new_cache.flush()
977             os.fsync(new_cache)
978             os.rename(new_cache.name, self._cache_filename)
979         except (IOError, OSError, ResumeCacheConflict) as error:
980             self.logger.error("There was a problem while saving the cache file: {}".format(error))
981             try:
982                 os.unlink(new_cache.name)
983             except NameError:  # NamedTemporaryFile() failed.
984                 pass
985         else:
986             self._cache_file.close()
987             self._cache_file = new_cache
988
989     def collection_name(self):
990         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
991
992     def manifest_locator(self):
993         return self._my_collection().manifest_locator()
994
995     def portable_data_hash(self):
996         pdh = self._my_collection().portable_data_hash()
997         m = self._my_collection().stripped_manifest().encode()
998         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
999         if pdh != local_pdh:
1000             self.logger.warning("\n".join([
1001                 "arv-put: API server provided PDH differs from local manifest.",
1002                 "         This should not happen; showing API server version."]))
1003         return pdh
1004
1005     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1006         return self._my_collection().manifest_text(stream_name, strip, normalize)
1007
1008     def _datablocks_on_item(self, item):
1009         """
1010         Return a list of datablock locators, recursively navigating
1011         through subcollections
1012         """
1013         if isinstance(item, arvados.arvfile.ArvadosFile):
1014             if item.size() == 0:
1015                 # Empty file locator
1016                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1017             else:
1018                 locators = []
1019                 for segment in item.segments():
1020                     loc = segment.locator
1021                     locators.append(loc)
1022                 return locators
1023         elif isinstance(item, arvados.collection.Collection):
1024             l = [self._datablocks_on_item(x) for x in listvalues(item)]
1025             # Fast list flattener method taken from:
1026             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1027             return [loc for sublist in l for loc in sublist]
1028         else:
1029             return None
1030
1031     def data_locators(self):
1032         with self._collection_lock:
1033             # Make sure all datablocks are flushed before getting the locators
1034             self._my_collection().manifest_text()
1035             datablocks = self._datablocks_on_item(self._my_collection())
1036         return datablocks
1037
1038 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1039                                                             os.getpid())
1040
1041 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1042 # Note: fnmatch() doesn't work correctly when used with pathnames. For example, the
1043 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1044 # so instead we're using it on every path component.
1045 def pathname_match(pathname, pattern):
1046     name = pathname.split(os.sep)
1047     # Fix patterns like 'some/subdir/' or 'some//subdir'
1048     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1049     if len(name) != len(pat):
1050         return False
1051     for i in range(len(name)):
1052         if not fnmatch.fnmatch(name[i], pat[i]):
1053             return False
1054     return True
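# Illustrative examples (not part of the original file):
#   pathname_match('tests/run_test.py', 'tests/*.py')         # -> True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  # -> False
#   pathname_match('file.gif', './*.gif')                     # -> True
#   pathname_match('subdir/file.gif', './*.gif')              # -> False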
1055
1056 def machine_progress(bytes_written, bytes_expected):
1057     return _machine_format.format(
1058         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1059
1060 def human_progress(bytes_written, bytes_expected):
1061     if bytes_expected:
1062         return "\r{}M / {}M {:.1%} ".format(
1063             bytes_written >> 20, bytes_expected >> 20,
1064             float(bytes_written) / bytes_expected)
1065     else:
1066         return "\r{} ".format(bytes_written)
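# Illustrative example (not part of the original file):
#   human_progress(5 << 20, 10 << 20)    # -> '\r5M / 10M 50.0% '
#   machine_progress(5242880, 10485760)  # -> '<argv0> <pid>: 5242880 written 10485760 total\n'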
1067
1068 def progress_writer(progress_func, outfile=sys.stderr):
1069     def write_progress(bytes_written, bytes_expected):
1070         outfile.write(progress_func(bytes_written, bytes_expected))
1071     return write_progress
1072
1073 def desired_project_uuid(api_client, project_uuid, num_retries):
1074     if not project_uuid:
1075         query = api_client.users().current()
1076     elif arvados.util.user_uuid_pattern.match(project_uuid):
1077         query = api_client.users().get(uuid=project_uuid)
1078     elif arvados.util.group_uuid_pattern.match(project_uuid):
1079         query = api_client.groups().get(uuid=project_uuid)
1080     else:
1081         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1082     return query.execute(num_retries=num_retries)['uuid']
1083
1084 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1085          install_sig_handlers=True):
1086     global api_client
1087
1088     args = parse_arguments(arguments)
1089     logger = logging.getLogger('arvados.arv_put')
1090     if args.silent:
1091         logger.setLevel(logging.WARNING)
1092     else:
1093         logger.setLevel(logging.INFO)
1094     status = 0
1095
1096     request_id = arvados.util.new_request_id()
1097
1098     formatter = ArvPutLogFormatter(request_id)
1099     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1100
1101     if api_client is None:
1102         api_client = arvados.api('v1', request_id=request_id)
1103
1104     if install_sig_handlers:
1105         arv_cmd.install_signal_handlers()
1106
1107     # Trash arguments validation
1108     trash_at = None
1109     if args.trash_at is not None:
1110         try:
1111             trash_at = ciso8601.parse_datetime(args.trash_at)
1112         except:
1113             logger.error("--trash-at argument format invalid, should be YYYY-MM-DDTHH:MM.")
1114             sys.exit(1)
1115         else:
1116             if trash_at.tzinfo is not None:
1117                 # Timezone-aware datetime provided, convert to non-aware UTC
1118                 delta = trash_at.tzinfo.utcoffset(None)
1119                 trash_at = trash_at.replace(tzinfo=None) - delta
1120         if trash_at <= datetime.datetime.utcnow():
1121             logger.error("--trash-at argument should be set in the future")
1122             sys.exit(1)
1123     if args.trash_after is not None:
1124         if args.trash_after < 1:
1125             logger.error("--trash-after argument should be >= 1")
1126             sys.exit(1)
1127         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
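    # Illustrative example (hypothetical values, not in the original file):
    #   --trash-at '2030-01-01T12:00:00+03:00' is parsed by ciso8601 and
    #   normalized above to the naive UTC datetime 2030-01-01 09:00:00, while
    #   --trash-after 7 becomes timedelta(days=7) and is resolved to an
    #   absolute date when the collection is saved.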
1128
1129     # Determine the name to use
1130     if args.name:
1131         if args.stream or args.raw:
1132             logger.error("Cannot use --name with --stream or --raw")
1133             sys.exit(1)
1134         elif args.update_collection:
1135             logger.error("Cannot use --name with --update-collection")
1136             sys.exit(1)
1137         collection_name = args.name
1138     else:
1139         collection_name = "Saved at {} by {}@{}".format(
1140             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1141             pwd.getpwuid(os.getuid()).pw_name,
1142             socket.gethostname())
1143
1144     if args.project_uuid and (args.stream or args.raw):
1145         logger.error("Cannot use --project-uuid with --stream or --raw")
1146         sys.exit(1)
1147
1148     # Determine the parent project
1149     try:
1150         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1151                                             args.retries)
1152     except (apiclient_errors.Error, ValueError) as error:
1153         logger.error(error)
1154         sys.exit(1)
1155
1156     if args.progress:
1157         reporter = progress_writer(human_progress)
1158     elif args.batch_progress:
1159         reporter = progress_writer(machine_progress)
1160     else:
1161         reporter = None
1162
1163     #  Split storage-classes argument
1164     storage_classes = None
1165     if args.storage_classes:
1166         storage_classes = args.storage_classes.strip().split(',')
1167         if len(storage_classes) > 1:
1168             logger.error("Multiple storage classes are not supported currently.")
1169             sys.exit(1)
1170
1171
1172     # Set up exclude patterns from all the --exclude arguments provided
1173     name_patterns = []
1174     exclude_paths = []
1175     exclude_names = None
1176     if len(args.exclude) > 0:
1177         # We're supporting 2 kinds of exclusion patterns:
1178         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1179         #                            the name, wherever the file is on the tree)
1180         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1181         #                            entire path, and should be relative to
1182         #                            any input dir argument)
1183         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1184         #                            placed directly underneath the input dir)
1185         for p in args.exclude:
1186             # Only relative path patterns are allowed
1187             if p.startswith(os.sep):
1188                 logger.error("Cannot use absolute paths with --exclude")
1189                 sys.exit(1)
1190             if os.path.dirname(p):
1191                 # We don't support path patterns with '..'
1192                 p_parts = p.split(os.sep)
1193                 if '..' in p_parts:
1194                     logger.error(
1195                         "Cannot use path patterns that include or '..'")
1196                     sys.exit(1)
1197                 # Path search pattern
1198                 exclude_paths.append(p)
1199             else:
1200                 # Name-only search pattern
1201                 name_patterns.append(p)
1202         # For name-only matching, we can combine all patterns into a single
1203         # regexp, for better performance.
1204         exclude_names = re.compile('|'.join(
1205             [fnmatch.translate(p) for p in name_patterns]
1206         )) if len(name_patterns) > 0 else None
1207         # Show the user the patterns to be used, just in case they weren't
1208         # specified inside quotes and got changed by the shell expansion.
1209         logger.info("Exclude patterns: {}".format(args.exclude))
1210
1211     # If this is used by a human, and there's at least one directory to be
1212     # uploaded, the expected bytes calculation can take a moment.
1213     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1214         logger.info("Calculating upload size, this could take some time...")
1215     try:
1216         writer = ArvPutUploadJob(paths = args.paths,
1217                                  resume = args.resume,
1218                                  use_cache = args.use_cache,
1219                                  filename = args.filename,
1220                                  reporter = reporter,
1221                                  api_client = api_client,
1222                                  num_retries = args.retries,
1223                                  replication_desired = args.replication,
1224                                  put_threads = args.threads,
1225                                  name = collection_name,
1226                                  owner_uuid = project_uuid,
1227                                  ensure_unique_name = True,
1228                                  update_collection = args.update_collection,
1229                                  storage_classes=storage_classes,
1230                                  logger=logger,
1231                                  dry_run=args.dry_run,
1232                                  follow_links=args.follow_links,
1233                                  exclude_paths=exclude_paths,
1234                                  exclude_names=exclude_names,
1235                                  trash_at=trash_at)
1236     except ResumeCacheConflict:
1237         logger.error("\n".join([
1238             "arv-put: Another process is already uploading this data.",
1239             "         Use --no-cache if this is really what you want."]))
1240         sys.exit(1)
1241     except ResumeCacheInvalidError:
1242         logger.error("\n".join([
1243             "arv-put: Resume cache contains invalid signature: it may have expired",
1244             "         or been created with another Arvados user's credentials.",
1245             "         Switch user or use one of the following options to restart upload:",
1246             "         --no-resume to start a new resume cache.",
1247             "         --no-cache to disable resume cache."]))
1248         sys.exit(1)
1249     except (CollectionUpdateError, PathDoesNotExistError) as error:
1250         logger.error("\n".join([
1251             "arv-put: %s" % str(error)]))
1252         sys.exit(1)
1253     except ArvPutUploadIsPending:
1254         # Dry run check successful, return proper exit code.
1255         sys.exit(2)
1256     except ArvPutUploadNotPending:
1257         # No files pending for upload
1258         sys.exit(0)
1259
1260     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1261         logger.warning("\n".join([
1262             "arv-put: Resuming previous upload from last checkpoint.",
1263             "         Use the --no-resume option to start over."]))
1264
1265     if not args.dry_run:
1266         writer.report_progress()
1267     output = None
1268     try:
1269         writer.start(save_collection=not(args.stream or args.raw))
1270     except arvados.errors.ApiError as error:
1271         logger.error("\n".join([
1272             "arv-put: %s" % str(error)]))
1273         sys.exit(1)
1274
1275     if args.progress:  # Print newline to split stderr from stdout for humans.
1276         logger.info("\n")
1277
1278     if args.stream:
1279         if args.normalize:
1280             output = writer.manifest_text(normalize=True)
1281         else:
1282             output = writer.manifest_text()
1283     elif args.raw:
1284         output = ','.join(writer.data_locators())
1285     else:
1286         try:
1287             if args.update_collection:
1288                 logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
1289             else:
1290                 logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
1291             if args.portable_data_hash:
1292                 output = writer.portable_data_hash()
1293             else:
1294                 output = writer.manifest_locator()
1295         except apiclient_errors.Error as error:
1296             logger.error(
1297                 "arv-put: Error creating Collection on project: {}.".format(
1298                     error))
1299             status = 1
1300
1301     # Print the locator (uuid) of the new collection.
1302     if output is None:
1303         status = status or 1
1304     elif not args.silent:
1305         stdout.write(output)
1306         if not output.endswith('\n'):
1307             stdout.write('\n')
1308
1309     if install_sig_handlers:
1310         arv_cmd.restore_signal_handlers()
1311
1312     if status != 0:
1313         sys.exit(status)
1314
1315     # Success!
1316     return output
1317
1318
1319 if __name__ == '__main__':
1320     main()