11579: Added symlinked dir traversal as the default behavior.
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 import traceback
27
28 from apiclient import errors as apiclient_errors
29 from arvados._version import __version__
30
31 import arvados.commands._util as arv_cmd
32
# Signals of interest to this command (presumably the ones wired to the custom
# signal handler mentioned in ArvPutUploadJob.start() -- confirm at the place
# where the handlers are installed).
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client handle; not set at import time.
api_client = None

# Parser fragment with the options describing what gets uploaded and how the
# result is stored/reported. It has no -h of its own because it is meant to be
# used through the 'parents' mechanism of another parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    '--version',
    action='version',
    version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
upload_opts.add_argument(
    'paths',
    metavar='path',
    type=str,
    nargs='*',
    help="""
Local file or directory. Default: read from standard input.
""")

# --max-manifest-depth (hidden), --normalize and --dry-run exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    type=int,
    metavar='N',
    default=-1,
    help=argparse.SUPPRESS)

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument(
    '--dry-run',
    action='store_true',
    default=False,
    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream / manifest / raw (plus their synonyms), at most one.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    action='store_true',
    dest='stream',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    action='store_true',
    dest='raw',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument(
    '--update-collection',
    type=str,
    default=None,
    dest='update_collection',
    metavar="UUID",
    help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument(
    '--use-filename',
    type=str,
    default=None,
    dest='filename',
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str,
    default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument(
    '--threads',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
146
# Parser fragment with the options controlling how a run behaves (where the
# collection goes, progress reporting, resuming, symlinks, caching). Like
# upload_opts it carries no -h option of its own.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

# Progress reporting flavour: at most one of these may be given.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write to args.resume (True unless negated).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--resume',
    action='store_true',
    default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument(
    '--no-resume',
    action='store_false',
    dest='resume',
    help="""
Do not continue interrupted uploads from cached state.
""")

# --follow-links / --no-follow-links both write to args.follow_links.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--follow-links',
    action='store_true',
    default=True,
    dest='follow_links',
    help="""
Traverse directory symlinks (default).
Multiple symlinks pointing to the same directory will only be followed once.
""")
_group.add_argument(
    '--no-follow-links',
    action='store_false',
    dest='follow_links',
    help="""
Do not traverse directory symlinks.
""")

# --cache / --no-cache both write to args.use_cache.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--cache',
    action='store_true',
    dest='use_cache',
    default=True,
    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument(
    '--no-cache',
    action='store_false',
    dest='use_cache',
    help="""
Do not save upload state in a cache file for resuming.
""")
208
# Complete command-line parser: combines the upload options, the run options
# and arv_cmd.retry_opt through argparse's 'parents' mechanism.
arg_parser = argparse.ArgumentParser(
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
    description='Copy data from the local filesystem to Keep.')
212
def parse_arguments(arguments):
    """Parse the command line with arg_parser and normalize the result.

    Post-processing applied to the parsed namespace:
      * no paths given means "read from stdin" (['-']);
      * '/dev/stdin' is rewritten as '-';
      * --filename is rejected (arg_parser.error) unless exactly one
        non-directory path was given;
      * progress is enabled when stderr is a tty and neither
        --batch-progress nor --no-progress was requested;
      * --no-cache turns resume off;
      * reading from stdin rejects --update-collection, disables resume and
        cache, and defaults the filename to 'stdin'.

    Returns the argparse namespace.
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']

    # Keep this a real list: the code below takes its len(), indexes it and
    # compares it with ['-'], none of which works on a lazy map object.
    args.paths = ["-" if path == "/dev/stdin" else path
                  for path in args.paths]

    if args.filename and (len(args.paths) != 1
                          or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Data coming from stdin cannot be cached nor resumed.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args
248
249
class CollectionUpdateError(Exception):
    """The collection requested for update is unreadable or badly identified."""
252
253
class ResumeCacheConflict(Exception):
    """The cache file could not be locked exclusively."""
256
257
class ArvPutArgumentConflict(Exception):
    """Incompatible options were requested (e.g. resume without cache)."""
260
261
class ArvPutUploadIsPending(Exception):
    """Dry-run outcome: at least one file would have to be uploaded."""
264
265
class ArvPutUploadNotPending(Exception):
    """Dry-run outcome: no file needs to be uploaded."""
268
269
class FileUploadList(list):
    """List of pending uploads that refuses new entries in dry-run mode.

    With dry_run set, the first append() raises ArvPutUploadIsPending
    instead of storing the item, which lets the caller find out that
    something would be uploaded without queueing it.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Add 'other' to the list, or raise when running in dry-run mode."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
279
280
class ResumeCache(object):
    """JSON state file protected by an exclusive, non-blocking flock.

    The file is opened (created if missing) and locked when the object is
    built; ResumeCacheConflict is raised when the lock cannot be taken.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open file_spec in append/read mode and lock it.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leak the descriptor when the lock cannot be taken.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the given parsed arguments.

        The file name is the MD5 hex digest of the configured API host, the
        sorted real paths of args.paths and either "-1" (when any path is a
        directory) or args.filename (when set). The directory is obtained
        from arv_cmd.make_home_conf_dir(CACHE_DIR, 0o700, 'raise').
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on fileobj.

        fileobj may be a file object or a raw file descriptor. Raises
        ResumeCacheConflict when the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; fall back to the value itself.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON document stored in the cache file.

        Raises ValueError (from json.load) when the file holds no valid JSON.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Restart the cache if the first block it references can't be found.

        Takes the first locator from "_finished_streams" or, failing that,
        from "_current_stream_locators", and issues a Keep HEAD request for
        it. Any error while doing so restarts the cache. A cache that cannot
        be parsed (ValueError) is left alone.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Replace the cache content with 'data' (serialized as JSON).

        The data is written to a locked temporary file in the same directory
        which is then renamed over the cache file. This is best effort: on
        IOError, OSError or ResumeCacheConflict the previous cache is kept
        and the temporary file is discarded.
        """
        new_cache = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary file's descriptor before removing it.
            if new_cache is not None:
                new_cache.close()
            if new_cache_name is not None:  # None means mkstemp failed.
                try:
                    os.unlink(new_cache_name)
                except OSError as error:
                    if error.errno != errno.ENOENT:
                        raise
        else:
            # The new file (and its lock) takes over from the old one.
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file, which also drops its lock."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw away the current cache file and start over with an empty one."""
        self.destroy()
        self.__init__(self.filename)
361
362
363 class ArvPutUploadJob(object):
364     CACHE_DIR = '.cache/arvados/arv-put'
365     EMPTY_STATE = {
366         'manifest' : None, # Last saved manifest checkpoint
367         'files' : {} # Previous run file list: {path : {size, mtime}}
368     }
369
370     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
371                  bytes_expected=None, name=None, owner_uuid=None,
372                  ensure_unique_name=False, num_retries=None,
373                  put_threads=None, replication_desired=None,
374                  filename=None, update_time=60.0, update_collection=None,
375                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
376                  follow_links=True):
377         self.paths = paths
378         self.resume = resume
379         self.use_cache = use_cache
380         self.update = False
381         self.reporter = reporter
382         self.bytes_expected = bytes_expected
383         self.bytes_written = 0
384         self.bytes_skipped = 0
385         self.name = name
386         self.owner_uuid = owner_uuid
387         self.ensure_unique_name = ensure_unique_name
388         self.num_retries = num_retries
389         self.replication_desired = replication_desired
390         self.put_threads = put_threads
391         self.filename = filename
392         self._state_lock = threading.Lock()
393         self._state = None # Previous run state (file list & manifest)
394         self._current_files = [] # Current run file list
395         self._cache_file = None
396         self._collection_lock = threading.Lock()
397         self._remote_collection = None # Collection being updated (if asked)
398         self._local_collection = None # Collection from previous run manifest
399         self._file_paths = set() # Files to be updated in remote collection
400         self._stop_checkpointer = threading.Event()
401         self._checkpointer = threading.Thread(target=self._update_task)
402         self._checkpointer.daemon = True
403         self._update_task_time = update_time  # How many seconds wait between update runs
404         self._files_to_upload = FileUploadList(dry_run=dry_run)
405         self._upload_started = False
406         self.logger = logger
407         self.dry_run = dry_run
408         self._checkpoint_before_quit = True
409         self.follow_links = follow_links
410         self._traversed_links = set()
411
412         if not self.use_cache and self.resume:
413             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
414
415         # Check for obvious dry-run responses
416         if self.dry_run and (not self.use_cache or not self.resume):
417             raise ArvPutUploadIsPending()
418
419         # Load cached data if any and if needed
420         self._setup_state(update_collection)
421
422     def _check_traversed_dir_links(self, root, dirs):
423         """
424         Remove from the 'dirs' list the already traversed directory symlinks,
425         register the new dir symlinks as traversed.
426         """
427         for d in [d for d in dirs if os.path.isdir(os.path.join(root, d)) and
428                   os.path.islink(os.path.join(root, d))]:
429             real_dirpath = os.path.realpath(os.path.join(root, d))
430             if real_dirpath in self._traversed_links:
431                 dirs.remove(d)
432                 self.logger.warning("Skipping '{}' symlink to directory '{}' because it was already uploaded".format(os.path.join(root, d), real_dirpath))
433             else:
434                 self._traversed_links.add(real_dirpath)
435         return dirs
436         
437     def start(self, save_collection):
438         """
439         Start supporting thread & file uploading
440         """
441         if not self.dry_run:
442             self._checkpointer.start()
443         try:
444             for path in self.paths:
445                 # Test for stdin first, in case some file named '-' exist
446                 if path == '-':
447                     if self.dry_run:
448                         raise ArvPutUploadIsPending()
449                     self._write_stdin(self.filename or 'stdin')
450                 elif os.path.isdir(path):
451                     # Use absolute paths on cache index so CWD doesn't interfere
452                     # with the caching logic.
453                     prefixdir = path = os.path.abspath(path)
454                     if prefixdir != '/':
455                         prefixdir += '/'
456                     # If following symlinks, avoid recursive traversals
457                     if self.follow_links and os.path.islink(path):
458                         self._traversed_links.add(os.path.realpath(path))
459                     for root, dirs, files in os.walk(path, followlinks=self.follow_links):
460                         if self.follow_links:
461                             dirs = self._check_traversed_dir_links(root, dirs)
462                         # Make os.walk()'s dir traversing order deterministic
463                         dirs.sort()
464                         files.sort()
465                         for f in files:
466                             self._check_file(os.path.join(root, f),
467                                              os.path.join(root[len(prefixdir):], f))
468                 else:
469                     self._check_file(os.path.abspath(path),
470                                      self.filename or os.path.basename(path))
471             # If dry-mode is on, and got up to this point, then we should notify that
472             # there aren't any file to upload.
473             if self.dry_run:
474                 raise ArvPutUploadNotPending()
475             # Remove local_collection's files that don't exist locally anymore, so the
476             # bytes_written count is correct.
477             for f in self.collection_file_paths(self._local_collection,
478                                                 path_prefix=""):
479                 if f != 'stdin' and f != self.filename and not f in self._file_paths:
480                     self._local_collection.remove(f)
481             # Update bytes_written from current local collection and
482             # report initial progress.
483             self._update()
484             # Actual file upload
485             self._upload_started = True # Used by the update thread to start checkpointing
486             self._upload_files()
487         except (SystemExit, Exception) as e:
488             self._checkpoint_before_quit = False
489             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
490             # Note: We're expecting SystemExit instead of KeyboardInterrupt because
491             #   we have a custom signal handler in place that raises SystemExit with
492             #   the catched signal's code.
493             if not isinstance(e, SystemExit) or e.code != -2:
494                 self.logger.warning("Abnormal termination:\n{}".format(traceback.format_exc(e)))
495             raise
496         finally:
497             if not self.dry_run:
498                 # Stop the thread before doing anything else
499                 self._stop_checkpointer.set()
500                 self._checkpointer.join()
501                 if self._checkpoint_before_quit:
502                     # Commit all pending blocks & one last _update()
503                     self._local_collection.manifest_text()
504                     self._update(final=True)
505                     if save_collection:
506                         self.save_collection()
507             if self.use_cache:
508                 self._cache_file.close()
509
510     def save_collection(self):
511         if self.update:
512             # Check if files should be updated on the remote collection.
513             for fp in self._file_paths:
514                 remote_file = self._remote_collection.find(fp)
515                 if not remote_file:
516                     # File don't exist on remote collection, copy it.
517                     self._remote_collection.copy(fp, fp, self._local_collection)
518                 elif remote_file != self._local_collection.find(fp):
519                     # A different file exist on remote collection, overwrite it.
520                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
521                 else:
522                     # The file already exist on remote collection, skip it.
523                     pass
524             self._remote_collection.save(num_retries=self.num_retries)
525         else:
526             self._local_collection.save_new(
527                 name=self.name, owner_uuid=self.owner_uuid,
528                 ensure_unique_name=self.ensure_unique_name,
529                 num_retries=self.num_retries)
530
531     def destroy_cache(self):
532         if self.use_cache:
533             try:
534                 os.unlink(self._cache_filename)
535             except OSError as error:
536                 # That's what we wanted anyway.
537                 if error.errno != errno.ENOENT:
538                     raise
539             self._cache_file.close()
540
541     def _collection_size(self, collection):
542         """
543         Recursively get the total size of the collection
544         """
545         size = 0
546         for item in collection.values():
547             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
548                 size += self._collection_size(item)
549             else:
550                 size += item.size()
551         return size
552
553     def _update_task(self):
554         """
555         Periodically called support task. File uploading is
556         asynchronous so we poll status from the collection.
557         """
558         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
559             self._update()
560
561     def _update(self, final=False):
562         """
563         Update cached manifest text and report progress.
564         """
565         if self._upload_started:
566             with self._collection_lock:
567                 self.bytes_written = self._collection_size(self._local_collection)
568                 if self.use_cache:
569                     if final:
570                         manifest = self._local_collection.manifest_text()
571                     else:
572                         # Get the manifest text without comitting pending blocks
573                         manifest = self._local_collection.manifest_text(strip=False,
574                                                                         normalize=False,
575                                                                         only_committed=True)
576                     # Update cache
577                     with self._state_lock:
578                         self._state['manifest'] = manifest
579             if self.use_cache:
580                 try:
581                     self._save_state()
582                 except Exception as e:
583                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
584         else:
585             self.bytes_written = self.bytes_skipped
586         # Call the reporter, if any
587         self.report_progress()
588
589     def report_progress(self):
590         if self.reporter is not None:
591             self.reporter(self.bytes_written, self.bytes_expected)
592
593     def _write_stdin(self, filename):
594         output = self._local_collection.open(filename, 'w')
595         self._write(sys.stdin, output)
596         output.close()
597
598     def _check_file(self, source, filename):
599         """Check if this file needs to be uploaded"""
600         resume_offset = 0
601         should_upload = False
602         new_file_in_cache = False
603         # Record file path for updating the remote collection before exiting
604         self._file_paths.add(filename)
605
606         with self._state_lock:
607             # If no previous cached data on this file, store it for an eventual
608             # repeated run.
609             if source not in self._state['files']:
610                 self._state['files'][source] = {
611                     'mtime': os.path.getmtime(source),
612                     'size' : os.path.getsize(source)
613                 }
614                 new_file_in_cache = True
615             cached_file_data = self._state['files'][source]
616
617         # Check if file was already uploaded (at least partially)
618         file_in_local_collection = self._local_collection.find(filename)
619
620         # If not resuming, upload the full file.
621         if not self.resume:
622             should_upload = True
623         # New file detected from last run, upload it.
624         elif new_file_in_cache:
625             should_upload = True
626         # Local file didn't change from last run.
627         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
628             if not file_in_local_collection:
629                 # File not uploaded yet, upload it completely
630                 should_upload = True
631             elif file_in_local_collection.permission_expired():
632                 # Permission token expired, re-upload file. This will change whenever
633                 # we have a API for refreshing tokens.
634                 should_upload = True
635                 self._local_collection.remove(filename)
636             elif cached_file_data['size'] == file_in_local_collection.size():
637                 # File already there, skip it.
638                 self.bytes_skipped += cached_file_data['size']
639             elif cached_file_data['size'] > file_in_local_collection.size():
640                 # File partially uploaded, resume!
641                 resume_offset = file_in_local_collection.size()
642                 self.bytes_skipped += resume_offset
643                 should_upload = True
644             else:
645                 # Inconsistent cache, re-upload the file
646                 should_upload = True
647                 self._local_collection.remove(filename)
648                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
649         # Local file differs from cached data, re-upload it.
650         else:
651             if file_in_local_collection:
652                 self._local_collection.remove(filename)
653             should_upload = True
654
655         if should_upload:
656             self._files_to_upload.append((source, resume_offset, filename))
657
658     def _upload_files(self):
659         for source, resume_offset, filename in self._files_to_upload:
660             with open(source, 'r') as source_fd:
661                 with self._state_lock:
662                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
663                     self._state['files'][source]['size'] = os.path.getsize(source)
664                 if resume_offset > 0:
665                     # Start upload where we left off
666                     output = self._local_collection.open(filename, 'a')
667                     source_fd.seek(resume_offset)
668                 else:
669                     # Start from scratch
670                     output = self._local_collection.open(filename, 'w')
671                 self._write(source_fd, output)
672                 output.close(flush=False)
673
674     def _write(self, source_fd, output):
675         while True:
676             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
677             if not data:
678                 break
679             output.write(data)
680
681     def _my_collection(self):
682         return self._remote_collection if self.update else self._local_collection
683
684     def _setup_state(self, update_collection):
685         """
686         Create a new cache file or load a previously existing one.
687         """
688         # Load an already existing collection for update
689         if update_collection and re.match(arvados.util.collection_uuid_pattern,
690                                           update_collection):
691             try:
692                 self._remote_collection = arvados.collection.Collection(update_collection)
693             except arvados.errors.ApiError as error:
694                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
695             else:
696                 self.update = True
697         elif update_collection:
698             # Collection locator provided, but unknown format
699             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
700
701         if self.use_cache:
702             # Set up cache file name from input paths.
703             md5 = hashlib.md5()
704             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
705             realpaths = sorted(os.path.realpath(path) for path in self.paths)
706             md5.update('\0'.join(realpaths))
707             if self.filename:
708                 md5.update(self.filename)
709             cache_filename = md5.hexdigest()
710             cache_filepath = os.path.join(
711                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
712                 cache_filename)
713             if self.resume and os.path.exists(cache_filepath):
714                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
715                 self._cache_file = open(cache_filepath, 'a+')
716             else:
717                 # --no-resume means start with a empty cache file.
718                 self.logger.info("Creating new cache file at {}".format(cache_filepath))
719                 self._cache_file = open(cache_filepath, 'w+')
720             self._cache_filename = self._cache_file.name
721             self._lock_file(self._cache_file)
722             self._cache_file.seek(0)
723
724         with self._state_lock:
725             if self.use_cache:
726                 try:
727                     self._state = json.load(self._cache_file)
728                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
729                         # Cache at least partially incomplete, set up new cache
730                         self._state = copy.deepcopy(self.EMPTY_STATE)
731                 except ValueError:
732                     # Cache file empty, set up new cache
733                     self._state = copy.deepcopy(self.EMPTY_STATE)
734             else:
735                 self.logger.info("No cache usage requested for this run.")
736                 # No cache file, set empty state
737                 self._state = copy.deepcopy(self.EMPTY_STATE)
738             # Load the previous manifest so we can check if files were modified remotely.
739             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
740
741     def collection_file_paths(self, col, path_prefix='.'):
742         """Return a list of file paths by recursively go through the entire collection `col`"""
743         file_paths = []
744         for name, item in col.items():
745             if isinstance(item, arvados.arvfile.ArvadosFile):
746                 file_paths.append(os.path.join(path_prefix, name))
747             elif isinstance(item, arvados.collection.Subcollection):
748                 new_prefix = os.path.join(path_prefix, name)
749                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
750         return file_paths
751
752     def _lock_file(self, fileobj):
753         try:
754             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
755         except IOError:
756             raise ResumeCacheConflict("{} locked".format(fileobj.name))
757
758     def _save_state(self):
759         """
760         Atomically save current state into cache.
761         """
762         with self._state_lock:
763             # We're not using copy.deepcopy() here because it's a lot slower
764             # than json.dumps(), and we're already needing JSON format to be
765             # saved on disk.
766             state = json.dumps(self._state)
767         try:
768             new_cache = tempfile.NamedTemporaryFile(
769                 dir=os.path.dirname(self._cache_filename), delete=False)
770             self._lock_file(new_cache)
771             new_cache.write(state)
772             new_cache.flush()
773             os.fsync(new_cache)
774             os.rename(new_cache.name, self._cache_filename)
775         except (IOError, OSError, ResumeCacheConflict) as error:
776             self.logger.error("There was a problem while saving the cache file: {}".format(error))
777             try:
778                 os.unlink(new_cache_name)
779             except NameError:  # mkstemp failed.
780                 pass
781         else:
782             self._cache_file.close()
783             self._cache_file = new_cache
784
785     def collection_name(self):
786         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
787
788     def manifest_locator(self):
789         return self._my_collection().manifest_locator()
790
791     def portable_data_hash(self):
792         pdh = self._my_collection().portable_data_hash()
793         m = self._my_collection().stripped_manifest()
794         local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
795         if pdh != local_pdh:
796             logger.warning("\n".join([
797                 "arv-put: API server provided PDH differs from local manifest.",
798                 "         This should not happen; showing API server version."]))
799         return pdh
800
801     def manifest_text(self, stream_name=".", strip=False, normalize=False):
802         return self._my_collection().manifest_text(stream_name, strip, normalize)
803
804     def _datablocks_on_item(self, item):
805         """
806         Return a list of datablock locators, recursively navigating
807         through subcollections
808         """
809         if isinstance(item, arvados.arvfile.ArvadosFile):
810             if item.size() == 0:
811                 # Empty file locator
812                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
813             else:
814                 locators = []
815                 for segment in item.segments():
816                     loc = segment.locator
817                     locators.append(loc)
818                 return locators
819         elif isinstance(item, arvados.collection.Collection):
820             l = [self._datablocks_on_item(x) for x in item.values()]
821             # Fast list flattener method taken from:
822             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
823             return [loc for sublist in l for loc in sublist]
824         else:
825             return None
826
827     def data_locators(self):
828         with self._collection_lock:
829             # Make sure all datablocks are flushed before getting the locators
830             self._my_collection().manifest_text()
831             datablocks = self._datablocks_on_item(self._my_collection())
832         return datablocks
833
834
def expected_bytes_for(pathlist, follow_links=True):
    """Return the total size in bytes of everything named in `pathlist`.

    Directories are walked with os.walk() and the sizes of their files are
    added up, so progress can be displayed as a percentage.  With
    `follow_links` set, symlinked directories are followed, but a symlink
    whose real path was already reached through another symlink is pruned
    from the walk.  Returns None as soon as a path is neither a directory
    nor a regular file.
    """
    seen_link_targets = set()
    total = 0
    for path in pathlist:
        if not os.path.isdir(path):
            if not os.path.isfile(path):
                return None
            total += os.path.getsize(path)
            continue
        for root, dirs, files in os.walk(path, followlinks=follow_links):
            if follow_links:
                # Work on a snapshot of the symlinked entries, because
                # `dirs` is pruned in place to steer os.walk().
                symlinked = [name for name in dirs
                             if os.path.islink(os.path.join(root, name))]
                for name in symlinked:
                    target = os.path.realpath(os.path.join(root, name))
                    if target in seen_link_targets:
                        # Linked dir already visited, skip it.
                        dirs.remove(name)
                    else:
                        # Will only visit this dir once
                        seen_link_targets.add(target)
            total += sum(os.path.getsize(os.path.join(root, name))
                         for name in files)
    return total
861
# Progress line template for batch mode: the program name and PID are
# filled in once here, leaving two placeholders for the byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())

def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable progress line.

    An unknown total (`bytes_expected` is None) is reported as -1.
    """
    if bytes_expected is None:
        bytes_expected = -1
    return _machine_format.format(bytes_written, bytes_expected)
867
def human_progress(bytes_written, bytes_expected):
    """Format a progress line meant for people, starting with a carriage return.

    When `bytes_expected` is truthy, both counts are shown shifted right
    by 20 bits with an "M" suffix, followed by the completed percentage.
    Otherwise only the raw `bytes_written` count is shown.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_m = bytes_written >> 20
    expected_m = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_m, expected_m, fraction)
875
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter callable that writes progress to `outfile`.

    The returned function takes (bytes_written, bytes_expected), formats
    them with `progress_func` and writes the result to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
880
def exit_signal_handler(sigcode, frame):
    """Signal handler: exit with the negated signal number as status.

    `frame` is part of the handler signature and is not used.
    """
    exit_status = -sigcode
    sys.exit(exit_status)
883
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Return the 'uuid' of the record that `project_uuid` refers to.

    With an empty `project_uuid` the current user is looked up.  A value
    matching the user UUID pattern is fetched from users(), one matching
    the group UUID pattern from groups().  Anything else raises
    ValueError.  The request is executed with `num_retries`.
    """
    if not project_uuid:
        request = api_client.users().current()
    else:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            resource = api_client.users()
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            resource = api_client.groups()
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
        request = resource.get(uuid=project_uuid)
    response = request.execute(num_retries=num_retries)
    return response['uuid']
894
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Run arv-put: parse the arguments, upload the paths, print the result.

    Arguments:
      arguments -- argument list handed to parse_arguments().
      stdout -- file-like object that receives the final output line: the
        manifest text (--stream), the comma-separated block locators
        (--raw), or the portable data hash / manifest locator of the
        saved collection.
      stderr -- accepted as part of the signature; not used in this
        function body.

    Returns the output string on success.  Failures end in sys.exit() with
    a non-zero status.  When the upload job raises ArvPutUploadIsPending
    the exit status is 2, and when it raises ArvPutUploadNotPending it
    is 0.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("arv-put: %s" % str(error))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    output = None

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}
    try:
        if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        if not args.dry_run:
            writer.report_progress()
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("arv-put: %s" % str(error))
            sys.exit(1)
        except ArvPutUploadIsPending:
            # Dry run check successful, return proper exit code.
            sys.exit(2)
        except ArvPutUploadNotPending:
            # No files pending for upload
            sys.exit(0)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        # Put the original handlers back on every way out, including the
        # sys.exit() calls above, so that a caller invoking main() as a
        # function does not keep exit_signal_handler installed.
        for sigcode, orig_handler in orig_signal_handlers.items():
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1046
1047
1048 if __name__ == '__main__':
1049     main()