11789: Path exclude patterns validation and fixes.
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import fnmatch
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
33 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
34 api_client = None
35
36 upload_opts = argparse.ArgumentParser(add_help=False)
37
38 upload_opts.add_argument('--version', action='version',
39                          version="%s %s" % (sys.argv[0], __version__),
40                          help='Print version and exit.')
41 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
42                          help="""
43 Local file or directory. If path is a directory reference with a trailing
44 slash, then just upload the directory's contents; otherwise upload the
45 directory itself. Default: read from standard input.
46 """)
47
48 _group = upload_opts.add_mutually_exclusive_group()
49
50 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
51                     default=-1, help=argparse.SUPPRESS)
52
53 _group.add_argument('--normalize', action='store_true',
54                     help="""
55 Normalize the manifest by re-ordering files and streams after writing
56 data.
57 """)
58
59 _group.add_argument('--dry-run', action='store_true', default=False,
60                     help="""
61 Don't actually upload files, but only check if any file should be
62 uploaded. Exit with code=2 when files are pending for upload.
63 """)
64
65 _group = upload_opts.add_mutually_exclusive_group()
66
67 _group.add_argument('--as-stream', action='store_true', dest='stream',
68                     help="""
69 Synonym for --stream.
70 """)
71
72 _group.add_argument('--stream', action='store_true',
73                     help="""
74 Store the file content and display the resulting manifest on
75 stdout. Do not write the manifest to Keep or save a Collection object
76 in Arvados.
77 """)
78
79 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
80                     help="""
81 Synonym for --manifest.
82 """)
83
84 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
85                     help="""
86 Synonym for --manifest.
87 """)
88
89 _group.add_argument('--manifest', action='store_true',
90                     help="""
91 Store the file data and resulting manifest in Keep, save a Collection
92 object in Arvados, and display the manifest locator (Collection uuid)
93 on stdout. This is the default behavior.
94 """)
95
96 _group.add_argument('--as-raw', action='store_true', dest='raw',
97                     help="""
98 Synonym for --raw.
99 """)
100
101 _group.add_argument('--raw', action='store_true',
102                     help="""
103 Store the file content and display the data block locators on stdout,
104 separated by commas, with a trailing newline. Do not store a
105 manifest.
106 """)
107
108 upload_opts.add_argument('--update-collection', type=str, default=None,
109                          dest='update_collection', metavar="UUID", help="""
110 Update an existing collection identified by the given Arvados collection
111 UUID. All new local files will be uploaded.
112 """)
113
114 upload_opts.add_argument('--use-filename', type=str, default=None,
115                          dest='filename', help="""
116 Synonym for --filename.
117 """)
118
119 upload_opts.add_argument('--filename', type=str, default=None,
120                          help="""
121 Use the given filename in the manifest, instead of the name of the
122 local file. This is useful when "-" or "/dev/stdin" is given as an
123 input file. It can be used only if there is exactly one path given and
124 it is not a directory. Implies --manifest.
125 """)
126
127 upload_opts.add_argument('--portable-data-hash', action='store_true',
128                          help="""
129 Print the portable data hash instead of the Arvados UUID for the collection
130 created by the upload.
131 """)
132
133 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
134                          help="""
135 Set the replication level for the new collection: how many different
136 physical storage devices (e.g., disks) should have a copy of each data
137 block. Default is to use the server-provided default (if any) or 2.
138 """)
139
140 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
141                          help="""
142 Set the number of upload threads to be used. Take into account that
143 using lots of threads will increase the RAM requirements. Default is
144 to use 2 threads.
145 On high latency installations, using a greater number will improve
146 overall throughput.
147 """)
148
149 run_opts = argparse.ArgumentParser(add_help=False)
150
151 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
152 Store the collection in the specified project, instead of your Home
153 project.
154 """)
155
156 run_opts.add_argument('--name', help="""
157 Save the collection with the specified name.
158 """)
159
160 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
161                       action='append', help="""
162 Exclude files and directories whose names match the given pattern. You
163 can specify multiple patterns by using this argument more than once.
164 """)
165
166 _group = run_opts.add_mutually_exclusive_group()
167 _group.add_argument('--progress', action='store_true',
168                     help="""
169 Display human-readable progress on stderr (bytes and, if possible,
170 percentage of total data size). This is the default behavior when
171 stderr is a tty.
172 """)
173
174 _group.add_argument('--no-progress', action='store_true',
175                     help="""
176 Do not display human-readable progress on stderr, even if stderr is a
177 tty.
178 """)
179
180 _group.add_argument('--batch-progress', action='store_true',
181                     help="""
182 Display machine-readable progress on stderr (bytes and, if known,
183 total data size).
184 """)
185
186 _group = run_opts.add_mutually_exclusive_group()
187 _group.add_argument('--resume', action='store_true', default=True,
188                     help="""
189 Continue interrupted uploads from cached state (default).
190 """)
191 _group.add_argument('--no-resume', action='store_false', dest='resume',
192                     help="""
193 Do not continue interrupted uploads from cached state.
194 """)
195
196 _group = run_opts.add_mutually_exclusive_group()
197 _group.add_argument('--follow-links', action='store_true', default=True,
198                     dest='follow_links', help="""
199 Follow file and directory symlinks (default).
200 """)
201 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
202                     help="""
203 Do not follow file and directory symlinks.
204 """)
205
206 _group = run_opts.add_mutually_exclusive_group()
207 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
208                     help="""
209 Save upload state in a cache file for resuming (default).
210 """)
211 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
212                     help="""
213 Do not save upload state in a cache file for resuming.
214 """)
215
216 arg_parser = argparse.ArgumentParser(
217     description='Copy data from the local filesystem to Keep.',
218     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
219
220 def parse_arguments(arguments):
221     args = arg_parser.parse_args(arguments)
222
223     if len(args.paths) == 0:
224         args.paths = ['-']
225
226     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
227
228     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
229         if args.filename:
230             arg_parser.error("""
231     --filename argument cannot be used when storing a directory or
232     multiple files.
233     """)
234
235     # Turn on --progress by default if stderr is a tty.
236     if (not (args.batch_progress or args.no_progress)
237         and os.isatty(sys.stderr.fileno())):
238         args.progress = True
239
240     # Turn off --resume (default) if --no-cache is used.
241     if not args.use_cache:
242         args.resume = False
243
244     if args.paths == ['-']:
245         if args.update_collection:
246             arg_parser.error("""
247     --update-collection cannot be used when reading from stdin.
248     """)
249         args.resume = False
250         args.use_cache = False
251         if not args.filename:
252             args.filename = 'stdin'
253
254     # Remove possible duplicated patterns
255     if len(args.exclude) > 0:
256         args.exclude = list(set(args.exclude))
257
258     return args
259
260
261 class PathDoesNotExistError(Exception):
262     pass
263
264
265 class CollectionUpdateError(Exception):
266     pass
267
268
269 class ResumeCacheConflict(Exception):
270     pass
271
272
273 class ArvPutArgumentConflict(Exception):
274     pass
275
276
277 class ArvPutUploadIsPending(Exception):
278     pass
279
280
281 class ArvPutUploadNotPending(Exception):
282     pass
283
284
285 class FileUploadList(list):
286     def __init__(self, dry_run=False):
287         list.__init__(self)
288         self.dry_run = dry_run
289
290     def append(self, other):
291         if self.dry_run:
292             raise ArvPutUploadIsPending()
293         super(FileUploadList, self).append(other)
294
295
296 class ResumeCache(object):
297     CACHE_DIR = '.cache/arvados/arv-put'
298
299     def __init__(self, file_spec):
300         self.cache_file = open(file_spec, 'a+')
301         self._lock_file(self.cache_file)
302         self.filename = self.cache_file.name
303
304     @classmethod
305     def make_path(cls, args):
306         md5 = hashlib.md5()
307         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
308         realpaths = sorted(os.path.realpath(path) for path in args.paths)
309         md5.update(b'\0'.join([p.encode() for p in realpaths]))
310         if any(os.path.isdir(path) for path in realpaths):
311             md5.update(b'-1')
312         elif args.filename:
313             md5.update(args.filename.encode())
314         return os.path.join(
315             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
316             md5.hexdigest())
317
318     def _lock_file(self, fileobj):
319         try:
320             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
321         except IOError:
322             raise ResumeCacheConflict("{} locked".format(fileobj.name))
323
324     def load(self):
325         self.cache_file.seek(0)
326         return json.load(self.cache_file)
327
328     def check_cache(self, api_client=None, num_retries=0):
329         try:
330             state = self.load()
331             locator = None
332             try:
333                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
334                     locator = state["_finished_streams"][0][1][0]
335                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
336                     locator = state["_current_stream_locators"][0]
337                 if locator is not None:
338                     kc = arvados.keep.KeepClient(api_client=api_client)
339                     kc.head(locator, num_retries=num_retries)
340             except Exception as e:
341                 self.restart()
342         except (ValueError):
343             pass
344
345     def save(self, data):
346         try:
347             new_cache_fd, new_cache_name = tempfile.mkstemp(
348                 dir=os.path.dirname(self.filename))
349             self._lock_file(new_cache_fd)
350             new_cache = os.fdopen(new_cache_fd, 'r+')
351             json.dump(data, new_cache)
352             os.rename(new_cache_name, self.filename)
353         except (IOError, OSError, ResumeCacheConflict) as error:
354             try:
355                 os.unlink(new_cache_name)
356             except NameError:  # mkstemp failed.
357                 pass
358         else:
359             self.cache_file.close()
360             self.cache_file = new_cache
361
362     def close(self):
363         self.cache_file.close()
364
365     def destroy(self):
366         try:
367             os.unlink(self.filename)
368         except OSError as error:
369             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
370                 raise
371         self.close()
372
373     def restart(self):
374         self.destroy()
375         self.__init__(self.filename)
376
377
378 class ArvPutUploadJob(object):
379     CACHE_DIR = '.cache/arvados/arv-put'
380     EMPTY_STATE = {
381         'manifest' : None, # Last saved manifest checkpoint
382         'files' : {} # Previous run file list: {path : {size, mtime}}
383     }
384
385     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
386                  name=None, owner_uuid=None,
387                  ensure_unique_name=False, num_retries=None,
388                  put_threads=None, replication_desired=None,
389                  filename=None, update_time=60.0, update_collection=None,
390                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
391                  follow_links=True, exclude_paths=[], exclude_names=None):
392         self.paths = paths
393         self.resume = resume
394         self.use_cache = use_cache
395         self.update = False
396         self.reporter = reporter
397         # This will set to 0 before start counting, if no special files are going
398         # to be read.
399         self.bytes_expected = None
400         self.bytes_written = 0
401         self.bytes_skipped = 0
402         self.name = name
403         self.owner_uuid = owner_uuid
404         self.ensure_unique_name = ensure_unique_name
405         self.num_retries = num_retries
406         self.replication_desired = replication_desired
407         self.put_threads = put_threads
408         self.filename = filename
409         self._state_lock = threading.Lock()
410         self._state = None # Previous run state (file list & manifest)
411         self._current_files = [] # Current run file list
412         self._cache_file = None
413         self._collection_lock = threading.Lock()
414         self._remote_collection = None # Collection being updated (if asked)
415         self._local_collection = None # Collection from previous run manifest
416         self._file_paths = set() # Files to be updated in remote collection
417         self._stop_checkpointer = threading.Event()
418         self._checkpointer = threading.Thread(target=self._update_task)
419         self._checkpointer.daemon = True
420         self._update_task_time = update_time  # How many seconds wait between update runs
421         self._files_to_upload = FileUploadList(dry_run=dry_run)
422         self._upload_started = False
423         self.logger = logger
424         self.dry_run = dry_run
425         self._checkpoint_before_quit = True
426         self.follow_links = follow_links
427         self.exclude_paths = exclude_paths
428         self.exclude_names = exclude_names
429
430         if not self.use_cache and self.resume:
431             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
432
433         # Check for obvious dry-run responses
434         if self.dry_run and (not self.use_cache or not self.resume):
435             raise ArvPutUploadIsPending()
436
437         # Load cached data if any and if needed
438         self._setup_state(update_collection)
439
440         # Build the upload file list, excluding requested files and counting the
441         # bytes expected to be uploaded.
442         self._build_upload_list()
443
444     def _build_upload_list(self):
445         """
446         Scan the requested paths to count file sizes, excluding files & dirs if requested
447         and building the upload file list.
448         """
449         # If there aren't special files to be read, reset total bytes count to zero
450         # to start counting.
451         if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
452                           self.paths)):
453             self.bytes_expected = 0
454
455         for path in self.paths:
456             # Test for stdin first, in case some file named '-' exist
457             if path == '-':
458                 if self.dry_run:
459                     raise ArvPutUploadIsPending()
460                 self._write_stdin(self.filename or 'stdin')
461             elif not os.path.exists(path):
462                  raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
463             elif os.path.isdir(path):
464                 # Use absolute paths on cache index so CWD doesn't interfere
465                 # with the caching logic.
466                 orig_path = path
467                 path = os.path.abspath(path)
468                 if orig_path[-1:] == os.sep:
469                     # When passing a directory reference with a trailing slash,
470                     # its contents should be uploaded directly to the collection's root.
471                     prefixdir = path
472                 else:
473                     # When passing a directory reference with no trailing slash,
474                     # upload the directory to the collection's root.
475                     prefixdir = os.path.dirname(path)
476                 prefixdir += os.sep
477                 for root, dirs, files in os.walk(path, followlinks=self.follow_links):
478                     root_relpath = os.path.relpath(root, path)
479                     # Exclude files/dirs by full path matching pattern
480                     if self.exclude_paths:
481                         dirs[:] = filter(
482                             lambda d: not any([pathname_match(os.path.join(root_relpath, d),
483                                                               pat)
484                                                for pat in self.exclude_paths]),
485                             dirs)
486                         files = filter(
487                             lambda f: not any([pathname_match(os.path.join(root_relpath, f),
488                                                               pat)
489                                                for pat in self.exclude_paths]),
490                             files)
491                     # Exclude files/dirs by name matching pattern
492                     if self.exclude_names is not None:
493                         dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
494                         files = filter(lambda f: not self.exclude_names.match(f), files)
495                     # Make os.walk()'s dir traversing order deterministic
496                     dirs.sort()
497                     files.sort()
498                     for f in files:
499                         filepath = os.path.join(root, f)
500                         # Add its size to the total bytes count (if applicable)
501                         if self.follow_links or (not os.path.islink(filepath)):
502                             if self.bytes_expected is not None:
503                                 self.bytes_expected += os.path.getsize(filepath)
504                         self._check_file(filepath,
505                                          os.path.join(root[len(prefixdir):], f))
506             else:
507                 filepath = os.path.abspath(path)
508                 # Add its size to the total bytes count (if applicable)
509                 if self.follow_links or (not os.path.islink(filepath)):
510                     if self.bytes_expected is not None:
511                         self.bytes_expected += os.path.getsize(filepath)
512                 self._check_file(filepath,
513                                  self.filename or os.path.basename(path))
514         # If dry-mode is on, and got up to this point, then we should notify that
515         # there aren't any file to upload.
516         if self.dry_run:
517             raise ArvPutUploadNotPending()
518         # Remove local_collection's files that don't exist locally anymore, so the
519         # bytes_written count is correct.
520         for f in self.collection_file_paths(self._local_collection,
521                                             path_prefix=""):
522             if f != 'stdin' and f != self.filename and not f in self._file_paths:
523                 self._local_collection.remove(f)
524
525     def start(self, save_collection):
526         """
527         Start supporting thread & file uploading
528         """
529         self._checkpointer.start()
530         try:
531             # Update bytes_written from current local collection and
532             # report initial progress.
533             self._update()
534             # Actual file upload
535             self._upload_started = True # Used by the update thread to start checkpointing
536             self._upload_files()
537         except (SystemExit, Exception) as e:
538             self._checkpoint_before_quit = False
539             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
540             # Note: We're expecting SystemExit instead of
541             # KeyboardInterrupt because we have a custom signal
542             # handler in place that raises SystemExit with the catched
543             # signal's code.
544             if isinstance(e, PathDoesNotExistError):
545                 # We aren't interested in the traceback for this case
546                 pass
547             elif not isinstance(e, SystemExit) or e.code != -2:
548                 self.logger.warning("Abnormal termination:\n{}".format(
549                     traceback.format_exc()))
550             raise
551         finally:
552             if not self.dry_run:
553                 # Stop the thread before doing anything else
554                 self._stop_checkpointer.set()
555                 self._checkpointer.join()
556                 if self._checkpoint_before_quit:
557                     # Commit all pending blocks & one last _update()
558                     self._local_collection.manifest_text()
559                     self._update(final=True)
560                     if save_collection:
561                         self.save_collection()
562             if self.use_cache:
563                 self._cache_file.close()
564
565     def save_collection(self):
566         if self.update:
567             # Check if files should be updated on the remote collection.
568             for fp in self._file_paths:
569                 remote_file = self._remote_collection.find(fp)
570                 if not remote_file:
571                     # File don't exist on remote collection, copy it.
572                     self._remote_collection.copy(fp, fp, self._local_collection)
573                 elif remote_file != self._local_collection.find(fp):
574                     # A different file exist on remote collection, overwrite it.
575                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
576                 else:
577                     # The file already exist on remote collection, skip it.
578                     pass
579             self._remote_collection.save(num_retries=self.num_retries)
580         else:
581             self._local_collection.save_new(
582                 name=self.name, owner_uuid=self.owner_uuid,
583                 ensure_unique_name=self.ensure_unique_name,
584                 num_retries=self.num_retries)
585
586     def destroy_cache(self):
587         if self.use_cache:
588             try:
589                 os.unlink(self._cache_filename)
590             except OSError as error:
591                 # That's what we wanted anyway.
592                 if error.errno != errno.ENOENT:
593                     raise
594             self._cache_file.close()
595
596     def _collection_size(self, collection):
597         """
598         Recursively get the total size of the collection
599         """
600         size = 0
601         for item in listvalues(collection):
602             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
603                 size += self._collection_size(item)
604             else:
605                 size += item.size()
606         return size
607
608     def _update_task(self):
609         """
610         Periodically called support task. File uploading is
611         asynchronous so we poll status from the collection.
612         """
613         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
614             self._update()
615
616     def _update(self, final=False):
617         """
618         Update cached manifest text and report progress.
619         """
620         if self._upload_started:
621             with self._collection_lock:
622                 self.bytes_written = self._collection_size(self._local_collection)
623                 if self.use_cache:
624                     if final:
625                         manifest = self._local_collection.manifest_text()
626                     else:
627                         # Get the manifest text without comitting pending blocks
628                         manifest = self._local_collection.manifest_text(strip=False,
629                                                                         normalize=False,
630                                                                         only_committed=True)
631                     # Update cache
632                     with self._state_lock:
633                         self._state['manifest'] = manifest
634             if self.use_cache:
635                 try:
636                     self._save_state()
637                 except Exception as e:
638                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
639         else:
640             self.bytes_written = self.bytes_skipped
641         # Call the reporter, if any
642         self.report_progress()
643
644     def report_progress(self):
645         if self.reporter is not None:
646             self.reporter(self.bytes_written, self.bytes_expected)
647
648     def _write_stdin(self, filename):
649         output = self._local_collection.open(filename, 'wb')
650         self._write(sys.stdin, output)
651         output.close()
652
653     def _check_file(self, source, filename):
654         """
655         Check if this file needs to be uploaded
656         """
657         # Ignore symlinks when requested
658         if (not self.follow_links) and os.path.islink(source):
659             return
660         resume_offset = 0
661         should_upload = False
662         new_file_in_cache = False
663         # Record file path for updating the remote collection before exiting
664         self._file_paths.add(filename)
665
666         with self._state_lock:
667             # If no previous cached data on this file, store it for an eventual
668             # repeated run.
669             if source not in self._state['files']:
670                 self._state['files'][source] = {
671                     'mtime': os.path.getmtime(source),
672                     'size' : os.path.getsize(source)
673                 }
674                 new_file_in_cache = True
675             cached_file_data = self._state['files'][source]
676
677         # Check if file was already uploaded (at least partially)
678         file_in_local_collection = self._local_collection.find(filename)
679
680         # If not resuming, upload the full file.
681         if not self.resume:
682             should_upload = True
683         # New file detected from last run, upload it.
684         elif new_file_in_cache:
685             should_upload = True
686         # Local file didn't change from last run.
687         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
688             if not file_in_local_collection:
689                 # File not uploaded yet, upload it completely
690                 should_upload = True
691             elif file_in_local_collection.permission_expired():
692                 # Permission token expired, re-upload file. This will change whenever
693                 # we have a API for refreshing tokens.
694                 should_upload = True
695                 self._local_collection.remove(filename)
696             elif cached_file_data['size'] == file_in_local_collection.size():
697                 # File already there, skip it.
698                 self.bytes_skipped += cached_file_data['size']
699             elif cached_file_data['size'] > file_in_local_collection.size():
700                 # File partially uploaded, resume!
701                 resume_offset = file_in_local_collection.size()
702                 self.bytes_skipped += resume_offset
703                 should_upload = True
704             else:
705                 # Inconsistent cache, re-upload the file
706                 should_upload = True
707                 self._local_collection.remove(filename)
708                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
709         # Local file differs from cached data, re-upload it.
710         else:
711             if file_in_local_collection:
712                 self._local_collection.remove(filename)
713             should_upload = True
714
715         if should_upload:
716             try:
717                 self._files_to_upload.append((source, resume_offset, filename))
718             except ArvPutUploadIsPending:
719                 # This could happen when running on dry-mode, close cache file to
720                 # avoid locking issues.
721                 self._cache_file.close()
722                 raise
723
724     def _upload_files(self):
725         for source, resume_offset, filename in self._files_to_upload:
726             with open(source, 'rb') as source_fd:
727                 with self._state_lock:
728                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
729                     self._state['files'][source]['size'] = os.path.getsize(source)
730                 if resume_offset > 0:
731                     # Start upload where we left off
732                     output = self._local_collection.open(filename, 'ab')
733                     source_fd.seek(resume_offset)
734                 else:
735                     # Start from scratch
736                     output = self._local_collection.open(filename, 'wb')
737                 self._write(source_fd, output)
738                 output.close(flush=False)
739
740     def _write(self, source_fd, output):
741         while True:
742             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
743             if not data:
744                 break
745             output.write(data)
746
747     def _my_collection(self):
748         return self._remote_collection if self.update else self._local_collection
749
750     def _setup_state(self, update_collection):
751         """
752         Create a new cache file or load a previously existing one.
753         """
754         # Load an already existing collection for update
755         if update_collection and re.match(arvados.util.collection_uuid_pattern,
756                                           update_collection):
757             try:
758                 self._remote_collection = arvados.collection.Collection(update_collection)
759             except arvados.errors.ApiError as error:
760                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
761             else:
762                 self.update = True
763         elif update_collection:
764             # Collection locator provided, but unknown format
765             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
766
767         if self.use_cache:
768             # Set up cache file name from input paths.
769             md5 = hashlib.md5()
770             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
771             realpaths = sorted(os.path.realpath(path) for path in self.paths)
772             md5.update(b'\0'.join([p.encode() for p in realpaths]))
773             if self.filename:
774                 md5.update(self.filename.encode())
775             cache_filename = md5.hexdigest()
776             cache_filepath = os.path.join(
777                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
778                 cache_filename)
779             if self.resume and os.path.exists(cache_filepath):
780                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
781                 self._cache_file = open(cache_filepath, 'a+')
782             else:
783                 # --no-resume means start with a empty cache file.
784                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
785                 self._cache_file = open(cache_filepath, 'w+')
786             self._cache_filename = self._cache_file.name
787             self._lock_file(self._cache_file)
788             self._cache_file.seek(0)
789
790         with self._state_lock:
791             if self.use_cache:
792                 try:
793                     self._state = json.load(self._cache_file)
794                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
795                         # Cache at least partially incomplete, set up new cache
796                         self._state = copy.deepcopy(self.EMPTY_STATE)
797                 except ValueError:
798                     # Cache file empty, set up new cache
799                     self._state = copy.deepcopy(self.EMPTY_STATE)
800             else:
801                 self.logger.info("No cache usage requested for this run.")
802                 # No cache file, set empty state
803                 self._state = copy.deepcopy(self.EMPTY_STATE)
804             # Load the previous manifest so we can check if files were modified remotely.
805             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
806
807     def collection_file_paths(self, col, path_prefix='.'):
808         """Return a list of file paths by recursively go through the entire collection `col`"""
809         file_paths = []
810         for name, item in listitems(col):
811             if isinstance(item, arvados.arvfile.ArvadosFile):
812                 file_paths.append(os.path.join(path_prefix, name))
813             elif isinstance(item, arvados.collection.Subcollection):
814                 new_prefix = os.path.join(path_prefix, name)
815                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
816         return file_paths
817
818     def _lock_file(self, fileobj):
819         try:
820             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
821         except IOError:
822             raise ResumeCacheConflict("{} locked".format(fileobj.name))
823
824     def _save_state(self):
825         """
826         Atomically save current state into cache.
827         """
828         with self._state_lock:
829             # We're not using copy.deepcopy() here because it's a lot slower
830             # than json.dumps(), and we're already needing JSON format to be
831             # saved on disk.
832             state = json.dumps(self._state)
833         try:
834             new_cache = tempfile.NamedTemporaryFile(
835                 mode='w+',
836                 dir=os.path.dirname(self._cache_filename), delete=False)
837             self._lock_file(new_cache)
838             new_cache.write(state)
839             new_cache.flush()
840             os.fsync(new_cache)
841             os.rename(new_cache.name, self._cache_filename)
842         except (IOError, OSError, ResumeCacheConflict) as error:
843             self.logger.error("There was a problem while saving the cache file: {}".format(error))
844             try:
845                 os.unlink(new_cache_name)
846             except NameError:  # mkstemp failed.
847                 pass
848         else:
849             self._cache_file.close()
850             self._cache_file = new_cache
851
852     def collection_name(self):
853         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
854
855     def manifest_locator(self):
856         return self._my_collection().manifest_locator()
857
858     def portable_data_hash(self):
859         pdh = self._my_collection().portable_data_hash()
860         m = self._my_collection().stripped_manifest().encode()
861         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
862         if pdh != local_pdh:
863             logger.warning("\n".join([
864                 "arv-put: API server provided PDH differs from local manifest.",
865                 "         This should not happen; showing API server version."]))
866         return pdh
867
868     def manifest_text(self, stream_name=".", strip=False, normalize=False):
869         return self._my_collection().manifest_text(stream_name, strip, normalize)
870
871     def _datablocks_on_item(self, item):
872         """
873         Return a list of datablock locators, recursively navigating
874         through subcollections
875         """
876         if isinstance(item, arvados.arvfile.ArvadosFile):
877             if item.size() == 0:
878                 # Empty file locator
879                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
880             else:
881                 locators = []
882                 for segment in item.segments():
883                     loc = segment.locator
884                     locators.append(loc)
885                 return locators
886         elif isinstance(item, arvados.collection.Collection):
887             l = [self._datablocks_on_item(x) for x in listvalues(item)]
888             # Fast list flattener method taken from:
889             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
890             return [loc for sublist in l for loc in sublist]
891         else:
892             return None
893
894     def data_locators(self):
895         with self._collection_lock:
896             # Make sure all datablocks are flushed before getting the locators
897             self._my_collection().manifest_text()
898             datablocks = self._datablocks_on_item(self._my_collection())
899         return datablocks
900
901 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
902                                                             os.getpid())
903
904 # Simulate glob.glob() matching behavior without the need to scan the filesystem
905 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
906 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
907 # so instead we're using it on every path component.
908 def pathname_match(pathname, pattern):
909     name = pathname.split(os.sep)
910     # Fix patterns like 'some/subdir/' or 'some//subdir'
911     pat = [x for x in pattern.split(os.sep) if x != '']
912     if len(name) != len(pat):
913         return False
914     for i in range(len(name)):
915         if not fnmatch.fnmatch(name[i], pat[i]):
916             return False
917     return True
918
919 def machine_progress(bytes_written, bytes_expected):
920     return _machine_format.format(
921         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
922
923 def human_progress(bytes_written, bytes_expected):
924     if bytes_expected:
925         return "\r{}M / {}M {:.1%} ".format(
926             bytes_written >> 20, bytes_expected >> 20,
927             float(bytes_written) / bytes_expected)
928     else:
929         return "\r{} ".format(bytes_written)
930
931 def progress_writer(progress_func, outfile=sys.stderr):
932     def write_progress(bytes_written, bytes_expected):
933         outfile.write(progress_func(bytes_written, bytes_expected))
934     return write_progress
935
936 def exit_signal_handler(sigcode, frame):
937     sys.exit(-sigcode)
938
939 def desired_project_uuid(api_client, project_uuid, num_retries):
940     if not project_uuid:
941         query = api_client.users().current()
942     elif arvados.util.user_uuid_pattern.match(project_uuid):
943         query = api_client.users().get(uuid=project_uuid)
944     elif arvados.util.group_uuid_pattern.match(project_uuid):
945         query = api_client.groups().get(uuid=project_uuid)
946     else:
947         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
948     return query.execute(num_retries=num_retries)['uuid']
949
950 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
951     global api_client
952
953     logger = logging.getLogger('arvados.arv_put')
954     logger.setLevel(logging.INFO)
955     args = parse_arguments(arguments)
956     status = 0
957     if api_client is None:
958         api_client = arvados.api('v1')
959
960     # Determine the name to use
961     if args.name:
962         if args.stream or args.raw:
963             logger.error("Cannot use --name with --stream or --raw")
964             sys.exit(1)
965         elif args.update_collection:
966             logger.error("Cannot use --name with --update-collection")
967             sys.exit(1)
968         collection_name = args.name
969     else:
970         collection_name = "Saved at {} by {}@{}".format(
971             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
972             pwd.getpwuid(os.getuid()).pw_name,
973             socket.gethostname())
974
975     if args.project_uuid and (args.stream or args.raw):
976         logger.error("Cannot use --project-uuid with --stream or --raw")
977         sys.exit(1)
978
979     # Determine the parent project
980     try:
981         project_uuid = desired_project_uuid(api_client, args.project_uuid,
982                                             args.retries)
983     except (apiclient_errors.Error, ValueError) as error:
984         logger.error(error)
985         sys.exit(1)
986
987     if args.progress:
988         reporter = progress_writer(human_progress)
989     elif args.batch_progress:
990         reporter = progress_writer(machine_progress)
991     else:
992         reporter = None
993
994     # Setup exclude regex from all the --exclude arguments provided
995     name_patterns = []
996     exclude_paths = []
997     exclude_names = None
998     if len(args.exclude) > 0:
999         # We're supporting 2 kinds of exclusion patterns:
1000         # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
1001         #                            the name)
1002         # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
1003         #                            entire path, and should be relative to
1004         #                            any input dir argument)
1005         for p in args.exclude:
1006             # Only relative paths patterns allowed
1007             if p.startswith(os.sep):
1008                 logger.error("Cannot use absolute paths with --exclude")
1009                 sys.exit(1)
1010             if os.path.dirname(p):
1011                 # We don't support of path patterns with '.' or '..'
1012                 p_parts = p.split(os.sep)
1013                 if '.' in p_parts or '..' in p_parts:
1014                     logger.error(
1015                         "Cannot use path patterns that include '.' or '..")
1016                     sys.exit(1)
1017                 # Path search pattern
1018                 exclude_paths.append(p)
1019             else:
1020                 # Name-only search pattern
1021                 name_patterns.append(p)
1022         # For name only matching, we can combine all patterns into a single regexp,
1023         # for better performance.
1024         exclude_names = re.compile('|'.join(
1025             [fnmatch.translate(p) for p in name_patterns]
1026         )) if len(name_patterns) > 0 else None
1027         # Show the user the patterns to be used, just in case they weren't specified inside
1028         # quotes and got changed by the shell expansion.
1029         logger.info("Exclude patterns: {}".format(args.exclude))
1030
1031     # If this is used by a human, and there's at least one directory to be
1032     # uploaded, the expected bytes calculation can take a moment.
1033     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1034         logger.info("Calculating upload size, this could take some time...")
1035     try:
1036         writer = ArvPutUploadJob(paths = args.paths,
1037                                  resume = args.resume,
1038                                  use_cache = args.use_cache,
1039                                  filename = args.filename,
1040                                  reporter = reporter,
1041                                  num_retries = args.retries,
1042                                  replication_desired = args.replication,
1043                                  put_threads = args.threads,
1044                                  name = collection_name,
1045                                  owner_uuid = project_uuid,
1046                                  ensure_unique_name = True,
1047                                  update_collection = args.update_collection,
1048                                  logger=logger,
1049                                  dry_run=args.dry_run,
1050                                  follow_links=args.follow_links,
1051                                  exclude_paths=exclude_paths,
1052                                  exclude_names=exclude_names)
1053     except ResumeCacheConflict:
1054         logger.error("\n".join([
1055             "arv-put: Another process is already uploading this data.",
1056             "         Use --no-cache if this is really what you want."]))
1057         sys.exit(1)
1058     except CollectionUpdateError as error:
1059         logger.error("\n".join([
1060             "arv-put: %s" % str(error)]))
1061         sys.exit(1)
1062     except ArvPutUploadIsPending:
1063         # Dry run check successful, return proper exit code.
1064         sys.exit(2)
1065     except ArvPutUploadNotPending:
1066         # No files pending for upload
1067         sys.exit(0)
1068     except PathDoesNotExistError as error:
1069         logger.error("\n".join([
1070             "arv-put: %s" % str(error)]))
1071         sys.exit(1)
1072
1073     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1074     # the originals.
1075     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1076                             for sigcode in CAUGHT_SIGNALS}
1077
1078     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1079         logger.warning("\n".join([
1080             "arv-put: Resuming previous upload from last checkpoint.",
1081             "         Use the --no-resume option to start over."]))
1082
1083     if not args.dry_run:
1084         writer.report_progress()
1085     output = None
1086     try:
1087         writer.start(save_collection=not(args.stream or args.raw))
1088     except arvados.errors.ApiError as error:
1089         logger.error("\n".join([
1090             "arv-put: %s" % str(error)]))
1091         sys.exit(1)
1092
1093     if args.progress:  # Print newline to split stderr from stdout for humans.
1094         logger.info("\n")
1095
1096     if args.stream:
1097         if args.normalize:
1098             output = writer.manifest_text(normalize=True)
1099         else:
1100             output = writer.manifest_text()
1101     elif args.raw:
1102         output = ','.join(writer.data_locators())
1103     else:
1104         try:
1105             if args.update_collection:
1106                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1107             else:
1108                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1109             if args.portable_data_hash:
1110                 output = writer.portable_data_hash()
1111             else:
1112                 output = writer.manifest_locator()
1113         except apiclient_errors.Error as error:
1114             logger.error(
1115                 "arv-put: Error creating Collection on project: {}.".format(
1116                     error))
1117             status = 1
1118
1119     # Print the locator (uuid) of the new collection.
1120     if output is None:
1121         status = status or 1
1122     else:
1123         stdout.write(output)
1124         if not output.endswith('\n'):
1125             stdout.write('\n')
1126
1127     for sigcode, orig_handler in listitems(orig_signal_handlers):
1128         signal.signal(sigcode, orig_handler)
1129
1130     if status != 0:
1131         sys.exit(status)
1132
1133     # Success!
1134     return output
1135
1136
1137 if __name__ == '__main__':
1138     main()