11789: Unified the exclude logic by removing expected_bytes_for() and moving
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import fnmatch
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
# Signals singled out by this module. NOTE(review): the handler that uses this
# list is not visible in this part of the file -- confirm where it is installed.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client placeholder; presumably assigned elsewhere in the
# file before use (not visible here).
api_client = None

# Options describing *what* gets uploaded and how the result is presented.
# add_help=False because this parser is only used as a parent of arg_parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth, --normalize and --dry-run cannot be combined.
_group = upload_opts.add_mutually_exclusive_group()

# Hidden option: its help text is suppressed so it doesn't show up in --help.
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode selection: stream, manifest and raw (plus their synonyms) are
# mutually exclusive. Synonyms share the same dest as the canonical option.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# --use-filename stores into the same dest ('filename') as --filename.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
148
# Options describing *how* the upload run behaves (destination, exclusions,
# progress reporting, resuming and caching). Like upload_opts, this parser is
# only used as a parent of arg_parser, hence add_help=False.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# action='append' collects every occurrence of --exclude into one list.
run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given pattern. You
can specify multiple patterns by using this argument more than once.
""")

# Progress reporting modes are mutually exclusive.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write to args.resume (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --follow-links / --no-follow-links both write to args.follow_links
# (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

# --cache / --no-cache both write to args.use_cache (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

# The command-line parser actually used by parse_arguments(): it combines the
# upload options, the run options and the shared retry option.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
219
def parse_arguments(arguments):
    """
    Parse the command line and normalize the resulting namespace.

    On top of plain argparse parsing this:
      * defaults to reading stdin ('-') when no path is given, and treats
        '/dev/stdin' as '-';
      * rejects --filename when uploading several paths or a directory;
      * turns --progress on when stderr is a tty and no other progress mode
        was requested;
      * turns --resume off when --no-cache is used;
      * when reading only from stdin: rejects --update-collection, disables
        resume & cache, and defaults the filename to 'stdin';
      * removes duplicated --exclude patterns.

    Returns the argparse namespace. Invalid combinations are reported through
    arg_parser.error().
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']
    args.paths = [('-' if p == '/dev/stdin' else p) for p in args.paths]

    uploading_many_or_dir = (len(args.paths) != 1
                             or os.path.isdir(args.paths[0]))
    if uploading_many_or_dir and args.filename:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not args.batch_progress and not args.no_progress
            and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # --no-cache implies --no-resume: there is nothing to resume from.
    if not args.use_cache:
        args.resume = False

    reading_stdin_only = (args.paths == ['-'])
    if reading_stdin_only:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if args.exclude:
        args.exclude = list(set(args.exclude))

    return args
259
260
class PathDoesNotExistError(Exception):
    """Raised when a path requested for upload does not exist locally."""
263
264
class CollectionUpdateError(Exception):
    """Raised when the collection requested for update cannot be used."""
267
268
class ResumeCacheConflict(Exception):
    """Raised when a resume cache file cannot be locked exclusively."""
271
272
class ArvPutArgumentConflict(Exception):
    """Raised when an upload job is given incompatible arguments."""
275
276
class ArvPutUploadIsPending(Exception):
    """Raised in dry-run mode as soon as something is found to upload."""
279
280
class ArvPutUploadNotPending(Exception):
    """Raised in dry-run mode when nothing is left to upload."""
283
284
class FileUploadList(list):
    """
    List of files pending upload.

    When created with dry_run=True the list refuses new entries: trying to
    append one raises ArvPutUploadIsPending, signalling that there is at
    least one file that would be uploaded.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Add an entry, or raise ArvPutUploadIsPending in dry-run mode."""
        if self.dry_run:
            raise ArvPutUploadIsPending()
        list.append(self, other)
