ba7ff2bbd9f62af754bb59762d17342eb1484441
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import fnmatch
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
33 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
34 api_client = None
35
36 upload_opts = argparse.ArgumentParser(add_help=False)
37
38 upload_opts.add_argument('--version', action='version',
39                          version="%s %s" % (sys.argv[0], __version__),
40                          help='Print version and exit.')
41 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
42                          help="""
43 Local file or directory. If path is a directory reference with a trailing
44 slash, then just upload the directory's contents; otherwise upload the
45 directory itself. Default: read from standard input.
46 """)
47
48 _group = upload_opts.add_mutually_exclusive_group()
49
50 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
51                     default=-1, help=argparse.SUPPRESS)
52
53 _group.add_argument('--normalize', action='store_true',
54                     help="""
55 Normalize the manifest by re-ordering files and streams after writing
56 data.
57 """)
58
59 _group.add_argument('--dry-run', action='store_true', default=False,
60                     help="""
61 Don't actually upload files, but only check if any file should be
62 uploaded. Exit with code=2 when files are pending for upload.
63 """)
64
65 _group = upload_opts.add_mutually_exclusive_group()
66
67 _group.add_argument('--as-stream', action='store_true', dest='stream',
68                     help="""
69 Synonym for --stream.
70 """)
71
72 _group.add_argument('--stream', action='store_true',
73                     help="""
74 Store the file content and display the resulting manifest on
75 stdout. Do not write the manifest to Keep or save a Collection object
76 in Arvados.
77 """)
78
79 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
80                     help="""
81 Synonym for --manifest.
82 """)
83
84 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
85                     help="""
86 Synonym for --manifest.
87 """)
88
89 _group.add_argument('--manifest', action='store_true',
90                     help="""
91 Store the file data and resulting manifest in Keep, save a Collection
92 object in Arvados, and display the manifest locator (Collection uuid)
93 on stdout. This is the default behavior.
94 """)
95
96 _group.add_argument('--as-raw', action='store_true', dest='raw',
97                     help="""
98 Synonym for --raw.
99 """)
100
101 _group.add_argument('--raw', action='store_true',
102                     help="""
103 Store the file content and display the data block locators on stdout,
104 separated by commas, with a trailing newline. Do not store a
105 manifest.
106 """)
107
108 upload_opts.add_argument('--update-collection', type=str, default=None,
109                          dest='update_collection', metavar="UUID", help="""
110 Update an existing collection identified by the given Arvados collection
111 UUID. All new local files will be uploaded.
112 """)
113
114 upload_opts.add_argument('--use-filename', type=str, default=None,
115                          dest='filename', help="""
116 Synonym for --filename.
117 """)
118
119 upload_opts.add_argument('--filename', type=str, default=None,
120                          help="""
121 Use the given filename in the manifest, instead of the name of the
122 local file. This is useful when "-" or "/dev/stdin" is given as an
123 input file. It can be used only if there is exactly one path given and
124 it is not a directory. Implies --manifest.
125 """)
126
127 upload_opts.add_argument('--portable-data-hash', action='store_true',
128                          help="""
129 Print the portable data hash instead of the Arvados UUID for the collection
130 created by the upload.
131 """)
132
133 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
134                          help="""
135 Set the replication level for the new collection: how many different
136 physical storage devices (e.g., disks) should have a copy of each data
137 block. Default is to use the server-provided default (if any) or 2.
138 """)
139
140 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
141                          help="""
142 Set the number of upload threads to be used. Take into account that
143 using lots of threads will increase the RAM requirements. Default is
144 to use 2 threads.
145 On high latency installations, using a greater number will improve
146 overall throughput.
147 """)
148
149 run_opts = argparse.ArgumentParser(add_help=False)
150
151 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
152 Store the collection in the specified project, instead of your Home
153 project.
154 """)
155
156 run_opts.add_argument('--name', help="""
157 Save the collection with the specified name.
158 """)
159
160 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
161                       action='append', help="""
162 Exclude files and directories whose names match the given pattern. You
163 can specify multiple patterns by using this argument more than once.
164 """)
165
166 _group = run_opts.add_mutually_exclusive_group()
167 _group.add_argument('--progress', action='store_true',
168                     help="""
169 Display human-readable progress on stderr (bytes and, if possible,
170 percentage of total data size). This is the default behavior when
171 stderr is a tty.
172 """)
173
174 _group.add_argument('--no-progress', action='store_true',
175                     help="""
176 Do not display human-readable progress on stderr, even if stderr is a
177 tty.
178 """)
179
180 _group.add_argument('--batch-progress', action='store_true',
181                     help="""
182 Display machine-readable progress on stderr (bytes and, if known,
183 total data size).
184 """)
185
186 _group = run_opts.add_mutually_exclusive_group()
187 _group.add_argument('--resume', action='store_true', default=True,
188                     help="""
189 Continue interrupted uploads from cached state (default).
190 """)
191 _group.add_argument('--no-resume', action='store_false', dest='resume',
192                     help="""
193 Do not continue interrupted uploads from cached state.
194 """)
195
196 _group = run_opts.add_mutually_exclusive_group()
197 _group.add_argument('--follow-links', action='store_true', default=True,
198                     dest='follow_links', help="""
199 Follow file and directory symlinks (default).
200 """)
201 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
202                     help="""
203 Do not follow file and directory symlinks.
204 """)
205
206 _group = run_opts.add_mutually_exclusive_group()
207 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
208                     help="""
209 Save upload state in a cache file for resuming (default).
210 """)
211 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
212                     help="""
213 Do not save upload state in a cache file for resuming.
214 """)
215
216 arg_parser = argparse.ArgumentParser(
217     description='Copy data from the local filesystem to Keep.',
218     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
219
220 def parse_arguments(arguments):
221     args = arg_parser.parse_args(arguments)
222
223     if len(args.paths) == 0:
224         args.paths = ['-']
225
226     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
227
228     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
229         if args.filename:
230             arg_parser.error("""
231     --filename argument cannot be used when storing a directory or
232     multiple files.
233     """)
234
235     # Turn on --progress by default if stderr is a tty.
236     if (not (args.batch_progress or args.no_progress)
237         and os.isatty(sys.stderr.fileno())):
238         args.progress = True
239
240     # Turn off --resume (default) if --no-cache is used.
241     if not args.use_cache:
242         args.resume = False
243
244     if args.paths == ['-']:
245         if args.update_collection:
246             arg_parser.error("""
247     --update-collection cannot be used when reading from stdin.
248     """)
249         args.resume = False
250         args.use_cache = False
251         if not args.filename:
252             args.filename = 'stdin'
253
254     # Remove possible duplicated patterns
255     if len(args.exclude) > 0:
256         args.exclude = list(set(args.exclude))
257
258     return args
259
260
261 class PathDoesNotExistError(Exception):
262     pass
263
264
265 class CollectionUpdateError(Exception):
266     pass
267
268
269 class ResumeCacheConflict(Exception):
270     pass
271
272
273 class ArvPutArgumentConflict(Exception):
274     pass
275
276
277 class ArvPutUploadIsPending(Exception):
278     pass
279
280
281 class ArvPutUploadNotPending(Exception):
282     pass
283
284
285 class FileUploadList(list):
286     def __init__(self, dry_run=False):
287         list.__init__(self)
288         self.dry_run = dry_run
289
290     def append(self, other):
291         if self.dry_run:
292             raise ArvPutUploadIsPending()
293         super(FileUploadList, self).append(other)
294
295
296 class ResumeCache(object):
297     CACHE_DIR = '.cache/arvados/arv-put'
298
299     def __init__(self, file_spec):
300         self.cache_file = open(file_spec, 'a+')
301         self._lock_file(self.cache_file)
302         self.filename = self.cache_file.name
303
304     @classmethod
305     def make_path(cls, args):
306         md5 = hashlib.md5()
307         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
308         realpaths = sorted(os.path.realpath(path) for path in args.paths)
309         md5.update(b'\0'.join([p.encode() for p in realpaths]))
310         if any(os.path.isdir(path) for path in realpaths):
311             md5.update(b'-1')
312         elif args.filename:
313             md5.update(args.filename.encode())
314         return os.path.join(
315             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
316             md5.hexdigest())
317
318     def _lock_file(self, fileobj):
319         try:
320             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
321         except IOError:
322             raise ResumeCacheConflict("{} locked".format(fileobj.name))
323
324     def load(self):
325         self.cache_file.seek(0)
326         return json.load(self.cache_file)
327
328     def check_cache(self, api_client=None, num_retries=0):
329         try:
330             state = self.load()
331             locator = None
332             try:
333                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
334                     locator = state["_finished_streams"][0][1][0]
335                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
336                     locator = state["_current_stream_locators"][0]
337                 if locator is not None:
338                     kc = arvados.keep.KeepClient(api_client=api_client)
339                     kc.head(locator, num_retries=num_retries)
340             except Exception as e:
341                 self.restart()
342         except (ValueError):
343             pass
344
345     def save(self, data):
346         try:
347             new_cache_fd, new_cache_name = tempfile.mkstemp(
348                 dir=os.path.dirname(self.filename))
349             self._lock_file(new_cache_fd)
350             new_cache = os.fdopen(new_cache_fd, 'r+')
351             json.dump(data, new_cache)
352             os.rename(new_cache_name, self.filename)
353         except (IOError, OSError, ResumeCacheConflict) as error:
354             try:
355                 os.unlink(new_cache_name)
356             except NameError:  # mkstemp failed.
357                 pass
358         else:
359             self.cache_file.close()
360             self.cache_file = new_cache
361
362     def close(self):
363         self.cache_file.close()
364
365     def destroy(self):
366         try:
367             os.unlink(self.filename)
368         except OSError as error:
369             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
370                 raise
371         self.close()
372
373     def restart(self):
374         self.destroy()
375         self.__init__(self.filename)
376
377
378 class ArvPutUploadJob(object):
379     CACHE_DIR = '.cache/arvados/arv-put'
380     EMPTY_STATE = {
381         'manifest' : None, # Last saved manifest checkpoint
382         'files' : {} # Previous run file list: {path : {size, mtime}}
383     }
384
385     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
386                  bytes_expected=None, name=None, owner_uuid=None,
387                  ensure_unique_name=False, num_retries=None,
388                  put_threads=None, replication_desired=None,
389                  filename=None, update_time=60.0, update_collection=None,
390                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
391                  follow_links=True, exclude_paths=[], exclude_names=None):
392         self.paths = paths
393         self.resume = resume
394         self.use_cache = use_cache
395         self.update = False
396         self.reporter = reporter
397         self.bytes_expected = bytes_expected
398         self.bytes_written = 0
399         self.bytes_skipped = 0
400         self.name = name
401         self.owner_uuid = owner_uuid
402         self.ensure_unique_name = ensure_unique_name
403         self.num_retries = num_retries
404         self.replication_desired = replication_desired
405         self.put_threads = put_threads
406         self.filename = filename
407         self._state_lock = threading.Lock()
408         self._state = None # Previous run state (file list & manifest)
409         self._current_files = [] # Current run file list
410         self._cache_file = None
411         self._collection_lock = threading.Lock()
412         self._remote_collection = None # Collection being updated (if asked)
413         self._local_collection = None # Collection from previous run manifest
414         self._file_paths = set() # Files to be updated in remote collection
415         self._stop_checkpointer = threading.Event()
416         self._checkpointer = threading.Thread(target=self._update_task)
417         self._checkpointer.daemon = True
418         self._update_task_time = update_time  # How many seconds wait between update runs
419         self._files_to_upload = FileUploadList(dry_run=dry_run)
420         self._upload_started = False
421         self.logger = logger
422         self.dry_run = dry_run
423         self._checkpoint_before_quit = True
424         self.follow_links = follow_links
425         self.exclude_paths = exclude_paths
426         self.exclude_names = exclude_names
427
428         if not self.use_cache and self.resume:
429             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
430
431         # Check for obvious dry-run responses
432         if self.dry_run and (not self.use_cache or not self.resume):
433             raise ArvPutUploadIsPending()
434
435         # Load cached data if any and if needed
436         self._setup_state(update_collection)
437
438     def start(self, save_collection):
439         """
440         Start supporting thread & file uploading
441         """
442         if not self.dry_run:
443             self._checkpointer.start()
444         try:
445             for path in self.paths:
446                 # Test for stdin first, in case some file named '-' exist
447                 if path == '-':
448                     if self.dry_run:
449                         raise ArvPutUploadIsPending()
450                     self._write_stdin(self.filename or 'stdin')
451                 elif not os.path.exists(path):
452                      raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
453                 elif os.path.isdir(path):
454                     # Use absolute paths on cache index so CWD doesn't interfere
455                     # with the caching logic.
456                     orig_path = path
457                     path = os.path.abspath(path)
458                     if orig_path[-1:] == os.sep:
459                         # When passing a directory reference with a trailing slash,
460                         # its contents should be uploaded directly to the collection's root.
461                         prefixdir = path
462                     else:
463                         # When passing a directory reference with no trailing slash,
464                         # upload the directory to the collection's root.
465                         prefixdir = os.path.dirname(path)
466                     prefixdir += os.sep
467                     for root, dirs, files in os.walk(path, followlinks=self.follow_links):
468                         root_relpath = os.path.relpath(root, path)
469                         # Exclude files/dirs by full path matching pattern
470                         if self.exclude_paths:
471                             dirs[:] = filter(
472                                 lambda d: not any([pathname_match(os.path.join(root_relpath, d),
473                                                                   pat)
474                                                    for pat in self.exclude_paths]),
475                                 dirs)
476                             files = filter(
477                                 lambda f: not any([pathname_match(os.path.join(root_relpath, f),
478                                                                   pat)
479                                                    for pat in self.exclude_paths]),
480                                 files)
481                         # Exclude files/dirs by name matching pattern
482                         if self.exclude_names is not None:
483                             dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
484                             files = filter(lambda f: not self.exclude_names.match(f), files)
485                         # Make os.walk()'s dir traversing order deterministic
486                         dirs.sort()
487                         files.sort()
488                         for f in files:
489                             self._check_file(os.path.join(root, f),
490                                              os.path.join(root[len(prefixdir):], f))
491                 else:
492                     self._check_file(os.path.abspath(path),
493                                      self.filename or os.path.basename(path))
494             # If dry-mode is on, and got up to this point, then we should notify that
495             # there aren't any file to upload.
496             if self.dry_run:
497                 raise ArvPutUploadNotPending()
498             # Remove local_collection's files that don't exist locally anymore, so the
499             # bytes_written count is correct.
500             for f in self.collection_file_paths(self._local_collection,
501                                                 path_prefix=""):
502                 if f != 'stdin' and f != self.filename and not f in self._file_paths:
503                     self._local_collection.remove(f)
504             # Update bytes_written from current local collection and
505             # report initial progress.
506             self._update()
507             # Actual file upload
508             self._upload_started = True # Used by the update thread to start checkpointing
509             self._upload_files()
510         except (SystemExit, Exception) as e:
511             self._checkpoint_before_quit = False
512             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
513             # Note: We're expecting SystemExit instead of
514             # KeyboardInterrupt because we have a custom signal
515             # handler in place that raises SystemExit with the catched
516             # signal's code.
517             if isinstance(e, PathDoesNotExistError):
518                 # We aren't interested in the traceback for this case
519                 pass
520             elif not isinstance(e, SystemExit) or e.code != -2:
521                 self.logger.warning("Abnormal termination:\n{}".format(
522                     traceback.format_exc()))
523             raise
524         finally:
525             if not self.dry_run:
526                 # Stop the thread before doing anything else
527                 self._stop_checkpointer.set()
528                 self._checkpointer.join()
529                 if self._checkpoint_before_quit:
530                     # Commit all pending blocks & one last _update()
531                     self._local_collection.manifest_text()
532                     self._update(final=True)
533                     if save_collection:
534                         self.save_collection()
535             if self.use_cache:
536                 self._cache_file.close()
537
538     def save_collection(self):
539         if self.update:
540             # Check if files should be updated on the remote collection.
541             for fp in self._file_paths:
542                 remote_file = self._remote_collection.find(fp)
543                 if not remote_file:
544                     # File don't exist on remote collection, copy it.
545                     self._remote_collection.copy(fp, fp, self._local_collection)
546                 elif remote_file != self._local_collection.find(fp):
547                     # A different file exist on remote collection, overwrite it.
548                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
549                 else:
550                     # The file already exist on remote collection, skip it.
551                     pass
552             self._remote_collection.save(num_retries=self.num_retries)
553         else:
554             self._local_collection.save_new(
555                 name=self.name, owner_uuid=self.owner_uuid,
556                 ensure_unique_name=self.ensure_unique_name,
557                 num_retries=self.num_retries)
558
559     def destroy_cache(self):
560         if self.use_cache:
561             try:
562                 os.unlink(self._cache_filename)
563             except OSError as error:
564                 # That's what we wanted anyway.
565                 if error.errno != errno.ENOENT:
566                     raise
567             self._cache_file.close()
568
569     def _collection_size(self, collection):
570         """
571         Recursively get the total size of the collection
572         """
573         size = 0
574         for item in listvalues(collection):
575             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
576                 size += self._collection_size(item)
577             else:
578                 size += item.size()
579         return size
580
581     def _update_task(self):
582         """
583         Periodically called support task. File uploading is
584         asynchronous so we poll status from the collection.
585         """
586         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
587             self._update()
588
589     def _update(self, final=False):
590         """
591         Update cached manifest text and report progress.
592         """
593         if self._upload_started:
594             with self._collection_lock:
595                 self.bytes_written = self._collection_size(self._local_collection)
596                 if self.use_cache:
597                     if final:
598                         manifest = self._local_collection.manifest_text()
599                     else:
600                         # Get the manifest text without comitting pending blocks
601                         manifest = self._local_collection.manifest_text(strip=False,
602                                                                         normalize=False,
603                                                                         only_committed=True)
604                     # Update cache
605                     with self._state_lock:
606                         self._state['manifest'] = manifest
607             if self.use_cache:
608                 try:
609                     self._save_state()
610                 except Exception as e:
611                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
612         else:
613             self.bytes_written = self.bytes_skipped
614         # Call the reporter, if any
615         self.report_progress()
616
617     def report_progress(self):
618         if self.reporter is not None:
619             self.reporter(self.bytes_written, self.bytes_expected)
620
621     def _write_stdin(self, filename):
622         output = self._local_collection.open(filename, 'wb')
623         self._write(sys.stdin, output)
624         output.close()
625
626     def _check_file(self, source, filename):
627         """
628         Check if this file needs to be uploaded
629         """
630         # Ignore symlinks when requested
631         if (not self.follow_links) and os.path.islink(source):
632             return
633         resume_offset = 0
634         should_upload = False
635         new_file_in_cache = False
636         # Record file path for updating the remote collection before exiting
637         self._file_paths.add(filename)
638
639         with self._state_lock:
640             # If no previous cached data on this file, store it for an eventual
641             # repeated run.
642             if source not in self._state['files']:
643                 self._state['files'][source] = {
644                     'mtime': os.path.getmtime(source),
645                     'size' : os.path.getsize(source)
646                 }
647                 new_file_in_cache = True
648             cached_file_data = self._state['files'][source]
649
650         # Check if file was already uploaded (at least partially)
651         file_in_local_collection = self._local_collection.find(filename)
652
653         # If not resuming, upload the full file.
654         if not self.resume:
655             should_upload = True
656         # New file detected from last run, upload it.
657         elif new_file_in_cache:
658             should_upload = True
659         # Local file didn't change from last run.
660         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
661             if not file_in_local_collection:
662                 # File not uploaded yet, upload it completely
663                 should_upload = True
664             elif file_in_local_collection.permission_expired():
665                 # Permission token expired, re-upload file. This will change whenever
666                 # we have a API for refreshing tokens.
667                 should_upload = True
668                 self._local_collection.remove(filename)
669             elif cached_file_data['size'] == file_in_local_collection.size():
670                 # File already there, skip it.
671                 self.bytes_skipped += cached_file_data['size']
672             elif cached_file_data['size'] > file_in_local_collection.size():
673                 # File partially uploaded, resume!
674                 resume_offset = file_in_local_collection.size()
675                 self.bytes_skipped += resume_offset
676                 should_upload = True
677             else:
678                 # Inconsistent cache, re-upload the file
679                 should_upload = True
680                 self._local_collection.remove(filename)
681                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
682         # Local file differs from cached data, re-upload it.
683         else:
684             if file_in_local_collection:
685                 self._local_collection.remove(filename)
686             should_upload = True
687
688         if should_upload:
689             self._files_to_upload.append((source, resume_offset, filename))
690
691     def _upload_files(self):
692         for source, resume_offset, filename in self._files_to_upload:
693             with open(source, 'rb') as source_fd:
694                 with self._state_lock:
695                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
696                     self._state['files'][source]['size'] = os.path.getsize(source)
697                 if resume_offset > 0:
698                     # Start upload where we left off
699                     output = self._local_collection.open(filename, 'ab')
700                     source_fd.seek(resume_offset)
701                 else:
702                     # Start from scratch
703                     output = self._local_collection.open(filename, 'wb')
704                 self._write(source_fd, output)
705                 output.close(flush=False)
706
707     def _write(self, source_fd, output):
708         while True:
709             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
710             if not data:
711                 break
712             output.write(data)
713
714     def _my_collection(self):
715         return self._remote_collection if self.update else self._local_collection
716
717     def _setup_state(self, update_collection):
718         """
719         Create a new cache file or load a previously existing one.
720         """
721         # Load an already existing collection for update
722         if update_collection and re.match(arvados.util.collection_uuid_pattern,
723                                           update_collection):
724             try:
725                 self._remote_collection = arvados.collection.Collection(update_collection)
726             except arvados.errors.ApiError as error:
727                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
728             else:
729                 self.update = True
730         elif update_collection:
731             # Collection locator provided, but unknown format
732             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
733
734         if self.use_cache:
735             # Set up cache file name from input paths.
736             md5 = hashlib.md5()
737             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
738             realpaths = sorted(os.path.realpath(path) for path in self.paths)
739             md5.update(b'\0'.join([p.encode() for p in realpaths]))
740             if self.filename:
741                 md5.update(self.filename.encode())
742             cache_filename = md5.hexdigest()
743             cache_filepath = os.path.join(
744                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
745                 cache_filename)
746             if self.resume and os.path.exists(cache_filepath):
747                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
748                 self._cache_file = open(cache_filepath, 'a+')
749             else:
750                 # --no-resume means start with a empty cache file.
751                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
752                 self._cache_file = open(cache_filepath, 'w+')
753             self._cache_filename = self._cache_file.name
754             self._lock_file(self._cache_file)
755             self._cache_file.seek(0)
756
757         with self._state_lock:
758             if self.use_cache:
759                 try:
760                     self._state = json.load(self._cache_file)
761                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
762                         # Cache at least partially incomplete, set up new cache
763                         self._state = copy.deepcopy(self.EMPTY_STATE)
764                 except ValueError:
765                     # Cache file empty, set up new cache
766                     self._state = copy.deepcopy(self.EMPTY_STATE)
767             else:
768                 self.logger.info("No cache usage requested for this run.")
769                 # No cache file, set empty state
770                 self._state = copy.deepcopy(self.EMPTY_STATE)
771             # Load the previous manifest so we can check if files were modified remotely.
772             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
773
774     def collection_file_paths(self, col, path_prefix='.'):
775         """Return a list of file paths by recursively go through the entire collection `col`"""
776         file_paths = []
777         for name, item in listitems(col):
778             if isinstance(item, arvados.arvfile.ArvadosFile):
779                 file_paths.append(os.path.join(path_prefix, name))
780             elif isinstance(item, arvados.collection.Subcollection):
781                 new_prefix = os.path.join(path_prefix, name)
782                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
783         return file_paths
784
785     def _lock_file(self, fileobj):
786         try:
787             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
788         except IOError:
789             raise ResumeCacheConflict("{} locked".format(fileobj.name))
790
791     def _save_state(self):
792         """
793         Atomically save current state into cache.
794         """
795         with self._state_lock:
796             # We're not using copy.deepcopy() here because it's a lot slower
797             # than json.dumps(), and we're already needing JSON format to be
798             # saved on disk.
799             state = json.dumps(self._state)
800         try:
801             new_cache = tempfile.NamedTemporaryFile(
802                 mode='w+',
803                 dir=os.path.dirname(self._cache_filename), delete=False)
804             self._lock_file(new_cache)
805             new_cache.write(state)
806             new_cache.flush()
807             os.fsync(new_cache)
808             os.rename(new_cache.name, self._cache_filename)
809         except (IOError, OSError, ResumeCacheConflict) as error:
810             self.logger.error("There was a problem while saving the cache file: {}".format(error))
811             try:
812                 os.unlink(new_cache_name)
813             except NameError:  # mkstemp failed.
814                 pass
815         else:
816             self._cache_file.close()
817             self._cache_file = new_cache
818
819     def collection_name(self):
820         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
821
822     def manifest_locator(self):
823         return self._my_collection().manifest_locator()
824
825     def portable_data_hash(self):
826         pdh = self._my_collection().portable_data_hash()
827         m = self._my_collection().stripped_manifest().encode()
828         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
829         if pdh != local_pdh:
830             logger.warning("\n".join([
831                 "arv-put: API server provided PDH differs from local manifest.",
832                 "         This should not happen; showing API server version."]))
833         return pdh
834
835     def manifest_text(self, stream_name=".", strip=False, normalize=False):
836         return self._my_collection().manifest_text(stream_name, strip, normalize)
837
838     def _datablocks_on_item(self, item):
839         """
840         Return a list of datablock locators, recursively navigating
841         through subcollections
842         """
843         if isinstance(item, arvados.arvfile.ArvadosFile):
844             if item.size() == 0:
845                 # Empty file locator
846                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
847             else:
848                 locators = []
849                 for segment in item.segments():
850                     loc = segment.locator
851                     locators.append(loc)
852                 return locators
853         elif isinstance(item, arvados.collection.Collection):
854             l = [self._datablocks_on_item(x) for x in listvalues(item)]
855             # Fast list flattener method taken from:
856             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
857             return [loc for sublist in l for loc in sublist]
858         else:
859             return None
860
861     def data_locators(self):
862         with self._collection_lock:
863             # Make sure all datablocks are flushed before getting the locators
864             self._my_collection().manifest_text()
865             datablocks = self._datablocks_on_item(self._my_collection())
866         return datablocks
867
868 def expected_bytes_for(pathlist, follow_links=True, exclude={}):
869     # Walk the given directory trees and stat files, adding up file sizes,
870     # so we can display progress as percent
871     bytesum = 0
872     exclude_paths = exclude.get('paths', None)
873     exclude_names = exclude.get('names', None)
874     for path in pathlist:
875         if os.path.isdir(path):
876             for root, dirs, files in os.walk(path, followlinks=follow_links):
877                 root_relpath = os.path.relpath(root, path)
878                 # Exclude files/dirs by full path matching pattern
879                 if exclude_paths is not None:
880                     dirs[:] = filter(
881                         lambda d: not any([pathname_match(os.path.join(root_relpath, d),
882                                                           pat)
883                                            for pat in exclude_paths]),
884                         dirs)
885                     files = filter(
886                         lambda f: not any([pathname_match(os.path.join(root_relpath, f),
887                                                           pat)
888                                            for pat in exclude_paths]),
889                         files)
890                 # Exclude files/dirs by name matching pattern
891                 if exclude_names is not None:
892                     dirs[:] = filter(lambda d: not exclude_names.match(d), dirs)
893                     files = filter(lambda f: not exclude_names.match(f), files)
894                 # Sum file sizes
895                 for f in files:
896                     filepath = os.path.join(root, f)
897                     # Ignore symlinked files when requested
898                     if (not follow_links) and os.path.islink(filepath):
899                         continue
900                     bytesum += os.path.getsize(filepath)
901         elif not os.path.isfile(path):
902             return None
903         else:
904             bytesum += os.path.getsize(path)
905     return bytesum
906
907 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
908                                                             os.getpid())
909
910 # Simulate glob.glob() matching behavior without the need to scan the filesystem
911 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
912 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
913 # so instead we're using it on every path component.
914 def pathname_match(pathname, pattern):
915     name = pathname.split(os.sep)
916     pat = pattern.split(os.sep)
917     if len(name) != len(pat):
918         return False
919     for i in range(len(name)):
920         if not fnmatch.fnmatch(name[i], pat[i]):
921             return False
922     return True
923
924 def machine_progress(bytes_written, bytes_expected):
925     return _machine_format.format(
926         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
927
928 def human_progress(bytes_written, bytes_expected):
929     if bytes_expected:
930         return "\r{}M / {}M {:.1%} ".format(
931             bytes_written >> 20, bytes_expected >> 20,
932             float(bytes_written) / bytes_expected)
933     else:
934         return "\r{} ".format(bytes_written)
935
936 def progress_writer(progress_func, outfile=sys.stderr):
937     def write_progress(bytes_written, bytes_expected):
938         outfile.write(progress_func(bytes_written, bytes_expected))
939     return write_progress
940
941 def exit_signal_handler(sigcode, frame):
942     sys.exit(-sigcode)
943
944 def desired_project_uuid(api_client, project_uuid, num_retries):
945     if not project_uuid:
946         query = api_client.users().current()
947     elif arvados.util.user_uuid_pattern.match(project_uuid):
948         query = api_client.users().get(uuid=project_uuid)
949     elif arvados.util.group_uuid_pattern.match(project_uuid):
950         query = api_client.groups().get(uuid=project_uuid)
951     else:
952         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
953     return query.execute(num_retries=num_retries)['uuid']
954
955 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
956     global api_client
957
958     logger = logging.getLogger('arvados.arv_put')
959     logger.setLevel(logging.INFO)
960     args = parse_arguments(arguments)
961     status = 0
962     if api_client is None:
963         api_client = arvados.api('v1')
964
965     # Determine the name to use
966     if args.name:
967         if args.stream or args.raw:
968             logger.error("Cannot use --name with --stream or --raw")
969             sys.exit(1)
970         elif args.update_collection:
971             logger.error("Cannot use --name with --update-collection")
972             sys.exit(1)
973         collection_name = args.name
974     else:
975         collection_name = "Saved at {} by {}@{}".format(
976             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
977             pwd.getpwuid(os.getuid()).pw_name,
978             socket.gethostname())
979
980     if args.project_uuid and (args.stream or args.raw):
981         logger.error("Cannot use --project-uuid with --stream or --raw")
982         sys.exit(1)
983
984     # Determine the parent project
985     try:
986         project_uuid = desired_project_uuid(api_client, args.project_uuid,
987                                             args.retries)
988     except (apiclient_errors.Error, ValueError) as error:
989         logger.error(error)
990         sys.exit(1)
991
992     if args.progress:
993         reporter = progress_writer(human_progress)
994     elif args.batch_progress:
995         reporter = progress_writer(machine_progress)
996     else:
997         reporter = None
998
999     # Setup exclude regex from all the --exclude arguments provided
1000     name_patterns = []
1001     exclude_paths = []
1002     exclude_names = None
1003     if len(args.exclude) > 0:
1004         # We're supporting 2 kinds of exclusion patterns:
1005         # 1) --exclude '*.jpg'      (file/dir name patterns, will only match the name)
1006         # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the entire path,
1007         #                            and should be relative to any input dir argument)
1008         for p in args.exclude:
1009             # Only relative paths patterns allowed
1010             if p.startswith(os.sep):
1011                 logger.error("Cannot use absolute paths with --exclude")
1012                 sys.exit(1)
1013             if os.path.dirname(p):
1014                 # Path search pattern
1015                 exclude_paths.append(p)
1016             else:
1017                 # Name-only search pattern
1018                 name_patterns.append(p)
1019         # For name only matching, we can combine all patterns into a single regexp,
1020         # for better performance.
1021         exclude_names = re.compile('|'.join(
1022             [fnmatch.translate(p) for p in name_patterns]
1023         )) if len(name_patterns) > 0 else None
1024         # Show the user the patterns to be used, just in case they weren't specified inside
1025         # quotes and got changed by the shell expansion.
1026         logger.info("Exclude patterns: {}".format(args.exclude))
1027
1028     # If this is used by a human, and there's at least one directory to be
1029     # uploaded, the expected bytes calculation can take a moment.
1030     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1031         logger.info("Calculating upload size, this could take some time...")
1032     bytes_expected = expected_bytes_for(args.paths,
1033                                         follow_links=args.follow_links,
1034                                         exclude={'paths': exclude_paths,
1035                                                  'names': exclude_names})
1036
1037
1038     try:
1039         writer = ArvPutUploadJob(paths = args.paths,
1040                                  resume = args.resume,
1041                                  use_cache = args.use_cache,
1042                                  filename = args.filename,
1043                                  reporter = reporter,
1044                                  bytes_expected = bytes_expected,
1045                                  num_retries = args.retries,
1046                                  replication_desired = args.replication,
1047                                  put_threads = args.threads,
1048                                  name = collection_name,
1049                                  owner_uuid = project_uuid,
1050                                  ensure_unique_name = True,
1051                                  update_collection = args.update_collection,
1052                                  logger=logger,
1053                                  dry_run=args.dry_run,
1054                                  follow_links=args.follow_links,
1055                                  exclude_paths=exclude_paths,
1056                                  exclude_names=exclude_names)
1057     except ResumeCacheConflict:
1058         logger.error("\n".join([
1059             "arv-put: Another process is already uploading this data.",
1060             "         Use --no-cache if this is really what you want."]))
1061         sys.exit(1)
1062     except CollectionUpdateError as error:
1063         logger.error("\n".join([
1064             "arv-put: %s" % str(error)]))
1065         sys.exit(1)
1066     except ArvPutUploadIsPending:
1067         # Dry run check successful, return proper exit code.
1068         sys.exit(2)
1069     except ArvPutUploadNotPending:
1070         # No files pending for upload
1071         sys.exit(0)
1072
1073     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1074     # the originals.
1075     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1076                             for sigcode in CAUGHT_SIGNALS}
1077
1078     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1079         logger.warning("\n".join([
1080             "arv-put: Resuming previous upload from last checkpoint.",
1081             "         Use the --no-resume option to start over."]))
1082
1083     if not args.dry_run:
1084         writer.report_progress()
1085     output = None
1086     try:
1087         writer.start(save_collection=not(args.stream or args.raw))
1088     except arvados.errors.ApiError as error:
1089         logger.error("\n".join([
1090             "arv-put: %s" % str(error)]))
1091         sys.exit(1)
1092     except ArvPutUploadIsPending:
1093         # Dry run check successful, return proper exit code.
1094         sys.exit(2)
1095     except ArvPutUploadNotPending:
1096         # No files pending for upload
1097         sys.exit(0)
1098     except PathDoesNotExistError as error:
1099         logger.error("\n".join([
1100             "arv-put: %s" % str(error)]))
1101         sys.exit(1)
1102
1103     if args.progress:  # Print newline to split stderr from stdout for humans.
1104         logger.info("\n")
1105
1106     if args.stream:
1107         if args.normalize:
1108             output = writer.manifest_text(normalize=True)
1109         else:
1110             output = writer.manifest_text()
1111     elif args.raw:
1112         output = ','.join(writer.data_locators())
1113     else:
1114         try:
1115             if args.update_collection:
1116                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1117             else:
1118                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1119             if args.portable_data_hash:
1120                 output = writer.portable_data_hash()
1121             else:
1122                 output = writer.manifest_locator()
1123         except apiclient_errors.Error as error:
1124             logger.error(
1125                 "arv-put: Error creating Collection on project: {}.".format(
1126                     error))
1127             status = 1
1128
1129     # Print the locator (uuid) of the new collection.
1130     if output is None:
1131         status = status or 1
1132     else:
1133         stdout.write(output)
1134         if not output.endswith('\n'):
1135             stdout.write('\n')
1136
1137     for sigcode, orig_handler in listitems(orig_signal_handlers):
1138         signal.signal(sigcode, orig_handler)
1139
1140     if status != 0:
1141         sys.exit(status)
1142
1143     # Success!
1144     return output
1145
1146
1147 if __name__ == '__main__':
1148     main()