1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import ciso8601
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import fnmatch
15 import hashlib
16 import json
17 import logging
18 import os
19 import pwd
20 import re
21 import signal
22 import socket
23 import sys
24 import tempfile
25 import threading
26 import time
27 import traceback
28
29 from pathlib import Path
30
31 import arvados.util
32 import arvados.commands._util as arv_cmd
33
34 from apiclient import errors as apiclient_errors
35 from arvados._internal import basedirs
36 from arvados._version import __version__
37
38 api_client = None
39
40 upload_opts = argparse.ArgumentParser(add_help=False)
41
42 upload_opts.add_argument('--version', action='version',
43                          version="%s %s" % (sys.argv[0], __version__),
44                          help='Print version and exit.')
45 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
46                          help="""
47 Local file or directory. If path is a directory reference with a trailing
48 slash, then just upload the directory's contents; otherwise upload the
49 directory itself. Default: read from standard input.
50 """)
51
52 _group = upload_opts.add_mutually_exclusive_group()
53
54 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
55                     default=-1, help=argparse.SUPPRESS)
56
57 _group.add_argument('--normalize', action='store_true',
58                     help="""
59 Normalize the manifest by re-ordering files and streams after writing
60 data.
61 """)
62
63 _group.add_argument('--dry-run', action='store_true', default=False,
64                     help="""
65 Don't actually upload files, but only check if any file should be
66 uploaded. Exit with code=2 when files are pending for upload.
67 """)
68
69 _group = upload_opts.add_mutually_exclusive_group()
70
71 _group.add_argument('--as-stream', action='store_true', dest='stream',
72                     help="""
73 Synonym for --stream.
74 """)
75
76 _group.add_argument('--stream', action='store_true',
77                     help="""
78 Store the file content and display the resulting manifest on
79 stdout. Do not save a Collection object in Arvados.
80 """)
81
82 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
83                     help="""
84 Synonym for --manifest.
85 """)
86
87 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
88                     help="""
89 Synonym for --manifest.
90 """)
91
92 _group.add_argument('--manifest', action='store_true',
93                     help="""
94 Store the file data and resulting manifest in Keep, save a Collection
95 object in Arvados, and display the manifest locator (Collection uuid)
96 on stdout. This is the default behavior.
97 """)
98
99 _group.add_argument('--as-raw', action='store_true', dest='raw',
100                     help="""
101 Synonym for --raw.
102 """)
103
104 _group.add_argument('--raw', action='store_true',
105                     help="""
106 Store the file content and display the data block locators on stdout,
107 separated by commas, with a trailing newline. Do not store a
108 manifest.
109 """)
110
111 upload_opts.add_argument('--update-collection', type=str, default=None,
112                          dest='update_collection', metavar="UUID", help="""
113 Update an existing collection identified by the given Arvados collection
114 UUID. All new local files will be uploaded.
115 """)
116
117 upload_opts.add_argument('--use-filename', type=str, default=None,
118                          dest='filename', help="""
119 Synonym for --filename.
120 """)
121
122 upload_opts.add_argument('--filename', type=str, default=None,
123                          help="""
124 Use the given filename in the manifest, instead of the name of the
125 local file. This is useful when "-" or "/dev/stdin" is given as an
126 input file. It can be used only if there is exactly one path given and
127 it is not a directory. Implies --manifest.
128 """)
129
130 upload_opts.add_argument('--portable-data-hash', action='store_true',
131                          help="""
132 Print the portable data hash instead of the Arvados UUID for the collection
133 created by the upload.
134 """)
135
136 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
137                          help="""
138 Set the replication level for the new collection: how many different
139 physical storage devices (e.g., disks) should have a copy of each data
140 block. Default is to use the server-provided default (if any) or 2.
141 """)
142
143 upload_opts.add_argument('--storage-classes', help="""
144 Specify a comma-separated list of storage classes to be used when saving data to Keep.
145 """)
146
147 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
148                          help="""
149 Set the number of upload threads to be used. Note that using many
150 threads will increase the RAM requirements. Default is to use 2
151 threads.
152 On high-latency installations, using a greater number will improve
153 overall throughput.
154 """)
155
156 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
157                       action='append', help="""
158 Exclude files and directories whose names match the given glob pattern. When
159 using a path-like pattern like 'subdir/*.txt', all text files inside the
160 'subdir' directory (relative to the provided input dirs) will be excluded.
161 When using a filename pattern like '*.txt', any text file will be excluded
162 no matter where it is placed.
163 For the special case of needing to exclude only files or dirs directly below
164 the given input directory, you can use a pattern like './exclude_this.gif'.
165 You can specify multiple patterns by using this argument more than once.
166 """)
167
168 _group = upload_opts.add_mutually_exclusive_group()
169 _group.add_argument('--follow-links', action='store_true', default=True,
170                     dest='follow_links', help="""
171 Follow file and directory symlinks (default).
172 """)
173 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
174                     help="""
175 Ignore file and directory symlinks. Even paths given explicitly on the
176 command line will be skipped if they are symlinks.
177 """)
178
179
180 run_opts = argparse.ArgumentParser(add_help=False)
181
182 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
183 Store the collection in the specified project, instead of your Home
184 project.
185 """)
186
187 run_opts.add_argument('--name', help="""
188 Save the collection with the specified name.
189 """)
190
191 _group = run_opts.add_mutually_exclusive_group()
192 _group.add_argument('--progress', action='store_true',
193                     help="""
194 Display human-readable progress on stderr (bytes and, if possible,
195 percentage of total data size). This is the default behavior when
196 stderr is a tty.
197 """)
198
199 _group.add_argument('--no-progress', action='store_true',
200                     help="""
201 Do not display human-readable progress on stderr, even if stderr is a
202 tty.
203 """)
204
205 _group.add_argument('--batch-progress', action='store_true',
206                     help="""
207 Display machine-readable progress on stderr (bytes and, if known,
208 total data size).
209 """)
210
211 run_opts.add_argument('--silent', action='store_true',
212                       help="""
213 Do not print any debug messages to console. (Any error messages will
214 still be displayed.)
215 """)
216
217 run_opts.add_argument('--batch', action='store_true', default=False,
218                       help="""
219 Retry with '--no-resume --no-cache' if the cached state contains invalid/expired
220 block signatures.
221 """)
222
223 _group = run_opts.add_mutually_exclusive_group()
224 _group.add_argument('--resume', action='store_true', default=True,
225                     help="""
226 Continue interrupted uploads from cached state (default).
227 """)
228 _group.add_argument('--no-resume', action='store_false', dest='resume',
229                     help="""
230 Do not continue interrupted uploads from cached state.
231 """)
232
233 _group = run_opts.add_mutually_exclusive_group()
234 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
235                     help="""
236 Save upload state in a cache file for resuming (default).
237 """)
238 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
239                     help="""
240 Do not save upload state in a cache file for resuming.
241 """)
242
243 _group = upload_opts.add_mutually_exclusive_group()
244 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
245                     help="""
246 Set the trash date of the resulting collection to an absolute date in the future.
247 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
248 Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
249 """)
250 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
251                     help="""
252 Set the trash date of the resulting collection to a number of days after the
253 date/time that the upload process finishes.
254 """)
255
256 arg_parser = argparse.ArgumentParser(
257     description='Copy data from the local filesystem to Keep.',
258     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
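
# Example invocations (illustrative only; the project UUID below is a made-up
# placeholder):
#
#   arv-put file1.txt subdir/           # upload file1.txt plus subdir's contents
#   arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --name 'my data' subdir
#   echo hello | arv-put --filename greeting.txt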
259
260 def parse_arguments(arguments):
261     args = arg_parser.parse_args(arguments)
262
263     if len(args.paths) == 0:
264         args.paths = ['-']
265
266     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
267
268     if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
269         arg_parser.error("""
270     --filename argument cannot be used when storing a directory or
271     multiple files.
272     """)
273
274     # Turn on --progress by default if stderr is a tty.
275     if (not (args.batch_progress or args.no_progress or args.silent)
276         and os.isatty(sys.stderr.fileno())):
277         args.progress = True
278
279     # Turn off --resume (default) if --no-cache is used.
280     if not args.use_cache:
281         args.resume = False
282
283     if args.paths == ['-']:
284         if args.update_collection:
285             arg_parser.error("""
286     --update-collection cannot be used when reading from stdin.
287     """)
288         args.resume = False
289         args.use_cache = False
290         if not args.filename:
291             args.filename = 'stdin'
292
293     # Remove possible duplicate patterns
294     if len(args.exclude) > 0:
295         args.exclude = list(set(args.exclude))
296
297     return args
298
299
300 class PathDoesNotExistError(Exception):
301     pass
302
303
304 class CollectionUpdateError(Exception):
305     pass
306
307
308 class ResumeCacheConflict(Exception):
309     pass
310
311
312 class ResumeCacheInvalidError(Exception):
313     pass
314
315 class ArvPutArgumentConflict(Exception):
316     pass
317
318
319 class ArvPutUploadIsPending(Exception):
320     pass
321
322
323 class ArvPutUploadNotPending(Exception):
324     pass
325
326
327 class FileUploadList(list):
328     def __init__(self, dry_run=False):
329         list.__init__(self)
330         self.dry_run = dry_run
331
332     def append(self, other):
333         if self.dry_run:
334             raise ArvPutUploadIsPending()
335         super(FileUploadList, self).append(other)
336
337
338 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
339 class ArvPutLogFormatter(logging.Formatter):
340     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
341     err_fmtr = None
342     request_id_informed = False
343
344     def __init__(self, request_id):
345         self.err_fmtr = logging.Formatter(
346             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
347             arvados.log_date_format)
348
349     def format(self, record):
350         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
351             self.request_id_informed = True
352             return self.err_fmtr.format(record)
353         return self.std_fmtr.format(record)
354
355
356 class ResumeCache(object):
357     CACHE_DIR = 'arv-put'
358
359     def __init__(self, file_spec):
360         self.cache_file = open(file_spec, 'a+')
361         self._lock_file(self.cache_file)
362         self.filename = self.cache_file.name
363
364     @classmethod
365     def make_path(cls, args):
366         md5 = hashlib.md5()
367         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
368         realpaths = sorted(os.path.realpath(path) for path in args.paths)
369         md5.update(b'\0'.join([p.encode() for p in realpaths]))
370         if any(os.path.isdir(path) for path in realpaths):
371             md5.update(b'-1')
372         elif args.filename:
373             md5.update(args.filename.encode())
374         cache_path = Path(cls.CACHE_DIR)
375         if len(cache_path.parts) == 1:
376             cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
377         else:
378             # Note this is a noop if cache_path is absolute, which is what we want.
379             cache_path = Path.home() / cache_path
380             cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
381         return str(cache_path / md5.hexdigest())
382
383     def _lock_file(self, fileobj):
384         try:
385             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
386         except IOError:
387             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
388
389     def load(self):
390         self.cache_file.seek(0)
391         return json.load(self.cache_file)
392
393     def check_cache(self, api_client=None, num_retries=0):
394         try:
395             state = self.load()
396             locator = None
397             try:
398                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
399                     locator = state["_finished_streams"][0][1][0]
400                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
401                     locator = state["_current_stream_locators"][0]
402                 if locator is not None:
403                     kc = arvados.keep.KeepClient(api_client=api_client)
404                     kc.head(locator, num_retries=num_retries)
405             except Exception as e:
406                 self.restart()
407         except ValueError:
408             pass
409
410     def save(self, data):
411         try:
412             new_cache_fd, new_cache_name = tempfile.mkstemp(
413                 dir=os.path.dirname(self.filename))
414             self._lock_file(new_cache_fd)
415             new_cache = os.fdopen(new_cache_fd, 'r+')
416             json.dump(data, new_cache)
417             os.rename(new_cache_name, self.filename)
418         except (IOError, OSError, ResumeCacheConflict):
419             try:
420                 os.unlink(new_cache_name)
421             except NameError:  # mkstemp failed.
422                 pass
423         else:
424             self.cache_file.close()
425             self.cache_file = new_cache
426
427     def close(self):
428         self.cache_file.close()
429
430     def destroy(self):
431         try:
432             os.unlink(self.filename)
433         except OSError as error:
434             if error.errno != errno.ENOENT:  # ENOENT just means it was already gone, which is fine.
435                 raise
436         self.close()
437
438     def restart(self):
439         self.destroy()
440         self.__init__(self.filename)
441
442
443 class ArvPutUploadJob(object):
444     CACHE_DIR = 'arv-put'
445     EMPTY_STATE = {
446         'manifest' : None, # Last saved manifest checkpoint
447         'files' : {} # Previous run file list: {path : {size, mtime}}
448     }
449
450     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
451                  name=None, owner_uuid=None, api_client=None, batch_mode=False,
452                  ensure_unique_name=False, num_retries=None,
453                  put_threads=None, replication_desired=None, filename=None,
454                  update_time=60.0, update_collection=None, storage_classes=None,
455                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
456                  follow_links=True, exclude_paths=[], exclude_names=None,
457                  trash_at=None):
458         self.paths = paths
459         self.resume = resume
460         self.use_cache = use_cache
461         self.batch_mode = batch_mode
462         self.update = False
463         self.reporter = reporter
464         # This will be set to 0 before counting starts, if no special files are
465         # going to be read.
466         self.bytes_expected = None
467         self.bytes_written = 0
468         self.bytes_skipped = 0
469         self.name = name
470         self.owner_uuid = owner_uuid
471         self.ensure_unique_name = ensure_unique_name
472         self.num_retries = num_retries
473         self.replication_desired = replication_desired
474         self.put_threads = put_threads
475         self.filename = filename
476         self.storage_classes = storage_classes
477         self._api_client = api_client
478         self._state_lock = threading.Lock()
479         self._state = None # Previous run state (file list & manifest)
480         self._current_files = [] # Current run file list
481         self._cache_file = None
482         self._collection_lock = threading.Lock()
483         self._remote_collection = None # Collection being updated (if asked)
484         self._local_collection = None # Collection from previous run manifest
485         self._file_paths = set() # Files to be updated in remote collection
486         self._stop_checkpointer = threading.Event()
487         self._checkpointer = threading.Thread(target=self._update_task)
488         self._checkpointer.daemon = True
489         self._update_task_time = update_time  # How many seconds to wait between update runs
490         self._files_to_upload = FileUploadList(dry_run=dry_run)
491         self._upload_started = False
492         self.logger = logger
493         self.dry_run = dry_run
494         self._checkpoint_before_quit = True
495         self.follow_links = follow_links
496         self.exclude_paths = exclude_paths
497         self.exclude_names = exclude_names
498         self._trash_at = trash_at
499
500         if self._trash_at is not None:
501             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
502                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
503             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
504                 raise TypeError('provided trash_at datetime should be timezone-naive')
505
506         if not self.use_cache and self.resume:
507             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
508
509         # Check for obvious dry-run responses
510         if self.dry_run and (not self.use_cache or not self.resume):
511             raise ArvPutUploadIsPending()
512
513         # Load cached data if any and if needed
514         self._setup_state(update_collection)
515
516         # Build the upload file list, excluding requested files and counting the
517         # bytes expected to be uploaded.
518         self._build_upload_list()
519
520     def _build_upload_list(self):
521         """
522         Scan the requested paths to count file sizes, skipping the files and dirs
523         requested for exclusion, and build the upload file list.
524         """
525         # If there are no special files to be read, reset the total bytes count to
526         # zero to start counting.
527         if not any([p for p in self.paths
528                     if not (os.path.isfile(p) or os.path.isdir(p))]):
529             self.bytes_expected = 0
530
531         for path in self.paths:
532             # Test for stdin first, in case some file named '-' exists
533             if path == '-':
534                 if self.dry_run:
535                     raise ArvPutUploadIsPending()
536                 self._write_stdin(self.filename or 'stdin')
537             elif not os.path.exists(path):
538                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
539             elif (not self.follow_links) and os.path.islink(path):
540                 self.logger.warning("Skipping symlink '{}'".format(path))
541                 continue
542             elif os.path.isdir(path):
543                 # Use absolute paths on cache index so CWD doesn't interfere
544                 # with the caching logic.
545                 orig_path = path
546                 path = os.path.abspath(path)
547                 if orig_path[-1:] == os.sep:
548                     # When passing a directory reference with a trailing slash,
549                     # its contents should be uploaded directly to the
550                     # collection's root.
551                     prefixdir = path
552                 else:
553                     # When passing a directory reference with no trailing slash,
554                     # upload the directory to the collection's root.
555                     prefixdir = os.path.dirname(path)
556                 prefixdir += os.sep
557                 for root, dirs, files in os.walk(path,
558                                                  followlinks=self.follow_links):
559                     root_relpath = os.path.relpath(root, path)
560                     if root_relpath == '.':
561                         root_relpath = ''
562                     # Exclude files/dirs by full path matching pattern
563                     if self.exclude_paths:
564                         dirs[:] = [d for d in dirs
565                                    if not any(pathname_match(
566                                            os.path.join(root_relpath, d), pat)
567                                               for pat in self.exclude_paths)]
568                         files = [f for f in files
569                                  if not any(pathname_match(
570                                          os.path.join(root_relpath, f), pat)
571                                             for pat in self.exclude_paths)]
572                     # Exclude files/dirs by name matching pattern
573                     if self.exclude_names is not None:
574                         dirs[:] = [d for d in dirs
575                                    if not self.exclude_names.match(d)]
576                         files = [f for f in files
577                                  if not self.exclude_names.match(f)]
578                     # Make os.walk()'s dir traversing order deterministic
579                     dirs.sort()
580                     files.sort()
581                     for f in files:
582                         filepath = os.path.join(root, f)
583                         if not os.path.isfile(filepath):
584                             self.logger.warning("Skipping non-regular file '{}'".format(filepath))
585                             continue
586                         # Add its size to the total bytes count (if applicable)
587                         if self.follow_links or (not os.path.islink(filepath)):
588                             if self.bytes_expected is not None:
589                                 self.bytes_expected += os.path.getsize(filepath)
590                         self._check_file(filepath,
591                                          os.path.join(root[len(prefixdir):], f))
592             else:
593                 filepath = os.path.abspath(path)
594                 # Add its size to the total bytes count (if applicable)
595                 if self.follow_links or (not os.path.islink(filepath)):
596                     if self.bytes_expected is not None:
597                         self.bytes_expected += os.path.getsize(filepath)
598                 self._check_file(filepath,
599                                  self.filename or os.path.basename(path))
600         # If dry-run mode is on and we got up to this point, then we should notify
601         # that there aren't any files to upload.
602         if self.dry_run:
603             raise ArvPutUploadNotPending()
604         # Remove local_collection's files that don't exist locally anymore, so the
605         # bytes_written count is correct.
606         for f in self.collection_file_paths(self._local_collection,
607                                             path_prefix=""):
608             if f != 'stdin' and f != self.filename and f not in self._file_paths:
609                 self._local_collection.remove(f)
610
611     def start(self, save_collection):
612         """
613         Start supporting thread & file uploading
614         """
615         self._checkpointer.start()
616         try:
617             # Update bytes_written from current local collection and
618             # report initial progress.
619             self._update()
620             # Actual file upload
621             self._upload_started = True # Used by the update thread to start checkpointing
622             self._upload_files()
623         except (SystemExit, Exception) as e:
624             self._checkpoint_before_quit = False
625             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
626             # Note: We're expecting SystemExit instead of
627             # KeyboardInterrupt because we have a custom signal
628             # handler in place that raises SystemExit with the caught
629             # signal's code.
630             if isinstance(e, PathDoesNotExistError):
631                 # We aren't interested in the traceback for this case
632                 pass
633             elif not isinstance(e, SystemExit) or e.code != -2:
634                 self.logger.warning("Abnormal termination:\n{}".format(
635                     traceback.format_exc()))
636             raise
637         finally:
638             if not self.dry_run:
639                 # Stop the thread before doing anything else
640                 self._stop_checkpointer.set()
641                 self._checkpointer.join()
642                 if self._checkpoint_before_quit:
643                     # Commit all pending blocks & one last _update()
644                     self._local_collection.manifest_text()
645                     self._update(final=True)
646                     if save_collection:
647                         self.save_collection()
648             if self.use_cache:
649                 self._cache_file.close()
650
651     def _collection_trash_at(self):
652         """
653         Returns the trash date that the collection should use at save time.
654         Takes into account absolute/relative trash_at values requested
655         by the user.
656         """
657         if type(self._trash_at) == datetime.timedelta:
658             # Get an absolute datetime for trash_at
659             return datetime.datetime.utcnow() + self._trash_at
660         return self._trash_at
661
662     def save_collection(self):
663         if self.update:
664             # Check if files should be updated on the remote collection.
665             for fp in self._file_paths:
666                 remote_file = self._remote_collection.find(fp)
667                 if not remote_file:
668                     # File doesn't exist on the remote collection, copy it.
669                     self._remote_collection.copy(fp, fp, self._local_collection)
670                 elif remote_file != self._local_collection.find(fp):
671                     # A different file exists on the remote collection, overwrite it.
672                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
673                 else:
674                     # The file already exists on the remote collection, skip it.
675                     pass
676             self._remote_collection.save(num_retries=self.num_retries,
677                                          trash_at=self._collection_trash_at())
678         else:
679             if len(self._local_collection) == 0:
680                 self.logger.warning("No files were uploaded, skipping collection creation.")
681                 return
682             self._local_collection.save_new(
683                 name=self.name, owner_uuid=self.owner_uuid,
684                 ensure_unique_name=self.ensure_unique_name,
685                 num_retries=self.num_retries,
686                 trash_at=self._collection_trash_at())
687
688     def destroy_cache(self):
689         if self.use_cache:
690             try:
691                 os.unlink(self._cache_filename)
692             except OSError as error:
693                 # That's what we wanted anyway.
694                 if error.errno != errno.ENOENT:
695                     raise
696             self._cache_file.close()
697
698     def _collection_size(self, collection):
699         """
700         Recursively get the total size of the collection
701         """
702         size = 0
703         for item in collection.values():
704             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
705                 size += self._collection_size(item)
706             else:
707                 size += item.size()
708         return size
709
710     def _update_task(self):
711         """
712         Periodically called support task. File uploading is
713         asynchronous so we poll status from the collection.
714         """
715         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
716             self._update()
717
718     def _update(self, final=False):
719         """
720         Update cached manifest text and report progress.
721         """
722         if self._upload_started:
723             with self._collection_lock:
724                 self.bytes_written = self._collection_size(self._local_collection)
725                 if self.use_cache:
726                     if final:
727                         manifest = self._local_collection.manifest_text()
728                     else:
729                         # Get the manifest text without committing pending blocks
730                         manifest = self._local_collection.manifest_text(strip=False,
731                                                                         normalize=False,
732                                                                         only_committed=True)
733                     # Update cache
734                     with self._state_lock:
735                         self._state['manifest'] = manifest
736             if self.use_cache:
737                 try:
738                     self._save_state()
739                 except Exception as e:
740                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
741             # Keep remote collection's trash_at attribute synced when using relative expire dates
742             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
743                 try:
744                     self._api_client.collections().update(
745                         uuid=self._remote_collection.manifest_locator(),
746                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
747                     ).execute(num_retries=self.num_retries)
748                 except Exception as e:
749                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
750         else:
751             self.bytes_written = self.bytes_skipped
752         # Call the reporter, if any
753         self.report_progress()
754
755     def report_progress(self):
756         if self.reporter is not None:
757             self.reporter(self.bytes_written, self.bytes_expected)
758
759     def _write_stdin(self, filename):
760         output = self._local_collection.open(filename, 'wb')
761         self._write(sys.stdin.buffer, output)
762         output.close()
763
764     def _check_file(self, source, filename):
765         """
766         Check if this file needs to be uploaded
767         """
768         # Ignore symlinks when requested
769         if (not self.follow_links) and os.path.islink(source):
770             return
771         resume_offset = 0
772         should_upload = False
773         new_file_in_cache = False
774         # Record file path for updating the remote collection before exiting
775         self._file_paths.add(filename)
776
777         with self._state_lock:
778             # If there's no previously cached data for this file, store it for an
779             # eventual repeated run.
780             if source not in self._state['files']:
781                 self._state['files'][source] = {
782                     'mtime': os.path.getmtime(source),
783                     'size' : os.path.getsize(source)
784                 }
785                 new_file_in_cache = True
786             cached_file_data = self._state['files'][source]
787
788         # Check if file was already uploaded (at least partially)
789         file_in_local_collection = self._local_collection.find(filename)
790
791         # If not resuming, upload the full file.
792         if not self.resume:
793             should_upload = True
794         # New file detected from last run, upload it.
795         elif new_file_in_cache:
796             should_upload = True
797         # Local file didn't change from last run.
798         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
799             if not file_in_local_collection:
800                 # File not uploaded yet, upload it completely
801                 should_upload = True
802             elif file_in_local_collection.permission_expired():
803                 # Permission token expired, re-upload file. This will change whenever
804                 # we have an API for refreshing tokens.
805                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
806                 should_upload = True
807                 self._local_collection.remove(filename)
808             elif cached_file_data['size'] == file_in_local_collection.size():
809                 # File already there, skip it.
810                 self.bytes_skipped += cached_file_data['size']
811             elif cached_file_data['size'] > file_in_local_collection.size():
812                 # File partially uploaded, resume!
813                 resume_offset = file_in_local_collection.size()
814                 self.bytes_skipped += resume_offset
815                 should_upload = True
816             else:
817                 # Inconsistent cache, re-upload the file
818                 should_upload = True
819                 self._local_collection.remove(filename)
820                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
821         # Local file differs from cached data, re-upload it.
822         else:
823             if file_in_local_collection:
824                 self._local_collection.remove(filename)
825             should_upload = True
826
827         if should_upload:
828             try:
829                 self._files_to_upload.append((source, resume_offset, filename))
830             except ArvPutUploadIsPending:
831                 # This could happen when running in dry-run mode; close the cache
832                 # file to avoid locking issues.
833                 self._cache_file.close()
834                 raise
835
836     def _upload_files(self):
837         for source, resume_offset, filename in self._files_to_upload:
838             with open(source, 'rb') as source_fd:
839                 with self._state_lock:
840                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
841                     self._state['files'][source]['size'] = os.path.getsize(source)
842                 if resume_offset > 0:
843                     # Start upload where we left off
844                     output = self._local_collection.open(filename, 'ab')
845                     source_fd.seek(resume_offset)
846                 else:
847                     # Start from scratch
848                     output = self._local_collection.open(filename, 'wb')
849                 self._write(source_fd, output)
850                 output.close(flush=False)
851
852     def _write(self, source_fd, output):
853         while True:
854             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
855             if not data:
856                 break
857             output.write(data)
858
859     def _my_collection(self):
860         return self._remote_collection if self.update else self._local_collection
861
862     def _get_cache_filepath(self):
863         # Set up cache file name from input paths.
864         md5 = hashlib.md5()
865         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
866         realpaths = sorted(os.path.realpath(path) for path in self.paths)
867         md5.update(b'\0'.join([p.encode() for p in realpaths]))
868         if self.filename:
869             md5.update(self.filename.encode())
870         cache_path = Path(self.CACHE_DIR)
871         if len(cache_path.parts) == 1:
872             cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
873         else:
874             # Note this is a noop if cache_path is absolute, which is what we want.
875             cache_path = Path.home() / cache_path
876             cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
877         return str(cache_path / md5.hexdigest())
878
879     def _setup_state(self, update_collection):
880         """
881         Create a new cache file or load a previously existing one.
882         """
883         # Load an already existing collection for update
884         if update_collection and re.match(arvados.util.collection_uuid_pattern,
885                                           update_collection):
886             try:
887                 self._remote_collection = arvados.collection.Collection(
888                     update_collection,
889                     api_client=self._api_client,
890                     storage_classes_desired=self.storage_classes,
891                     num_retries=self.num_retries)
892             except arvados.errors.ApiError as error:
893                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
894             else:
895                 self.update = True
896         elif update_collection:
897             # Collection locator provided, but unknown format
898             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
899
900         if self.use_cache:
901             cache_filepath = self._get_cache_filepath()
902             if self.resume and os.path.exists(cache_filepath):
903                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
904                 self._cache_file = open(cache_filepath, 'a+')
905             else:
906                 # --no-resume means start with an empty cache file.
907                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
908                 self._cache_file = open(cache_filepath, 'w+')
909             self._cache_filename = self._cache_file.name
910             self._lock_file(self._cache_file)
911             self._cache_file.seek(0)
912
913         with self._state_lock:
914             if self.use_cache:
915                 try:
916                     self._state = json.load(self._cache_file)
917                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
918                         # Cache is at least partially incomplete, set up a new cache
919                         self._state = copy.deepcopy(self.EMPTY_STATE)
920                 except ValueError:
921                     # Cache file empty, set up new cache
922                     self._state = copy.deepcopy(self.EMPTY_STATE)
923             else:
924                 self.logger.info("No cache usage requested for this run.")
925                 # No cache file, set empty state
926                 self._state = copy.deepcopy(self.EMPTY_STATE)
927             if not self._cached_manifest_valid():
928                 if not self.batch_mode:
929                     raise ResumeCacheInvalidError()
930                 else:
931                     self.logger.info("Invalid signatures on cache file '{}' while running in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
932                     self.use_cache = False # Don't overwrite preexisting cache file.
933                     self._state = copy.deepcopy(self.EMPTY_STATE)
934             # Load the previous manifest so we can check if files were modified remotely.
935             self._local_collection = arvados.collection.Collection(
936                 self._state['manifest'],
937                 replication_desired=self.replication_desired,
938                 storage_classes_desired=self.storage_classes,
939                 put_threads=self.put_threads,
940                 api_client=self._api_client,
941                 num_retries=self.num_retries)
942
943     def _cached_manifest_valid(self):
944         """
945         Validate the oldest non-expired block signature to check if the cached
946         manifest is usable, i.e. that it was not created with a different
947         Arvados account.
948         """
949         if self._state.get('manifest', None) is None:
950             # No cached manifest yet, all good.
951             return True
952         now = datetime.datetime.utcnow()
953         oldest_exp = None
954         oldest_loc = None
955         block_found = False
956         for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
957             loc = m.group(0)
958             try:
959                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
960             except IndexError:
961                 # Locator without signature
962                 continue
963             block_found = True
964             if exp > now and (oldest_exp is None or exp < oldest_exp):
965                 oldest_exp = exp
966                 oldest_loc = loc
967         if not block_found:
968             # No block signatures found => no invalid block signatures.
969             return True
970         if oldest_loc is None:
971             # Locator signatures found, but all have expired.
972             # Reset the cache and move on.
973             self.logger.info('Cache expired, starting from scratch.')
974             self._state['manifest'] = ''
975             return True
976         kc = arvados.KeepClient(api_client=self._api_client,
977                                 num_retries=self.num_retries)
978         try:
979             kc.head(oldest_loc)
980         except arvados.errors.KeepRequestError:
981             # Something is wrong, cached manifest is not valid.
982             return False
983         return True
984
985     def collection_file_paths(self, col, path_prefix='.'):
986         """Return a list of file paths by recursively going through the entire collection `col`"""
987         file_paths = []
988         for name, item in col.items():
989             if isinstance(item, arvados.arvfile.ArvadosFile):
990                 file_paths.append(os.path.join(path_prefix, name))
991             elif isinstance(item, arvados.collection.Subcollection):
992                 new_prefix = os.path.join(path_prefix, name)
993                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
994         return file_paths
995
996     def _lock_file(self, fileobj):
997         try:
998             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
999         except IOError:
1000             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1001
1002     def _save_state(self):
1003         """
1004         Atomically save current state into cache.
1005         """
1006         with self._state_lock:
1007             # We're not using copy.deepcopy() here because it's a lot slower
1008             # than json.dumps(), and we need the state saved to disk in JSON
1009             # format anyway.
1010             state = json.dumps(self._state)
1011         try:
1012             new_cache = tempfile.NamedTemporaryFile(
1013                 mode='w+',
1014                 dir=os.path.dirname(self._cache_filename), delete=False)
1015             self._lock_file(new_cache)
1016             new_cache.write(state)
1017             new_cache.flush()
1018             os.fsync(new_cache)
1019             os.rename(new_cache.name, self._cache_filename)
1020         except (IOError, OSError, ResumeCacheConflict) as error:
1021             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1022             try:
1023                 os.unlink(new_cache.name)
1024             except NameError:  # NamedTemporaryFile failed.
1025                 pass
1026         else:
1027             self._cache_file.close()
1028             self._cache_file = new_cache
1029
1030     def collection_name(self):
1031         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1032
1033     def collection_trash_at(self):
1034         return self._my_collection().get_trash_at()
1035
1036     def manifest_locator(self):
1037         return self._my_collection().manifest_locator()
1038
1039     def portable_data_hash(self):
1040         pdh = self._my_collection().portable_data_hash()
1041         m = self._my_collection().stripped_manifest().encode()
1042         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1043         if pdh != local_pdh:
1044             self.logger.warning("\n".join([
1045                 "arv-put: API server provided PDH differs from local manifest.",
1046                 "         This should not happen; showing API server version."]))
1047         return pdh
1048
1049     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1050         return self._my_collection().manifest_text(stream_name, strip, normalize)
1051
1052     def _datablocks_on_item(self, item):
1053         """
1054         Return a list of datablock locators, recursively navigating
1055         through subcollections
1056         """
1057         if isinstance(item, arvados.arvfile.ArvadosFile):
1058             if item.size() == 0:
1059                 # Empty file locator
1060                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1061             else:
1062                 locators = []
1063                 for segment in item.segments():
1064                     loc = segment.locator
1065                     locators.append(loc)
1066                 return locators
1067         elif isinstance(item, arvados.collection.Collection):
1068             l = [self._datablocks_on_item(x) for x in item.values()]
1069             # Fast list flattener method taken from:
1070             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1071             return [loc for sublist in l for loc in sublist]
1072         else:
1073             return None
1074
1075     def data_locators(self):
1076         with self._collection_lock:
1077             # Make sure all datablocks are flushed before getting the locators
1078             self._my_collection().manifest_text()
1079             datablocks = self._datablocks_on_item(self._my_collection())
1080         return datablocks
1081
1082 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1083                                                             os.getpid())
1084
1085 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1086 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1087 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1088 # so instead we're using it on every path component.
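# Illustrative examples (assuming POSIX-style os.sep):
#   pathname_match('subdir/file.txt', 'subdir/*.txt')        -> True
#   pathname_match('other/file.txt', 'subdir/*.txt')         -> False
#   pathname_match('subdir/deep/file.txt', 'subdir/*.txt')   -> False (depth differs)
#   pathname_match('exclude_this.gif', './exclude_this.gif') -> True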
1089 def pathname_match(pathname, pattern):
1090     name = pathname.split(os.sep)
1091     # Fix patterns like 'some/subdir/' or 'some//subdir'
1092     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1093     if len(name) != len(pat):
1094         return False
1095     for i in range(len(name)):
1096         if not fnmatch.fnmatch(name[i], pat[i]):
1097             return False
1098     return True
1099
1100 def machine_progress(bytes_written, bytes_expected):
1101     return _machine_format.format(
1102         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1103
1104 def human_progress(bytes_written, bytes_expected):
1105     if bytes_expected:
1106         return "\r{}M / {}M {:.1%} ".format(
1107             bytes_written >> 20, bytes_expected >> 20,
1108             float(bytes_written) / bytes_expected)
1109     else:
1110         return "\r{} ".format(bytes_written)
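
# Illustrative sample outputs (the program name and PID below are made up):
#   machine_progress(1048576, None)   -> "arv-put 1234: 1048576 written -1 total\n"
#   human_progress(5 << 20, 10 << 20) -> "\r5M / 10M 50.0% "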
1111
1112 def progress_writer(progress_func, outfile=sys.stderr):
1113     def write_progress(bytes_written, bytes_expected):
1114         outfile.write(progress_func(bytes_written, bytes_expected))
1115     return write_progress
1116
1117 def desired_project_uuid(api_client, project_uuid, num_retries):
1118     if not project_uuid:
1119         query = api_client.users().current()
1120     elif arvados.util.user_uuid_pattern.match(project_uuid):
1121         query = api_client.users().get(uuid=project_uuid)
1122     elif arvados.util.group_uuid_pattern.match(project_uuid):
1123         query = api_client.groups().get(uuid=project_uuid)
1124     else:
1125         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1126     return query.execute(num_retries=num_retries)['uuid']
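
# For example, desired_project_uuid(api_client, None, num_retries) looks up the
# current user and returns their own UUID, i.e. their Home project.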
1127
1128 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1129          install_sig_handlers=True):
1130     global api_client
1131
1132     args = parse_arguments(arguments)
1133     logger = logging.getLogger('arvados.arv_put')
1134     if args.silent:
1135         logger.setLevel(logging.WARNING)
1136     else:
1137         logger.setLevel(logging.INFO)
1138     status = 0
1139
1140     request_id = arvados.util.new_request_id()
1141
1142     formatter = ArvPutLogFormatter(request_id)
1143     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1144
1145     if api_client is None:
1146         api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1147
1148     if install_sig_handlers:
1149         arv_cmd.install_signal_handlers()
1150
1151     # Trash arguments validation
1152     trash_at = None
1153     if args.trash_at is not None:
1154         # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1155         # make sure the user provides a complete YYYY-MM-DD date.
1156         if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1157             logger.error("--trash-at argument format invalid, use --help to see examples.")
1158             sys.exit(1)
1159         # Check if no time information was provided. In that case, assume end-of-day.
1160         if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1161             args.trash_at += 'T23:59:59'
1162         try:
1163             trash_at = ciso8601.parse_datetime(args.trash_at)
1164         except:
1165             logger.error("--trash-at argument format invalid, use --help to see examples.")
1166             sys.exit(1)
1167         else:
1168             if trash_at.tzinfo is not None:
1169                 # Timezone aware datetime provided.
1170                 utcoffset = -trash_at.utcoffset()
1171             else:
1172                 # Timezone naive datetime provided. Assume it is local.
1173                 if time.daylight:
1174                     utcoffset = datetime.timedelta(seconds=time.altzone)
1175                 else:
1176                     utcoffset = datetime.timedelta(seconds=time.timezone)
1177             # Convert to UTC timezone naive datetime.
1178             trash_at = trash_at.replace(tzinfo=None) + utcoffset
1179
1180         if trash_at <= datetime.datetime.utcnow():
1181             logger.error("--trash-at argument must be set in the future")
1182             sys.exit(1)
1183     if args.trash_after is not None:
1184         if args.trash_after < 1:
1185             logger.error("--trash-after argument must be >= 1")
1186             sys.exit(1)
1187         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1188
1189     # Determine the name to use
1190     if args.name:
1191         if args.stream or args.raw:
1192             logger.error("Cannot use --name with --stream or --raw")
1193             sys.exit(1)
1194         elif args.update_collection:
1195             logger.error("Cannot use --name with --update-collection")
1196             sys.exit(1)
1197         collection_name = args.name
1198     else:
1199         collection_name = "Saved at {} by {}@{}".format(
1200             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1201             pwd.getpwuid(os.getuid()).pw_name,
1202             socket.gethostname())
1203
1204     if args.project_uuid and (args.stream or args.raw):
1205         logger.error("Cannot use --project-uuid with --stream or --raw")
1206         sys.exit(1)
1207
1208     # Determine the parent project
1209     try:
1210         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1211                                             args.retries)
1212     except (apiclient_errors.Error, ValueError) as error:
1213         logger.error(error)
1214         sys.exit(1)
1215
1216     if args.progress:
1217         reporter = progress_writer(human_progress)
1218     elif args.batch_progress:
1219         reporter = progress_writer(machine_progress)
1220     else:
1221         reporter = None
1222
1223     #  Split storage-classes argument
1224     storage_classes = None
1225     if args.storage_classes:
1226         storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
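        # e.g. --storage-classes 'hot, archival' becomes ['hot', 'archival']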
1227
1228     # Set up the exclude regex from all the --exclude arguments provided
1229     name_patterns = []
1230     exclude_paths = []
1231     exclude_names = None
1232     if len(args.exclude) > 0:
1233         # We're supporting 2 kinds of exclusion patterns:
1234         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1235         #                            the name, wherever the file is on the tree)
1236         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1237         #                            entire path, and should be relative to
1238         #                            any input dir argument)
1239         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1240         #                            placed directly underneath the input dir)
1241         for p in args.exclude:
1242             # Only relative paths patterns allowed
1243             if p.startswith(os.sep):
1244                 logger.error("Cannot use absolute paths with --exclude")
1245                 sys.exit(1)
1246             if os.path.dirname(p):
1247                 # We don't support path patterns with '..'
1248                 p_parts = p.split(os.sep)
1249                 if '..' in p_parts:
1250                     logger.error(
1251                         "Cannot use path patterns that include '..'")
1252                     sys.exit(1)
1253                 # Path search pattern
1254                 exclude_paths.append(p)
1255             else:
1256                 # Name-only search pattern
1257                 name_patterns.append(p)
1258         # For name only matching, we can combine all patterns into a single
1259         # regexp, for better performance.
1260         exclude_names = re.compile('|'.join(
1261             [fnmatch.translate(p) for p in name_patterns]
1262         )) if len(name_patterns) > 0 else None
1263         # Show the user the patterns to be used, just in case they weren't
1264         # specified inside quotes and got changed by the shell expansion.
1265         logger.info("Exclude patterns: {}".format(args.exclude))
1266
1267     # If this is used by a human, and there's at least one directory to be
1268     # uploaded, the expected bytes calculation can take a moment.
1269     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1270         logger.info("Calculating upload size, this could take some time...")
1271     try:
1272         writer = ArvPutUploadJob(paths = args.paths,
1273                                  resume = args.resume,
1274                                  use_cache = args.use_cache,
1275                                  batch_mode= args.batch,
1276                                  filename = args.filename,
1277                                  reporter = reporter,
1278                                  api_client = api_client,
1279                                  num_retries = args.retries,
1280                                  replication_desired = args.replication,
1281                                  put_threads = args.threads,
1282                                  name = collection_name,
1283                                  owner_uuid = project_uuid,
1284                                  ensure_unique_name = True,
1285                                  update_collection = args.update_collection,
1286                                  storage_classes=storage_classes,
1287                                  logger=logger,
1288                                  dry_run=args.dry_run,
1289                                  follow_links=args.follow_links,
1290                                  exclude_paths=exclude_paths,
1291                                  exclude_names=exclude_names,
1292                                  trash_at=trash_at)
1293     except ResumeCacheConflict:
1294         logger.error("\n".join([
1295             "arv-put: Another process is already uploading this data.",
1296             "         Use --no-cache if this is really what you want."]))
1297         sys.exit(1)
1298     except ResumeCacheInvalidError:
1299         logger.error("\n".join([
1300             "arv-put: Resume cache contains invalid signature: it may have expired",
1301             "         or been created with another Arvados user's credentials.",
1302             "         Switch user or use one of the following options to restart upload:",
1303             "         --no-resume to start a new resume cache.",
1304             "         --no-cache to disable resume cache.",
1305             "         --batch to ignore the resume cache if invalid."]))
1306         sys.exit(1)
1307     except (CollectionUpdateError, PathDoesNotExistError) as error:
1308         logger.error("\n".join([
1309             "arv-put: %s" % str(error)]))
1310         sys.exit(1)
1311     except ArvPutUploadIsPending:
1312         # Dry run check successful, return proper exit code.
1313         sys.exit(2)
1314     except ArvPutUploadNotPending:
1315         # No files pending for upload
1316         sys.exit(0)
1317
1318     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1319         logger.warning("\n".join([
1320             "arv-put: Resuming previous upload from last checkpoint.",
1321             "         Use the --no-resume option to start over."]))
1322
1323     if not args.dry_run:
1324         writer.report_progress()
1325     output = None
1326     try:
1327         writer.start(save_collection=not(args.stream or args.raw))
1328     except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1329         logger.error("\n".join([
1330             "arv-put: %s" % str(error)]))
1331         sys.exit(1)
1332
1333     if args.progress:  # Print newline to split stderr from stdout for humans.
1334         logger.info("\n")
1335
1336     if args.stream:
1337         if args.normalize:
1338             output = writer.manifest_text(normalize=True)
1339         else:
1340             output = writer.manifest_text()
1341     elif args.raw:
1342         output = ','.join(writer.data_locators())
1343     elif writer.manifest_locator() is not None:
1344         try:
1345             expiration_notice = ""
1346             if writer.collection_trash_at() is not None:
1347                 # Get the local timezone-naive version, and log it with timezone information.
1348                 if time.daylight:
1349                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1350                 else:
1351                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1352                 expiration_notice = ". It will expire on {} {}.".format(
1353                     local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1354             if args.update_collection:
1355                 logger.info(u"Collection updated: '{}'{}".format(
1356                     writer.collection_name(), expiration_notice))
1357             else:
1358                 logger.info(u"Collection saved as '{}'{}".format(
1359                     writer.collection_name(), expiration_notice))
1360             if args.portable_data_hash:
1361                 output = writer.portable_data_hash()
1362             else:
1363                 output = writer.manifest_locator()
1364         except apiclient_errors.Error as error:
1365             logger.error(
1366                 "arv-put: Error creating Collection on project: {}.".format(
1367                     error))
1368             status = 1
1369     else:
1370         status = 1
1371
1372     # Print the locator (uuid) of the new collection.
1373     if output is None:
1374         status = status or 1
1375     elif not args.silent:
1376         stdout.write(output)
1377         if not output.endswith('\n'):
1378             stdout.write('\n')
1379
1380     if install_sig_handlers:
1381         arv_cmd.restore_signal_handlers()
1382
1383     if status != 0:
1384         sys.exit(status)
1385
1386     # Success!
1387     return output
1388
1389
1390 if __name__ == '__main__':
1391     main()