294
295
class ResumeCache(object):
    """
    JSON state file, protected by an exclusive lock, used to resume uploads.

    The file is opened (and created if needed) and locked on construction;
    a second process trying to use the same file gets ResumeCacheConflict.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """
        Open (creating it if needed) and lock the cache file at file_spec.

        Raises ResumeCacheConflict if the file is locked by someone else.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leak the file object when we can't get the lock.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """
        Return the cache file path for the given parsed arguments.

        The file name is an MD5 digest of the API host, the sorted real
        paths being uploaded and either a '-1' marker (when any path is a
        directory) or the requested filename (if any).
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """
        Take a non-blocking exclusive lock on fileobj.

        fileobj may be a file object or a raw file descriptor. Raises
        ResumeCacheConflict when the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name attribute; fall back to the
            # descriptor itself so building the message can't fail.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the state stored in the cache file, parsed from JSON."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """
        Validate the cached state against Keep.

        Picks one block locator from the cached state (if any) and issues a
        HEAD request for it. If anything fails while doing so, the cache is
        restarted from scratch. A cache file that can't be parsed as JSON is
        left untouched.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Whatever went wrong, the cached state can't be trusted.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """
        Atomically replace the cache file contents with data (as JSON).

        The data is written to a locked temporary file in the same directory,
        flushed, and then renamed over the cache file. This is best-effort:
        on I/O or locking errors the temporary file is discarded and the
        previous cache file is kept.
        """
        new_cache_fd = None
        new_cache = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            # Make sure the data is on the file before it replaces the old one.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the descriptor and remove the temporary file.
            if new_cache is not None:
                new_cache.close()
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # else: mkstemp failed.
                try:
                    os.unlink(new_cache_name)
                except OSError as error:
                    if error.errno != errno.ENOENT:
                        raise
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (this also releases its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Discard the current cache file and start over with an empty one."""
        self.destroy()
        self.__init__(self.filename)
