# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases RAM requirements. Default is to use 2 threads.
On high-latency installations, a greater number will improve overall
throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory, relative to the provided input dirs, will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
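
# Illustrative command lines (a usage sketch, not an exhaustive list; the
# file names and collection UUID below are made-up examples):
#
#   arv-put file1.txt file2.txt        # upload files, print collection UUID
#   arv-put --stream mydir/            # print manifest on stdout, save nothing
#   arv-put --update-collection zzzzz-4zz18-xxxxxxxxxxxxxxx mydir/
#   arv-put --exclude '*.tmp' --no-follow-links mydir/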

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove duplicate patterns, if any
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

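# Dry-run behavior sketch (illustrative): with dry_run=True, the first
# append() aborts the file scan by raising ArvPutUploadIsPending, which
# main() maps to exit code 2.
#
#   >>> files = FileUploadList(dry_run=True)
#   >>> files.append(('src.txt', 0, 'src.txt'))  # raises ArvPutUploadIsPending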

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
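
    # Cache-key sketch (illustrative): the cache file name is the MD5 of the
    # API host plus the NUL-joined real paths (plus '-1' for directories, or
    # the --filename value), so repeating the same invocation maps to the
    # same cache file. Assuming ARVADOS_API_HOST=x.example and one file
    # argument /tmp/a.txt, the resulting path would look like:
    #   ~/.cache/arvados/arv-put/<md5 of "x.example" + "/tmp/a.txt">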

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            # Lock the file object (not the raw fd) so _lock_file() can report
            # its name if the lock is already held.
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
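
    # Illustrative on-disk cache contents (an assumed example; the real
    # values are filled in by _check_file() and _update() below):
    #   {"manifest": ". 0cc175b9c0f1b6a831c399e269772661+1 0:1:a.txt\n",
    #    "files": {"/home/user/a.txt": {"mtime": 1514764800.0, "size": 1}}}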

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there aren't special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on, and we got up to this point, then we should
        # notify that there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and not f in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start the supporting thread & file uploading.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection.
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode: close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but in an unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively walking the entire collection `col`."""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state in JSON format
            # to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections.
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

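# Minimal programmatic sketch (illustrative; it mirrors what main() below
# does, and assumes Arvados API credentials are already configured):
#
#   writer = ArvPutUploadJob(paths=['./data/'], reporter=None)
#   writer.start(save_collection=True)
#   print(writer.manifest_locator())
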
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we apply it to every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
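
# Component-wise matching sketch (illustrative):
#   >>> pathname_match('tests/run_test.py', 'tests/*.py')
#   True
#   >>> pathname_match('tests/subdir/run_test.py', 'tests/*.py')
#   False
#   >>> pathname_match('tests/run_test.py', 'tests//*.py')  # '//' collapses
#   True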

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
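
# Example output line (illustrative; the prefix is argv[0] and the PID):
#   arv-put 12345: 1048576 written 4194304 total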

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
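
# Illustrative values: 512 MiB written out of 1 GiB expected renders as
#   '\r512M / 1024M 50.0% '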

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
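
# Dispatch sketch (illustrative, made-up values; Arvados UUIDs embed a type
# code in their second field, e.g. 'tpzed' for users and 'j7d0g' for groups):
#   desired_project_uuid(api, None, 3)                          # current user's UUID
#   desired_project_uuid(api, 'zzzzz-j7d0g-xxxxxxxxxxxxxxx', 3) # that group/project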

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
        #                            the name)
        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '.' or '..'
                p_parts = p.split(os.sep)
                if '.' in p_parts or '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '.' or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))
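        # Illustrative split (an assumed example): with
        #   --exclude '*.tmp' --exclude 'data/raw'
        # name_patterns ends up as ['*.tmp'] (compiled into exclude_names) and
        # exclude_paths as ['data/raw'], which is matched per path component by
        # pathname_match() during the scan.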

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()