# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases RAM requirements. Default is to use 2 threads.
On high-latency installations, a higher thread count can improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args

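# For example (hypothetical invocation), parse_arguments(['--no-cache', 'dir/'])
# returns args with use_cache=False and, because --no-cache implies it,
# resume=False as well.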

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

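# FileUploadList makes --dry-run short-circuit: the first append of a file
# that needs uploading raises ArvPutUploadIsPending, which main() turns into
# exit code 2 without transferring any data.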

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

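    # As a sketch (assuming ARVADOS_API_HOST=zzzzz.arvadosapi.com and a
    # single regular-file argument), make_path() yields something like
    # ~/.cache/arvados/arv-put/<md5-of-host-and-paths>, so re-running the
    # same command line finds the same resume cache entry.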
    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
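    # A populated state looks like this (a sketch; the path, mtime and size
    # values are illustrative):
    #   {'manifest': '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n',
    #    'files': {'/home/user/empty.txt': {'mtime': 1507203936.0, 'size': 0}}}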

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data, if any is available and needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    orig_path = path
                    path = os.path.abspath(path)
                    if orig_path[-1:] == os.sep:
                        # When passing a directory reference with a trailing slash,
                        # its contents should be uploaded directly to the collection's root.
                        prefixdir = path
                    else:
                        # When passing a directory reference with no trailing slash,
                        # the directory itself (name included) is uploaded to the
                        # collection's root.
                        prefixdir = os.path.dirname(path)
                    prefixdir += os.sep
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry run mode is on and we got to this point, there are no
            # files pending upload, so signal it.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since the last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since the last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

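    # Decision summary for _check_file() when resuming:
    #   new or changed file (mtime/size)     -> full upload
    #   unchanged, fully in the collection   -> skip, count as bytes_skipped
    #   unchanged, partially uploaded        -> resume from the uploaded size
    #   expired token or oversized copy      -> remove and re-upload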
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

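    # Note that the cache key above mirrors ResumeCache.make_path(): the API
    # host plus the sorted real paths (and --filename, if given), hashed with
    # MD5, so the same inputs always map to the same cache file.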
    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively walking the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

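    # Durability note for _save_state() below: the state is written to a
    # NamedTemporaryFile in the same directory, flushed and fsync()ed, and
    # only then rename()d over the old cache, so an interrupted run never
    # leaves a half-written cache file behind.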
    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

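# expected_bytes_for() returns None when any path is neither a regular file
# nor a directory (e.g. '-' for stdin); the progress reporters treat that
# as "total size unknown".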
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

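# For example, human_progress(512 << 20, 1 << 30) renders as
# "\r512M / 1024M 50.0% " (the carriage return redraws one status line),
# while machine_progress(100, None) emits something like
# "arv-put 12345: 100 written -1 total" (program name and pid vary per run).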
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

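# Exiting with the negated signal number is what lets ArvPutUploadJob.start()
# recognize SystemExit(-2) (SIGINT, i.e. Ctrl-C) and skip logging a traceback
# for a deliberate interruption.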
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()