376
377
378 class ArvPutUploadJob(object):
379     CACHE_DIR = '.cache/arvados/arv-put'
380     EMPTY_STATE = {
381         'manifest' : None, # Last saved manifest checkpoint
382         'files' : {} # Previous run file list: {path : {size, mtime}}
383     }
384
385     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
386                  name=None, owner_uuid=None,
387                  ensure_unique_name=False, num_retries=None,
388                  put_threads=None, replication_desired=None,
389                  filename=None, update_time=60.0, update_collection=None,
390                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
391                  follow_links=True, exclude_paths=[], exclude_names=None):
392         self.paths = paths
393         self.resume = resume
394         self.use_cache = use_cache
395         self.update = False
396         self.reporter = reporter
397         # This will set to 0 before start counting, if no special files are going
398         # to be read.
399         self.bytes_expected = None
400         self.bytes_written = 0
401         self.bytes_skipped = 0
402         self.name = name
403         self.owner_uuid = owner_uuid
404         self.ensure_unique_name = ensure_unique_name
405         self.num_retries = num_retries
406         self.replication_desired = replication_desired
407         self.put_threads = put_threads
408         self.filename = filename
409         self._state_lock = threading.Lock()
410         self._state = None # Previous run state (file list & manifest)
411         self._current_files = [] # Current run file list
412         self._cache_file = None
413         self._collection_lock = threading.Lock()
414         self._remote_collection = None # Collection being updated (if asked)
415         self._local_collection = None # Collection from previous run manifest
416         self._file_paths = set() # Files to be updated in remote collection
417         self._stop_checkpointer = threading.Event()
418         self._checkpointer = threading.Thread(target=self._update_task)
419         self._checkpointer.daemon = True
420         self._update_task_time = update_time  # How many seconds wait between update runs
421         self._files_to_upload = FileUploadList(dry_run=dry_run)
422         self._upload_started = False
423         self.logger = logger
424         self.dry_run = dry_run
425         self._checkpoint_before_quit = True
426         self.follow_links = follow_links
427         self.exclude_paths = exclude_paths
428         self.exclude_names = exclude_names
429
430         if not self.use_cache and self.resume:
431             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
432
433         # Check for obvious dry-run responses
434         if self.dry_run and (not self.use_cache or not self.resume):
435             raise ArvPutUploadIsPending()
436
437         # Load cached data if any and if needed
438         self._setup_state(update_collection)
439
440         # Build the upload file list, excluding requested files and counting the
441         # bytes expected to be uploaded.
442         self._build_upload_list()
443
444     def _build_upload_list(self):
445         """
446         Scan the requested paths to count file sizes, excluding files & dirs if requested
447         and building the upload file list.
448         """
449         # If there aren't special files to be read, reset total bytes count to zero
450         # to start counting.
451         if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
452                           self.paths)):
453             self.bytes_expected = 0
454
455         for path in self.paths:
456             # Test for stdin first, in case some file named '-' exist
457             if path == '-':
458                 if self.dry_run:
459                     raise ArvPutUploadIsPending()
460                 self._write_stdin(self.filename or 'stdin')
461             elif not os.path.exists(path):
462                  raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
463             elif os.path.isdir(path):
464                 # Use absolute paths on cache index so CWD doesn't interfere
465                 # with the caching logic.
466                 orig_path = path
467                 path = os.path.abspath(path)
468                 if orig_path[-1:] == os.sep:
469                     # When passing a directory reference with a trailing slash,
470                     # its contents should be uploaded directly to the collection's root.
471                     prefixdir = path
472                 else:
473                     # When passing a directory reference with no trailing slash,
474                     # upload the directory to the collection's root.
475                     prefixdir = os.path.dirname(path)
476                 prefixdir += os.sep
477                 for root, dirs, files in os.walk(path, followlinks=self.follow_links):
478                     root_relpath = os.path.relpath(root, path)
479                     # Exclude files/dirs by full path matching pattern
480                     if self.exclude_paths:
481                         dirs[:] = filter(
482                             lambda d: not any([pathname_match(os.path.join(root_relpath, d),
483                                                               pat)
484                                                for pat in self.exclude_paths]),
485                             dirs)
486                         files = filter(
487                             lambda f: not any([pathname_match(os.path.join(root_relpath, f),
488                                                               pat)
489                                                for pat in self.exclude_paths]),
490                             files)
491                     # Exclude files/dirs by name matching pattern
492                     if self.exclude_names is not None:
493                         dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
494                         files = filter(lambda f: not self.exclude_names.match(f), files)
495                     # Make os.walk()'s dir traversing order deterministic
496                     dirs.sort()
497                     files.sort()
498                     for f in files:
499                         filepath = os.path.join(root, f)
500                         # Add its size to the total bytes count (if applicable)
501                         if self.follow_links or (not os.path.islink(filepath)):
502                             if self.bytes_expected is not None:
503                                 self.bytes_expected += os.path.getsize(filepath)
504                         self._check_file(filepath,
505                                          os.path.join(root[len(prefixdir):], f))
506             else:
507                 filepath = os.path.abspath(path)
508                 # Add its size to the total bytes count (if applicable)
509                 if self.follow_links or (not os.path.islink(filepath)):
510                     if self.bytes_expected is not None:
511                         self.bytes_expected += os.path.getsize(filepath)
512                 self._check_file(filepath,
513                                  self.filename or os.path.basename(path))
514         # If dry-mode is on, and got up to this point, then we should notify that
515         # there aren't any file to upload.
516         if self.dry_run:
517             raise ArvPutUploadNotPending()
518         # Remove local_collection's files that don't exist locally anymore, so the
519         # bytes_written count is correct.
520         for f in self.collection_file_paths(self._local_collection,
521                                             path_prefix=""):
522             if f != 'stdin' and f != self.filename and not f in self._file_paths:
523                 self._local_collection.remove(f)
524
    def start(self, save_collection):
        """
        Start supporting thread & file uploading

        Launches the checkpointer thread, reports the initial progress and
        uploads the pending files. If the upload finishes normally (and this
        isn't a dry run), pending blocks are committed, a final update is
        done and, when save_collection is true, the collection is saved.

        Any exception (including SystemExit) raised while uploading is
        re-raised after disabling the final checkpoint. The cache file, when
        in use, is closed in every case.
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            # Something went wrong: skip the final checkpoint & save below.
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
564
565     def save_collection(self):
566         if self.update:
567             # Check if files should be updated on the remote collection.
568             for fp in self._file_paths:
569                 remote_file = self._remote_collection.find(fp)
570                 if not remote_file:
571                     # File don't exist on remote collection, copy it.
572                     self._remote_collection.copy(fp, fp, self._local_collection)
573                 elif remote_file != self._local_collection.find(fp):
574                     # A different file exist on remote collection, overwrite it.
575                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
576                 else:
577                     # The file already exist on remote collection, skip it.
578                     pass
579             self._remote_collection.save(num_retries=self.num_retries)
580         else:
581             self._local_collection.save_new(
582                 name=self.name, owner_uuid=self.owner_uuid,
583                 ensure_unique_name=self.ensure_unique_name,
584                 num_retries=self.num_retries)
585
586     def destroy_cache(self):
587         if self.use_cache:
588             try:
589                 os.unlink(self._cache_filename)
590             except OSError as error:
591                 # That's what we wanted anyway.
592                 if error.errno != errno.ENOENT:
593                     raise
594             self._cache_file.close()
595
596     def _collection_size(self, collection):
597         """
598         Recursively get the total size of the collection
599         """
600         size = 0
601         for item in listvalues(collection):
602             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
603                 size += self._collection_size(item)
604             else:
605                 size += item.size()
606         return size
607
608     def _update_task(self):
609         """
610         Periodically called support task. File uploading is
611         asynchronous so we poll status from the collection.
612         """
613         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
614             self._update()
615
616     def _update(self, final=False):
617         """
618         Update cached manifest text and report progress.
619         """
620         if self._upload_started:
621             with self._collection_lock:
622                 self.bytes_written = self._collection_size(self._local_collection)
623                 if self.use_cache:
624                     if final:
625                         manifest = self._local_collection.manifest_text()
626                     else:
627                         # Get the manifest text without comitting pending blocks
628                         manifest = self._local_collection.manifest_text(strip=False,
629                                                                         normalize=False,
630                                                                         only_committed=True)
631                     # Update cache
632                     with self._state_lock:
633                         self._state['manifest'] = manifest
634             if self.use_cache:
635                 try:
636                     self._save_state()
637                 except Exception as e:
638                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
639         else:
640             self.bytes_written = self.bytes_skipped
641         # Call the reporter, if any
642         self.report_progress()
643
644     def report_progress(self):
645         if self.reporter is not None:
646             self.reporter(self.bytes_written, self.bytes_expected)
647
648     def _write_stdin(self, filename):
649         output = self._local_collection.open(filename, 'wb')
650         self._write(sys.stdin, output)
651         output.close()
652
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded

        source is the local path of the file and filename is the name it
        gets inside the collection. The decision is made by comparing the
        file's current mtime & size against the cached data from a previous
        run, and against what is already stored on the local collection.
        Files to be uploaded are appended to the upload list as
        (source, resume_offset, filename); data that doesn't need to be sent
        again is accounted in bytes_skipped.

        On dry runs, appending to the upload list raises
        ArvPutUploadIsPending; the cache file is closed before letting the
        exception through.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running on dry-mode, close cache file to
                # avoid locking issues.
                self._cache_file.close()
                raise
723
724     def _upload_files(self):
725         for source, resume_offset, filename in self._files_to_upload:
726             with open(source, 'rb') as source_fd:
727                 with self._state_lock:
728                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
729                     self._state['files'][source]['size'] = os.path.getsize(source)
730                 if resume_offset > 0:
731                     # Start upload where we left off
732                     output = self._local_collection.open(filename, 'ab')
733                     source_fd.seek(resume_offset)
734                 else:
735                     # Start from scratch
736                     output = self._local_collection.open(filename, 'wb')
737                 self._write(source_fd, output)
738                 output.close(flush=False)
739
740     def _write(self, source_fd, output):
741         while True:
742             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
743             if not data:
744                 break
745             output.write(data)
746
747     def _my_collection(self):
748         return self._remote_collection if self.update else self._local_collection
749
750     def _setup_state(self, update_collection):
751         """
752         Create a new cache file or load a previously existing one.
753         """
754         # Load an already existing collection for update
755         if update_collection and re.match(arvados.util.collection_uuid_pattern,
756                                           update_collection):
757             try:
758                 self._remote_collection = arvados.collection.Collection(update_collection)
759             except arvados.errors.ApiError as error:
760                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
761             else:
762                 self.update = True
763         elif update_collection:
764             # Collection locator provided, but unknown format
765             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
766
767         if self.use_cache:
768             # Set up cache file name from input paths.
769             md5 = hashlib.md5()
770             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
771             realpaths = sorted(os.path.realpath(path) for path in self.paths)
772             md5.update(b'\0'.join([p.encode() for p in realpaths]))
773             if self.filename:
774                 md5.update(self.filename.encode())
775             cache_filename = md5.hexdigest()
776             cache_filepath = os.path.join(
777                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
778                 cache_filename)
779             if self.resume and os.path.exists(cache_filepath):
780                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
781                 self._cache_file = open(cache_filepath, 'a+')
782             else:
783                 # --no-resume means start with a empty cache file.
784                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
785                 self._cache_file = open(cache_filepath, 'w+')
786             self._cache_filename = self._cache_file.name
787             self._lock_file(self._cache_file)
788             self._cache_file.seek(0)
789
790         with self._state_lock:
791             if self.use_cache:
792                 try:
793                     self._state = json.load(self._cache_file)
794                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
795                         # Cache at least partially incomplete, set up new cache
796                         self._state = copy.deepcopy(self.EMPTY_STATE)
797                 except ValueError:
798                     # Cache file empty, set up new cache
799                     self._state = copy.deepcopy(self.EMPTY_STATE)
800             else:
801                 self.logger.info("No cache usage requested for this run.")
802                 # No cache file, set empty state
803                 self._state = copy.deepcopy(self.EMPTY_STATE)
804             # Load the previous manifest so we can check if files were modified remotely.
805             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
806
807     def collection_file_paths(self, col, path_prefix='.'):
808         """Return a list of file paths by recursively go through the entire collection `col`"""
809         file_paths = []
810         for name, item in listitems(col):
811             if isinstance(item, arvados.arvfile.ArvadosFile):
812                 file_paths.append(os.path.join(path_prefix, name))
813             elif isinstance(item, arvados.collection.Subcollection):
814                 new_prefix = os.path.join(path_prefix, name)
815                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
816         return file_paths
817
818     def _lock_file(self, fileobj):
819         try:
820             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
821         except IOError:
822             raise ResumeCacheConflict("{} locked".format(fileobj.name))
823
824     def _save_state(self):
825         """
826         Atomically save current state into cache.
827         """
828         with self._state_lock:
829             # We're not using copy.deepcopy() here because it's a lot slower
830             # than json.dumps(), and we're already needing JSON format to be
831             # saved on disk.
832             state = json.dumps(self._state)
833         try:
834             new_cache = tempfile.NamedTemporaryFile(
835                 mode='w+',
836                 dir=os.path.dirname(self._cache_filename), delete=False)
837             self._lock_file(new_cache)
838             new_cache.write(state)
839             new_cache.flush()
840             os.fsync(new_cache)
841             os.rename(new_cache.name, self._cache_filename)
842         except (IOError, OSError, ResumeCacheConflict) as error:
843             self.logger.error("There was a problem while saving the cache file: {}".format(error))
844             try:
845                 os.unlink(new_cache_name)
846             except NameError:  # mkstemp failed.
847                 pass
848         else:
849             self._cache_file.close()
850             self._cache_file = new_cache
851
852     def collection_name(self):
853         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
854
855     def manifest_locator(self):
856         return self._my_collection().manifest_locator()
857
858     def portable_data_hash(self):
859         pdh = self._my_collection().portable_data_hash()
860         m = self._my_collection().stripped_manifest().encode()
861         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
862         if pdh != local_pdh:
863             logger.warning("\n".join([
864                 "arv-put: API server provided PDH differs from local manifest.",
865                 "         This should not happen; showing API server version."]))
866         return pdh
867
868     def manifest_text(self, stream_name=".", strip=False, normalize=False):
869         return self._my_collection().manifest_text(stream_name, strip, normalize)
870
871     def _datablocks_on_item(self, item):
872         """
873         Return a list of datablock locators, recursively navigating
874         through subcollections
875         """
876         if isinstance(item, arvados.arvfile.ArvadosFile):
877             if item.size() == 0:
878                 # Empty file locator
879                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
880             else:
881                 locators = []
882                 for segment in item.segments():
883                     loc = segment.locator
884                     locators.append(loc)
885                 return locators
886         elif isinstance(item, arvados.collection.Collection):
887             l = [self._datablocks_on_item(x) for x in listvalues(item)]
888             # Fast list flattener method taken from:
889             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
890             return [loc for sublist in l for loc in sublist]
891         else:
892             return None
893
894     def data_locators(self):
895         with self._collection_lock:
896             # Make sure all datablocks are flushed before getting the locators
897             self._my_collection().manifest_text()
898             datablocks = self._datablocks_on_item(self._my_collection())
899         return datablocks
900
901 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
902                                                             os.getpid())
903
def pathname_match(pathname, pattern):
    """
    Tell whether `pathname` matches the glob `pattern`, simulating
    glob.glob() matching behavior without the need to scan the filesystem.

    fnmatch() alone doesn't work correctly on pathnames: the pattern
    'tests/*.py' would match 'tests/run_test.py' and also
    'tests/subdir/run_test.py'. To avoid that, both arguments are split on
    os.sep and compared component by component, so they must have the same
    number of components to match.
    """
    name_parts = pathname.split(os.sep)
    pattern_parts = pattern.split(os.sep)
    if len(name_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(part, part_pattern)
               for part, part_pattern in zip(name_parts, pattern_parts))
917
def machine_progress(bytes_written, bytes_expected):
    """
    Format a progress line using the module's machine-readable template.
    An unknown total (None) is reported as -1.
    """
    if bytes_expected is None:
        expected = -1
    else:
        expected = bytes_expected
    return _machine_format.format(bytes_written, expected)
921
def human_progress(bytes_written, bytes_expected):
    """
    Format a progress line meant for humans, starting with a carriage
    return so it overwrites the previous one.

    With a known, non-zero total the line shows written and total sizes in
    whole mebibytes (byte counts shifted right by 20 bits) plus a
    percentage; otherwise it only shows the raw written byte count.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    ratio = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, ratio)
