11789: Converting a filter() iterable to list, for python3 compatibility.
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import fnmatch
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
33 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
34 api_client = None
35
36 upload_opts = argparse.ArgumentParser(add_help=False)
37
38 upload_opts.add_argument('--version', action='version',
39                          version="%s %s" % (sys.argv[0], __version__),
40                          help='Print version and exit.')
41 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
42                          help="""
43 Local file or directory. If path is a directory reference with a trailing
44 slash, then just upload the directory's contents; otherwise upload the
45 directory itself. Default: read from standard input.
46 """)
47
48 _group = upload_opts.add_mutually_exclusive_group()
49
50 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
51                     default=-1, help=argparse.SUPPRESS)
52
53 _group.add_argument('--normalize', action='store_true',
54                     help="""
55 Normalize the manifest by re-ordering files and streams after writing
56 data.
57 """)
58
59 _group.add_argument('--dry-run', action='store_true', default=False,
60                     help="""
61 Don't actually upload files, but only check if any file should be
62 uploaded. Exit with code=2 when files are pending for upload.
63 """)
64
65 _group = upload_opts.add_mutually_exclusive_group()
66
67 _group.add_argument('--as-stream', action='store_true', dest='stream',
68                     help="""
69 Synonym for --stream.
70 """)
71
72 _group.add_argument('--stream', action='store_true',
73                     help="""
74 Store the file content and display the resulting manifest on
75 stdout. Do not write the manifest to Keep or save a Collection object
76 in Arvados.
77 """)
78
79 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
80                     help="""
81 Synonym for --manifest.
82 """)
83
84 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
85                     help="""
86 Synonym for --manifest.
87 """)
88
89 _group.add_argument('--manifest', action='store_true',
90                     help="""
91 Store the file data and resulting manifest in Keep, save a Collection
92 object in Arvados, and display the manifest locator (Collection uuid)
93 on stdout. This is the default behavior.
94 """)
95
96 _group.add_argument('--as-raw', action='store_true', dest='raw',
97                     help="""
98 Synonym for --raw.
99 """)
100
101 _group.add_argument('--raw', action='store_true',
102                     help="""
103 Store the file content and display the data block locators on stdout,
104 separated by commas, with a trailing newline. Do not store a
105 manifest.
106 """)
107
108 upload_opts.add_argument('--update-collection', type=str, default=None,
109                          dest='update_collection', metavar="UUID", help="""
110 Update an existing collection identified by the given Arvados collection
111 UUID. All new local files will be uploaded.
112 """)
113
114 upload_opts.add_argument('--use-filename', type=str, default=None,
115                          dest='filename', help="""
116 Synonym for --filename.
117 """)
118
119 upload_opts.add_argument('--filename', type=str, default=None,
120                          help="""
121 Use the given filename in the manifest, instead of the name of the
122 local file. This is useful when "-" or "/dev/stdin" is given as an
123 input file. It can be used only if there is exactly one path given and
124 it is not a directory. Implies --manifest.
125 """)
126
127 upload_opts.add_argument('--portable-data-hash', action='store_true',
128                          help="""
129 Print the portable data hash instead of the Arvados UUID for the collection
130 created by the upload.
131 """)
132
133 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
134                          help="""
135 Set the replication level for the new collection: how many different
136 physical storage devices (e.g., disks) should have a copy of each data
137 block. Default is to use the server-provided default (if any) or 2.
138 """)
139
140 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
141                          help="""
142 Set the number of upload threads to be used. Take into account that
143 using lots of threads will increase the RAM requirements. Default is
144 to use 2 threads.
145 On high latency installations, using a greater number will improve
146 overall throughput.
147 """)
148
149 run_opts = argparse.ArgumentParser(add_help=False)
150
151 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
152 Store the collection in the specified project, instead of your Home
153 project.
154 """)
155
156 run_opts.add_argument('--name', help="""
157 Save the collection with the specified name.
158 """)
159
160 run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
161                       action='append', help="""
162 Exclude files and directories whose names match the given glob pattern. When
163 using a path-like pattern like 'subdir/*.txt', all text files inside the
164 'subdir' directory, relative to the provided input dirs, will be excluded.
165 When using a filename pattern like '*.txt', any text file will be excluded
166 no matter where it is placed.
167 You can specify multiple patterns by using this argument more than once.
168 """)
169
170 _group = run_opts.add_mutually_exclusive_group()
171 _group.add_argument('--progress', action='store_true',
172                     help="""
173 Display human-readable progress on stderr (bytes and, if possible,
174 percentage of total data size). This is the default behavior when
175 stderr is a tty.
176 """)
177
178 _group.add_argument('--no-progress', action='store_true',
179                     help="""
180 Do not display human-readable progress on stderr, even if stderr is a
181 tty.
182 """)
183
184 _group.add_argument('--batch-progress', action='store_true',
185                     help="""
186 Display machine-readable progress on stderr (bytes and, if known,
187 total data size).
188 """)
189
190 _group = run_opts.add_mutually_exclusive_group()
191 _group.add_argument('--resume', action='store_true', default=True,
192                     help="""
193 Continue interrupted uploads from cached state (default).
194 """)
195 _group.add_argument('--no-resume', action='store_false', dest='resume',
196                     help="""
197 Do not continue interrupted uploads from cached state.
198 """)
199
200 _group = run_opts.add_mutually_exclusive_group()
201 _group.add_argument('--follow-links', action='store_true', default=True,
202                     dest='follow_links', help="""
203 Follow file and directory symlinks (default).
204 """)
205 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
206                     help="""
207 Do not follow file and directory symlinks.
208 """)
209
210 _group = run_opts.add_mutually_exclusive_group()
211 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
212                     help="""
213 Save upload state in a cache file for resuming (default).
214 """)
215 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
216                     help="""
217 Do not save upload state in a cache file for resuming.
218 """)
219
220 arg_parser = argparse.ArgumentParser(
221     description='Copy data from the local filesystem to Keep.',
222     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
223
224 def parse_arguments(arguments):
225     args = arg_parser.parse_args(arguments)
226
227     if len(args.paths) == 0:
228         args.paths = ['-']
229
230     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
231
232     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
233         if args.filename:
234             arg_parser.error("""
235     --filename argument cannot be used when storing a directory or
236     multiple files.
237     """)
238
239     # Turn on --progress by default if stderr is a tty.
240     if (not (args.batch_progress or args.no_progress)
241         and os.isatty(sys.stderr.fileno())):
242         args.progress = True
243
244     # Turn off --resume (default) if --no-cache is used.
245     if not args.use_cache:
246         args.resume = False
247
248     if args.paths == ['-']:
249         if args.update_collection:
250             arg_parser.error("""
251     --update-collection cannot be used when reading from stdin.
252     """)
253         args.resume = False
254         args.use_cache = False
255         if not args.filename:
256             args.filename = 'stdin'
257
258     # Remove possible duplicated patterns
259     if len(args.exclude) > 0:
260         args.exclude = list(set(args.exclude))
261
262     return args
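# Illustrative behavior, matching the rules above: parsing ['--no-cache',
# 'foo.txt'] yields args.use_cache == False and forces args.resume to False;
# parsing an empty path list falls back to stdin ('-'), which disables both
# cache and resume and defaults args.filename to 'stdin'.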
263
264
265 class PathDoesNotExistError(Exception):
266     pass
267
268
269 class CollectionUpdateError(Exception):
270     pass
271
272
273 class ResumeCacheConflict(Exception):
274     pass
275
276
277 class ArvPutArgumentConflict(Exception):
278     pass
279
280
281 class ArvPutUploadIsPending(Exception):
282     pass
283
284
285 class ArvPutUploadNotPending(Exception):
286     pass
287
288
289 class FileUploadList(list):
290     def __init__(self, dry_run=False):
291         list.__init__(self)
292         self.dry_run = dry_run
293
294     def append(self, other):
295         if self.dry_run:
296             raise ArvPutUploadIsPending()
297         super(FileUploadList, self).append(other)
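    # Note: in dry-run mode the first append() aborts the upload scan early;
    # finding one file that needs uploading is enough for main() to report
    # pending work and exit with code 2.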
298
299
300 class ResumeCache(object):
301     CACHE_DIR = '.cache/arvados/arv-put'
302
303     def __init__(self, file_spec):
304         self.cache_file = open(file_spec, 'a+')
305         self._lock_file(self.cache_file)
306         self.filename = self.cache_file.name
307
308     @classmethod
309     def make_path(cls, args):
310         md5 = hashlib.md5()
311         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
312         realpaths = sorted(os.path.realpath(path) for path in args.paths)
313         md5.update(b'\0'.join([p.encode() for p in realpaths]))
314         if any(os.path.isdir(path) for path in realpaths):
315             md5.update(b'-1')
316         elif args.filename:
317             md5.update(args.filename.encode())
318         return os.path.join(
319             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
320             md5.hexdigest())
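    # Illustrative example (hypothetical host/paths): for ['/data/dir'] on
    # 'x.example.com', the key is the MD5 of the host name, the NUL-joined
    # real paths, and a '-1' directory marker, giving a cache file such as
    # ~/.cache/arvados/arv-put/<hexdigest>.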
321
322     def _lock_file(self, fileobj):
323         try:
324             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
325         except IOError:
326             raise ResumeCacheConflict("{} locked".format(fileobj.name))
327
328     def load(self):
329         self.cache_file.seek(0)
330         return json.load(self.cache_file)
331
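    # check_cache() probes one known block locator with KeepClient.head(); if
    # the cached state references data no longer readable in Keep, restart()
    # discards the cache so the upload starts from scratch.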
332     def check_cache(self, api_client=None, num_retries=0):
333         try:
334             state = self.load()
335             locator = None
336             try:
337                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
338                     locator = state["_finished_streams"][0][1][0]
339                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
340                     locator = state["_current_stream_locators"][0]
341                 if locator is not None:
342                     kc = arvados.keep.KeepClient(api_client=api_client)
343                     kc.head(locator, num_retries=num_retries)
344             except Exception:
345                 self.restart()
346         except ValueError:
347             pass
348
349     def save(self, data):
350         try:
351             new_cache_fd, new_cache_name = tempfile.mkstemp(
352                 dir=os.path.dirname(self.filename))
353             new_cache = os.fdopen(new_cache_fd, 'r+')
354             self._lock_file(new_cache)
355             json.dump(data, new_cache)
356             os.rename(new_cache_name, self.filename)
357         except (IOError, OSError, ResumeCacheConflict) as error:
358             try:
359                 os.unlink(new_cache_name)
360             except NameError:  # mkstemp failed.
361                 pass
362         else:
363             self.cache_file.close()
364             self.cache_file = new_cache
365
366     def close(self):
367         self.cache_file.close()
368
369     def destroy(self):
370         try:
371             os.unlink(self.filename)
372         except OSError as error:
373             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
374                 raise
375         self.close()
376
377     def restart(self):
378         self.destroy()
379         self.__init__(self.filename)
380
381
382 class ArvPutUploadJob(object):
383     CACHE_DIR = '.cache/arvados/arv-put'
384     EMPTY_STATE = {
385         'manifest' : None, # Last saved manifest checkpoint
386         'files' : {} # Previous run file list: {path : {size, mtime}}
387     }
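    # Illustrative populated state (hypothetical values), as maintained by
    # _check_file() and _update():
    #   {'manifest': '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n',
    #    'files': {'/tmp/empty.txt': {'mtime': 1493305600.0, 'size': 0}}}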
388
389     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
390                  name=None, owner_uuid=None,
391                  ensure_unique_name=False, num_retries=None,
392                  put_threads=None, replication_desired=None,
393                  filename=None, update_time=60.0, update_collection=None,
394                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
395                  follow_links=True, exclude_paths=[], exclude_names=None):
396         self.paths = paths
397         self.resume = resume
398         self.use_cache = use_cache
399         self.update = False
400         self.reporter = reporter
401         # This will be set to 0 before we start counting, if no special files
402         # are going to be read.
403         self.bytes_expected = None
404         self.bytes_written = 0
405         self.bytes_skipped = 0
406         self.name = name
407         self.owner_uuid = owner_uuid
408         self.ensure_unique_name = ensure_unique_name
409         self.num_retries = num_retries
410         self.replication_desired = replication_desired
411         self.put_threads = put_threads
412         self.filename = filename
413         self._state_lock = threading.Lock()
414         self._state = None # Previous run state (file list & manifest)
415         self._current_files = [] # Current run file list
416         self._cache_file = None
417         self._collection_lock = threading.Lock()
418         self._remote_collection = None # Collection being updated (if asked)
419         self._local_collection = None # Collection from previous run manifest
420         self._file_paths = set() # Files to be updated in remote collection
421         self._stop_checkpointer = threading.Event()
422         self._checkpointer = threading.Thread(target=self._update_task)
423         self._checkpointer.daemon = True
424         self._update_task_time = update_time  # How many seconds to wait between update runs
425         self._files_to_upload = FileUploadList(dry_run=dry_run)
426         self._upload_started = False
427         self.logger = logger
428         self.dry_run = dry_run
429         self._checkpoint_before_quit = True
430         self.follow_links = follow_links
431         self.exclude_paths = exclude_paths
432         self.exclude_names = exclude_names
433
434         if not self.use_cache and self.resume:
435             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
436
437         # Check for obvious dry-run responses
438         if self.dry_run and (not self.use_cache or not self.resume):
439             raise ArvPutUploadIsPending()
440
441         # Load cached data if any and if needed
442         self._setup_state(update_collection)
443
444         # Build the upload file list, excluding requested files and counting the
445         # bytes expected to be uploaded.
446         self._build_upload_list()
447
448     def _build_upload_list(self):
449         """
450         Scan the requested paths to count file sizes, excluding files & dirs if requested
451         and building the upload file list.
452         """
453         # If there are no special files to be read, reset the total bytes count
454         # to zero and start counting.
455         if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
456                           self.paths)):
457             self.bytes_expected = 0
458
459         for path in self.paths:
460             # Test for stdin first, in case some file named '-' exists
461             if path == '-':
462                 if self.dry_run:
463                     raise ArvPutUploadIsPending()
464                 self._write_stdin(self.filename or 'stdin')
465             elif not os.path.exists(path):
466                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
467             elif os.path.isdir(path):
468                 # Use absolute paths on cache index so CWD doesn't interfere
469                 # with the caching logic.
470                 orig_path = path
471                 path = os.path.abspath(path)
472                 if orig_path[-1:] == os.sep:
473                     # When passing a directory reference with a trailing slash,
474                     # its contents should be uploaded directly to the
475                     # collection's root.
476                     prefixdir = path
477                 else:
478                     # When passing a directory reference with no trailing slash,
479                     # upload the directory to the collection's root.
480                     prefixdir = os.path.dirname(path)
481                 prefixdir += os.sep
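                # For instance (illustrative paths): '/data/dir/' yields
                # prefixdir '/data/dir/', so the directory's contents land at
                # the collection root, while '/data/dir' yields prefixdir
                # '/data/', storing files under 'dir/...'.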
482                 for root, dirs, files in os.walk(path,
483                                                  followlinks=self.follow_links):
484                     root_relpath = os.path.relpath(root, path)
485                     if root_relpath == '.':
486                         root_relpath = ''
487                     # Exclude files/dirs by full path matching pattern
488                     if self.exclude_paths:
489                         dirs[:] = list(filter(
490                             lambda d: not any(
491                                 [pathname_match(os.path.join(root_relpath, d),
492                                                 pat)
493                                  for pat in self.exclude_paths]),
494                             dirs))
495                         files = list(filter(
496                             lambda f: not any(
497                                 [pathname_match(os.path.join(root_relpath, f),
498                                                 pat)
499                                  for pat in self.exclude_paths]),
500                             files))
501                     # Exclude files/dirs by name matching pattern
502                     if self.exclude_names is not None:
503                         dirs[:] = list(filter(lambda d: not self.exclude_names.match(d), dirs))
504                         files = list(filter(lambda f: not self.exclude_names.match(f), files))
505                     # Make os.walk()'s dir traversal order deterministic
506                     dirs.sort()
507                     files.sort()
508                     for f in files:
509                         filepath = os.path.join(root, f)
510                         # Add its size to the total bytes count (if applicable)
511                         if self.follow_links or (not os.path.islink(filepath)):
512                             if self.bytes_expected is not None:
513                                 self.bytes_expected += os.path.getsize(filepath)
514                         self._check_file(filepath,
515                                          os.path.join(root[len(prefixdir):], f))
516             else:
517                 filepath = os.path.abspath(path)
518                 # Add its size to the total bytes count (if applicable)
519                 if self.follow_links or (not os.path.islink(filepath)):
520                     if self.bytes_expected is not None:
521                         self.bytes_expected += os.path.getsize(filepath)
522                 self._check_file(filepath,
523                                  self.filename or os.path.basename(path))
524         # If dry-run mode is on and we got to this point, notify that there
525         # are no files to upload.
526         if self.dry_run:
527             raise ArvPutUploadNotPending()
528         # Remove local_collection's files that don't exist locally anymore, so the
529         # bytes_written count is correct.
530         for f in self.collection_file_paths(self._local_collection,
531                                             path_prefix=""):
532             if f != 'stdin' and f != self.filename and f not in self._file_paths:
533                 self._local_collection.remove(f)
534
535     def start(self, save_collection):
536         """
537         Start supporting thread & file uploading
538         """
539         self._checkpointer.start()
540         try:
541             # Update bytes_written from current local collection and
542             # report initial progress.
543             self._update()
544             # Actual file upload
545             self._upload_started = True # Used by the update thread to start checkpointing
546             self._upload_files()
547         except (SystemExit, Exception) as e:
548             self._checkpoint_before_quit = False
549             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
550             # Note: We're expecting SystemExit instead of
551             # KeyboardInterrupt because we have a custom signal
552             # handler in place that raises SystemExit with the caught
553             # signal's code.
554             if isinstance(e, PathDoesNotExistError):
555                 # We aren't interested in the traceback for this case
556                 pass
557             elif not isinstance(e, SystemExit) or e.code != -2:
558                 self.logger.warning("Abnormal termination:\n{}".format(
559                     traceback.format_exc()))
560             raise
561         finally:
562             if not self.dry_run:
563                 # Stop the thread before doing anything else
564                 self._stop_checkpointer.set()
565                 self._checkpointer.join()
566                 if self._checkpoint_before_quit:
567                     # Commit all pending blocks & one last _update()
568                     self._local_collection.manifest_text()
569                     self._update(final=True)
570                     if save_collection:
571                         self.save_collection()
572             if self.use_cache:
573                 self._cache_file.close()
574
575     def save_collection(self):
576         if self.update:
577             # Check if files should be updated on the remote collection.
578             for fp in self._file_paths:
579                 remote_file = self._remote_collection.find(fp)
580                 if not remote_file:
581                     # File doesn't exist on the remote collection, copy it.
582                     self._remote_collection.copy(fp, fp, self._local_collection)
583                 elif remote_file != self._local_collection.find(fp):
584                     # A different file exists on the remote collection, overwrite it.
585                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
586                 else:
587                     # The file already exists on the remote collection, skip it.
588                     pass
589             self._remote_collection.save(num_retries=self.num_retries)
590         else:
591             self._local_collection.save_new(
592                 name=self.name, owner_uuid=self.owner_uuid,
593                 ensure_unique_name=self.ensure_unique_name,
594                 num_retries=self.num_retries)
595
596     def destroy_cache(self):
597         if self.use_cache:
598             try:
599                 os.unlink(self._cache_filename)
600             except OSError as error:
601                 # That's what we wanted anyway.
602                 if error.errno != errno.ENOENT:
603                     raise
604             self._cache_file.close()
605
606     def _collection_size(self, collection):
607         """
608         Recursively get the total size of the collection
609         """
610         size = 0
611         for item in listvalues(collection):
612             if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
613                 size += self._collection_size(item)
614             else:
615                 size += item.size()
616         return size
617
618     def _update_task(self):
619         """
620         Periodically called support task. File uploading is
621         asynchronous so we poll status from the collection.
622         """
623         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
624             self._update()
625
626     def _update(self, final=False):
627         """
628         Update cached manifest text and report progress.
629         """
630         if self._upload_started:
631             with self._collection_lock:
632                 self.bytes_written = self._collection_size(self._local_collection)
633                 if self.use_cache:
634                     if final:
635                         manifest = self._local_collection.manifest_text()
636                     else:
637                         # Get the manifest text without committing pending blocks
638                         manifest = self._local_collection.manifest_text(strip=False,
639                                                                         normalize=False,
640                                                                         only_committed=True)
641                     # Update cache
642                     with self._state_lock:
643                         self._state['manifest'] = manifest
644             if self.use_cache:
645                 try:
646                     self._save_state()
647                 except Exception as e:
648                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
649         else:
650             self.bytes_written = self.bytes_skipped
651         # Call the reporter, if any
652         self.report_progress()
653
654     def report_progress(self):
655         if self.reporter is not None:
656             self.reporter(self.bytes_written, self.bytes_expected)
657
658     def _write_stdin(self, filename):
659         output = self._local_collection.open(filename, 'wb')
660         self._write(sys.stdin, output)
661         output.close()
662
663     def _check_file(self, source, filename):
664         """
665         Check if this file needs to be uploaded
666         """
667         # Ignore symlinks when requested
668         if (not self.follow_links) and os.path.islink(source):
669             return
670         resume_offset = 0
671         should_upload = False
672         new_file_in_cache = False
673         # Record file path for updating the remote collection before exiting
674         self._file_paths.add(filename)
675
676         with self._state_lock:
677             # If there's no previous cached data for this file, store it for an
678             # eventual repeated run.
679             if source not in self._state['files']:
680                 self._state['files'][source] = {
681                     'mtime': os.path.getmtime(source),
682                     'size' : os.path.getsize(source)
683                 }
684                 new_file_in_cache = True
685             cached_file_data = self._state['files'][source]
686
687         # Check if file was already uploaded (at least partially)
688         file_in_local_collection = self._local_collection.find(filename)
689
690         # If not resuming, upload the full file.
691         if not self.resume:
692             should_upload = True
693         # New file detected from last run, upload it.
694         elif new_file_in_cache:
695             should_upload = True
696         # Local file didn't change from last run.
697         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
698             if not file_in_local_collection:
699                 # File not uploaded yet, upload it completely
700                 should_upload = True
701             elif file_in_local_collection.permission_expired():
702                 # Permission token expired, re-upload file. This will change whenever
703                 # we have an API for refreshing tokens.
704                 should_upload = True
705                 self._local_collection.remove(filename)
706             elif cached_file_data['size'] == file_in_local_collection.size():
707                 # File already there, skip it.
708                 self.bytes_skipped += cached_file_data['size']
709             elif cached_file_data['size'] > file_in_local_collection.size():
710                 # File partially uploaded, resume!
711                 resume_offset = file_in_local_collection.size()
712                 self.bytes_skipped += resume_offset
713                 should_upload = True
714             else:
715                 # Inconsistent cache, re-upload the file
716                 should_upload = True
717                 self._local_collection.remove(filename)
718                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
719         # Local file differs from cached data, re-upload it.
720         else:
721             if file_in_local_collection:
722                 self._local_collection.remove(filename)
723             should_upload = True
724
725         if should_upload:
726             try:
727                 self._files_to_upload.append((source, resume_offset, filename))
728             except ArvPutUploadIsPending:
729                 # This could happen when running in dry-run mode; close the cache
730                 # file to avoid locking issues.
731                 self._cache_file.close()
732                 raise
733
734     def _upload_files(self):
735         for source, resume_offset, filename in self._files_to_upload:
736             with open(source, 'rb') as source_fd:
737                 with self._state_lock:
738                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
739                     self._state['files'][source]['size'] = os.path.getsize(source)
740                 if resume_offset > 0:
741                     # Start upload where we left off
742                     output = self._local_collection.open(filename, 'ab')
743                     source_fd.seek(resume_offset)
744                 else:
745                     # Start from scratch
746                     output = self._local_collection.open(filename, 'wb')
747                 self._write(source_fd, output)
748                 output.close(flush=False)
749
750     def _write(self, source_fd, output):
751         while True:
752             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
753             if not data:
754                 break
755             output.write(data)
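    # _write() streams data in KEEP_BLOCK_SIZE chunks (64 MiB, the Keep block
    # size), so arbitrarily large files are copied without being loaded fully
    # into memory.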
756
757     def _my_collection(self):
758         return self._remote_collection if self.update else self._local_collection
759
760     def _setup_state(self, update_collection):
761         """
762         Create a new cache file or load a previously existing one.
763         """
764         # Load an already existing collection for update
765         if update_collection and re.match(arvados.util.collection_uuid_pattern,
766                                           update_collection):
767             try:
768                 self._remote_collection = arvados.collection.Collection(update_collection)
769             except arvados.errors.ApiError as error:
770                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
771             else:
772                 self.update = True
773         elif update_collection:
774             # Collection locator provided, but unknown format
775             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
776
777         if self.use_cache:
778             # Set up cache file name from input paths.
779             md5 = hashlib.md5()
780             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
781             realpaths = sorted(os.path.realpath(path) for path in self.paths)
782             md5.update(b'\0'.join([p.encode() for p in realpaths]))
783             if self.filename:
784                 md5.update(self.filename.encode())
785             cache_filename = md5.hexdigest()
786             cache_filepath = os.path.join(
787                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
788                 cache_filename)
789             if self.resume and os.path.exists(cache_filepath):
790                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
791                 self._cache_file = open(cache_filepath, 'a+')
792             else:
793                 # --no-resume means start with an empty cache file.
794                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
795                 self._cache_file = open(cache_filepath, 'w+')
796             self._cache_filename = self._cache_file.name
797             self._lock_file(self._cache_file)
798             self._cache_file.seek(0)
799
800         with self._state_lock:
801             if self.use_cache:
802                 try:
803                     self._state = json.load(self._cache_file)
804                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
805                         # Cache at least partially incomplete, set up new cache
806                         self._state = copy.deepcopy(self.EMPTY_STATE)
807                 except ValueError:
808                     # Cache file empty, set up new cache
809                     self._state = copy.deepcopy(self.EMPTY_STATE)
810             else:
811                 self.logger.info("No cache usage requested for this run.")
812                 # No cache file, set empty state
813                 self._state = copy.deepcopy(self.EMPTY_STATE)
814             # Load the previous manifest so we can check if files were modified remotely.
815             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
816
817     def collection_file_paths(self, col, path_prefix='.'):
818         """Return a list of file paths by recursively going through the entire collection `col`"""
819         file_paths = []
820         for name, item in listitems(col):
821             if isinstance(item, arvados.arvfile.ArvadosFile):
822                 file_paths.append(os.path.join(path_prefix, name))
823             elif isinstance(item, arvados.collection.Subcollection):
824                 new_prefix = os.path.join(path_prefix, name)
825                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
826         return file_paths
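    # E.g. for a collection holding 'a.txt' and 'sub/b.txt', this returns
    # ['./a.txt', './sub/b.txt'] with the default path_prefix; callers pass
    # path_prefix="" to get collection-relative names.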
827
828     def _lock_file(self, fileobj):
829         try:
830             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
831         except IOError:
832             raise ResumeCacheConflict("{} locked".format(fileobj.name))
833
834     def _save_state(self):
835         """
836         Atomically save current state into cache.
837         """
838         with self._state_lock:
839             # We're not using copy.deepcopy() here because it's a lot slower
840             # than json.dumps(), and we already need the JSON representation
841             # for saving to disk.
842             state = json.dumps(self._state)
843         try:
844             new_cache = tempfile.NamedTemporaryFile(
845                 mode='w+',
846                 dir=os.path.dirname(self._cache_filename), delete=False)
847             self._lock_file(new_cache)
848             new_cache.write(state)
849             new_cache.flush()
850             os.fsync(new_cache)
851             os.rename(new_cache.name, self._cache_filename)
852         except (IOError, OSError, ResumeCacheConflict) as error:
853             self.logger.error("There was a problem while saving the cache file: {}".format(error))
854             try:
855                 os.unlink(new_cache.name)
856             except NameError:  # NamedTemporaryFile creation failed.
857                 pass
858         else:
859             self._cache_file.close()
860             self._cache_file = new_cache
861
862     def collection_name(self):
863         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
864
865     def manifest_locator(self):
866         return self._my_collection().manifest_locator()
867
868     def portable_data_hash(self):
869         pdh = self._my_collection().portable_data_hash()
870         m = self._my_collection().stripped_manifest().encode()
871         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
872         if pdh != local_pdh:
873             self.logger.warning("\n".join([
874                 "arv-put: API server provided PDH differs from local manifest.",
875                 "         This should not happen; showing API server version."]))
876         return pdh
877
878     def manifest_text(self, stream_name=".", strip=False, normalize=False):
879         return self._my_collection().manifest_text(stream_name, strip, normalize)
880
881     def _datablocks_on_item(self, item):
882         """
883         Return a list of datablock locators, recursively navigating
884         through subcollections
885         """
886         if isinstance(item, arvados.arvfile.ArvadosFile):
887             if item.size() == 0:
888                 # Empty file locator
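                # (d41d8cd98f00b204e9800998ecf8427e is the MD5 of the empty
                # string, so this is the canonical zero-length block locator.)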
889                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
890             else:
891                 locators = []
892                 for segment in item.segments():
893                     loc = segment.locator
894                     locators.append(loc)
895                 return locators
896         elif isinstance(item, arvados.collection.Collection):
897             l = [self._datablocks_on_item(x) for x in listvalues(item)]
898             # Fast list flattener method taken from:
899             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
900             return [loc for sublist in l for loc in sublist]
901         else:
902             return None
903
904     def data_locators(self):
905         with self._collection_lock:
906             # Make sure all datablocks are flushed before getting the locators
907             self._my_collection().manifest_text()
908             datablocks = self._datablocks_on_item(self._my_collection())
909         return datablocks
910
911 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
912                                                             os.getpid())
913
914 # Simulate glob.glob() matching behavior without the need to scan the filesystem
915 # Note: fnmatch() doesn't work correctly when used with pathnames. For example, the
916 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
917 # so instead we're using it on every path component.
918 def pathname_match(pathname, pattern):
919     name = pathname.split(os.sep)
920     # Fix patterns like 'some/subdir/' or 'some//subdir'
921     pat = [x for x in pattern.split(os.sep) if x != '']
922     if len(name) != len(pat):
923         return False
924     for i in range(len(name)):
925         if not fnmatch.fnmatch(name[i], pat[i]):
926             return False
927     return True
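# Examples (illustrative): pathname_match('subdir/file.txt', 'subdir/*.txt')
# is True; pathname_match('subdir/nested/file.txt', 'subdir/*.txt') is False
# because the component counts differ; trailing or doubled separators in the
# pattern are ignored, so 'subdir//*.txt' behaves like 'subdir/*.txt'.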
928
929 def machine_progress(bytes_written, bytes_expected):
930     return _machine_format.format(
931         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
932
933 def human_progress(bytes_written, bytes_expected):
934     if bytes_expected:
935         return "\r{}M / {}M {:.1%} ".format(
936             bytes_written >> 20, bytes_expected >> 20,
937             float(bytes_written) / bytes_expected)
938     else:
939         return "\r{} ".format(bytes_written)
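# For example, human_progress(52428800, 104857600) returns
# '\r50M / 100M 50.0% ', while machine_progress(52428800, None) reports -1 in
# place of the unknown total.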
940
941 def progress_writer(progress_func, outfile=sys.stderr):
942     def write_progress(bytes_written, bytes_expected):
943         outfile.write(progress_func(bytes_written, bytes_expected))
944     return write_progress
945
946 def exit_signal_handler(sigcode, frame):
947     sys.exit(-sigcode)
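# Exiting with the negated signal number lets ArvPutUploadJob.start() tell
# signal-triggered exits apart; e.g. SIGINT (2) becomes exit code -2, for
# which the stack trace is suppressed.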
948
949 def desired_project_uuid(api_client, project_uuid, num_retries):
950     if not project_uuid:
951         query = api_client.users().current()
952     elif arvados.util.user_uuid_pattern.match(project_uuid):
953         query = api_client.users().get(uuid=project_uuid)
954     elif arvados.util.group_uuid_pattern.match(project_uuid):
955         query = api_client.groups().get(uuid=project_uuid)
956     else:
957         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
958     return query.execute(num_retries=num_retries)['uuid']
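# Accepts a user UUID (the upload goes to that user's Home project), a group
# UUID (a project), or a falsy value meaning the current user; anything else
# raises ValueError before any API call is made.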
959
960 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
961     global api_client
962
963     logger = logging.getLogger('arvados.arv_put')
964     logger.setLevel(logging.INFO)
965     args = parse_arguments(arguments)
966     status = 0
967     if api_client is None:
968         api_client = arvados.api('v1')
969
970     # Determine the name to use
971     if args.name:
972         if args.stream or args.raw:
973             logger.error("Cannot use --name with --stream or --raw")
974             sys.exit(1)
975         elif args.update_collection:
976             logger.error("Cannot use --name with --update-collection")
977             sys.exit(1)
978         collection_name = args.name
979     else:
980         collection_name = "Saved at {} by {}@{}".format(
981             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
982             pwd.getpwuid(os.getuid()).pw_name,
983             socket.gethostname())
984
985     if args.project_uuid and (args.stream or args.raw):
986         logger.error("Cannot use --project-uuid with --stream or --raw")
987         sys.exit(1)
988
989     # Determine the parent project
990     try:
991         project_uuid = desired_project_uuid(api_client, args.project_uuid,
992                                             args.retries)
993     except (apiclient_errors.Error, ValueError) as error:
994         logger.error(error)
995         sys.exit(1)
996
997     if args.progress:
998         reporter = progress_writer(human_progress)
999     elif args.batch_progress:
1000         reporter = progress_writer(machine_progress)
1001     else:
1002         reporter = None
1003
1004     # Setup exclude regex from all the --exclude arguments provided
1005     name_patterns = []
1006     exclude_paths = []
1007     exclude_names = None
1008     if len(args.exclude) > 0:
1009         # We're supporting 2 kinds of exclusion patterns:
1010         # 1) --exclude '*.jpg'      (file/dir name patterns, will only match
1011         #                            the name)
1012         # 2) --exclude 'foo/bar'    (file/dir path patterns, will match the
1013         #                            entire path, and should be relative to
1014         #                            any input dir argument)
1015         for p in args.exclude:
1016             # Only relative path patterns are allowed
1017             if p.startswith(os.sep):
1018                 logger.error("Cannot use absolute paths with --exclude")
1019                 sys.exit(1)
1020             if os.path.dirname(p):
1021                 # We don't support path patterns with '.' or '..'
1022                 p_parts = p.split(os.sep)
1023                 if '.' in p_parts or '..' in p_parts:
1024                     logger.error(
1025                         "Cannot use path patterns that include '.' or '..'")
1026                     sys.exit(1)
1027                 # Path search pattern
1028                 exclude_paths.append(p)
1029             else:
1030                 # Name-only search pattern
1031                 name_patterns.append(p)
1032         # For name-only matching, we can combine all patterns into a single regexp,
1033         # for better performance.
1034         exclude_names = re.compile('|'.join(
1035             [fnmatch.translate(p) for p in name_patterns]
1036         )) if len(name_patterns) > 0 else None
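        # For example (hypothetical patterns): --exclude '*.jpg' --exclude
        # '*.gif' compile into one regexp matching either name, while
        # --exclude 'docs/*.pdf' has a dirname and therefore stays in
        # exclude_paths, to be matched against full relative paths by
        # pathname_match().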
1037         # Show the user the patterns to be used, just in case they weren't specified inside
1038         # quotes and got changed by the shell expansion.
1039         logger.info("Exclude patterns: {}".format(args.exclude))
1040
1041     # If this is used by a human, and there's at least one directory to be
1042     # uploaded, the expected bytes calculation can take a moment.
1043     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1044         logger.info("Calculating upload size, this could take some time...")
1045     try:
1046         writer = ArvPutUploadJob(paths = args.paths,
1047                                  resume = args.resume,
1048                                  use_cache = args.use_cache,
1049                                  filename = args.filename,
1050                                  reporter = reporter,
1051                                  num_retries = args.retries,
1052                                  replication_desired = args.replication,
1053                                  put_threads = args.threads,
1054                                  name = collection_name,
1055                                  owner_uuid = project_uuid,
1056                                  ensure_unique_name = True,
1057                                  update_collection = args.update_collection,
1058                                  logger=logger,
1059                                  dry_run=args.dry_run,
1060                                  follow_links=args.follow_links,
1061                                  exclude_paths=exclude_paths,
1062                                  exclude_names=exclude_names)
1063     except ResumeCacheConflict:
1064         logger.error("\n".join([
1065             "arv-put: Another process is already uploading this data.",
1066             "         Use --no-cache if this is really what you want."]))
1067         sys.exit(1)
1068     except CollectionUpdateError as error:
1069         logger.error("\n".join([
1070             "arv-put: %s" % str(error)]))
1071         sys.exit(1)
1072     except ArvPutUploadIsPending:
1073         # Dry run check successful, return proper exit code.
1074         sys.exit(2)
1075     except ArvPutUploadNotPending:
1076         # No files pending for upload
1077         sys.exit(0)
1078     except PathDoesNotExistError as error:
1079         logger.error("\n".join([
1080             "arv-put: %s" % str(error)]))
1081         sys.exit(1)
1082
1083     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
1084     # the originals.
1085     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
1086                             for sigcode in CAUGHT_SIGNALS}
1087
1088     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1089         logger.warning("\n".join([
1090             "arv-put: Resuming previous upload from last checkpoint.",
1091             "         Use the --no-resume option to start over."]))
1092
1093     if not args.dry_run:
1094         writer.report_progress()
1095     output = None
1096     try:
1097         writer.start(save_collection=not(args.stream or args.raw))
1098     except arvados.errors.ApiError as error:
1099         logger.error("\n".join([
1100             "arv-put: %s" % str(error)]))
1101         sys.exit(1)
1102
1103     if args.progress:  # Print newline to split stderr from stdout for humans.
1104         logger.info("\n")
1105
1106     if args.stream:
1107         if args.normalize:
1108             output = writer.manifest_text(normalize=True)
1109         else:
1110             output = writer.manifest_text()
1111     elif args.raw:
1112         output = ','.join(writer.data_locators())
1113     else:
1114         try:
1115             if args.update_collection:
1116                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
1117             else:
1118                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
1119             if args.portable_data_hash:
1120                 output = writer.portable_data_hash()
1121             else:
1122                 output = writer.manifest_locator()
1123         except apiclient_errors.Error as error:
1124             logger.error(
1125                 "arv-put: Error creating Collection on project: {}.".format(
1126                     error))
1127             status = 1
1128
1129     # Print the locator (uuid) of the new collection.
1130     if output is None:
1131         status = status or 1
1132     else:
1133         stdout.write(output)
1134         if not output.endswith('\n'):
1135             stdout.write('\n')
1136
1137     for sigcode, orig_handler in listitems(orig_signal_handlers):
1138         signal.signal(sigcode, orig_handler)
1139
1140     if status != 0:
1141         sys.exit(status)
1142
1143     # Success!
1144     return output
1145
1146
1147 if __name__ == '__main__':
1148     main()