from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code 2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases the RAM requirements. Default is to use 2 threads.
On high-latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

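# Example invocations (illustrative; the project and collection UUIDs below
# are placeholders):
#
#   arv-put file1.dat dir1/                 # upload a file plus dir1's contents
#   arv-put --name 'My data' --project-uuid zzzzz-j7d0g-0123456789abcde dir1
#   arv-put --update-collection zzzzz-4zz18-0123456789abcde new_file.dat
#   arv-put --dry-run dir1/                 # exit with code 2 if uploads are pending
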
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args

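# For example, `echo data | arv-put` parses to paths=['-'] with resume and
# caching disabled and filename='stdin', so the piped bytes are stored as a
# file named "stdin" in the resulting collection.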

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
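
    # For example, with ARVADOS_API_HOST=zzzzz.arvadosapi.com and
    # paths=['./data'], the resulting cache path looks like
    # ~/.cache/arvados/arv-put/<md5 hex digest>, so rerunning the same
    # upload finds the same state file.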

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Flush before renaming so the new cache file is complete on disk.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

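
# A minimal ResumeCache usage sketch (illustrative; `args` is assumed to come
# from parse_arguments() and the cache directory to be writable):
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()   # raises ValueError if the cache is empty/corrupt
#   ...
#   cache.save(state)      # atomic: writes a temp file, then renames it
#   cache.close()
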
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

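    # Typical lifecycle (see main() below): construct the job, then call
    # start(save_collection=True). Progress is reported through `reporter`,
    # and a daemon checkpointer thread periodically saves upload state while
    # files are being written.
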
    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    orig_path = path
                    path = os.path.abspath(path)
                    if orig_path[-1:] == os.sep:
                        # When passing a directory reference with a trailing slash,
                        # its contents should be uploaded directly to the collection's root.
                        prefixdir = path
                    else:
                        # When passing a directory reference with no trailing slash,
                        # upload the directory to the collection's root.
                        prefixdir = os.path.dirname(path)
                    prefixdir += os.sep
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we should
            # notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        # sys.stdin is a text stream on Python 3; use the underlying binary
        # buffer (when available) so raw bytes are written.
        self._write(getattr(sys.stdin, 'buffer', sys.stdin), output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() itself failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

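    # A portable data hash has the form "<md5 of stripped manifest>+<manifest
    # size in bytes>", which is why it can be recomputed locally (local_pdh
    # above) and cross-checked against the server-reported value.
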
    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            locator_lists = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in locator_lists for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

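# For example, expected_bytes_for(['./a.dat', './dir/']) returns the combined
# size in bytes of a.dat plus every file under dir/, or None if any path is
# neither a file nor a directory (e.g. stdin), in which case progress is
# reported without a total.
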
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

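# Sample progress lines (program name and PID are illustrative):
#   machine_progress(1048576, 4194304) -> "arv-put 1234: 1048576 written 4194304 total\n"
#   human_progress(1048576, 4194304)   -> "\r1M / 4M 25.0% "
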
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 use_cache=args.use_cache,
                                 filename=args.filename,
                                 reporter=reporter,
                                 bytes_expected=bytes_expected,
                                 num_retries=args.retries,
                                 replication_desired=args.replication,
                                 put_threads=args.threads,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True,
                                 update_collection=args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()