929
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a progress callback: the returned function formats its
    (bytes_written, bytes_expected) arguments with `progress_func` and
    writes the result to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
934
def exit_signal_handler(sigcode, frame):
    """
    Signal handler that terminates the process by raising SystemExit with
    the negated signal number as exit code. `frame` is unused.
    """
    raise SystemExit(-sigcode)
937
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Return the 'uuid' field of the API record that will own the upload.

    Without a project_uuid the current user is looked up. Otherwise the
    value must match either the user or the group UUID pattern, and the
    matching record is fetched. The request is executed with
    `num_retries`.

    Raises ValueError when project_uuid matches neither pattern.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            lookup = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            lookup = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        lookup = api_client.users().current()
    response = lookup.execute(num_retries=num_retries)
    return response['uuid']
948
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """
    Command entry point: parse the arguments, upload the requested paths
    and write the result to `stdout`.

    Depending on the options, the output is the manifest text (--stream),
    a comma separated list of data block locators (--raw), the portable
    data hash, or the manifest locator of the saved/updated collection.

    Returns the output string on success. Every failure path ends in
    sys.exit(): status 1 for errors, 2 when the writer reports a pending
    upload (dry run), and 0 when it reports nothing is pending.

    `arguments` is handed to parse_arguments() (defined elsewhere in this
    file). `stderr` is not referenced in this function body; diagnostics go
    through the 'arvados.arv_put' logger.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    # Reuse an API client already set at module level, if any.
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        # Default name: UTC timestamp plus the local user and host names.
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    # Pick the progress reporter: human readable, machine readable or none.
    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1) --exclude '*.jpg'      (file/dir name patterns, will only match the name)
        # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the entire path,
        #                            and should be relative to any input dir argument)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single regexp,
        # for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't specified inside
        # quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    # Bytes already written at this point mean a previous run is being
    # continued; tell the user how to start over instead.
    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # The collection is only saved when neither --stream nor --raw
        # was requested.
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    # Select what gets printed, according to the requested output mode.
    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        # Nothing to print counts as a failure, keeping any earlier status.
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    # Put back the signal handlers that were active before the upload.
    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1126
1127
# Run the uploader with its default arguments when this file is executed
# as a script rather than imported.
if __name__ == '__main__':
    main()