1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import ciso8601
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import fnmatch
19 import hashlib
20 import json
21 import logging
22 import os
23 import pwd
24 import re
25 import signal
26 import socket
27 import sys
28 import tempfile
29 import threading
30 import time
31 import traceback
32
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
36
37 import arvados.commands._util as arv_cmd
38
39 api_client = None
40
41 upload_opts = argparse.ArgumentParser(add_help=False)
42
43 upload_opts.add_argument('--version', action='version',
44                          version="%s %s" % (sys.argv[0], __version__),
45                          help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47                          help="""
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
51 """)
52
53 _group = upload_opts.add_mutually_exclusive_group()
54
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56                     default=-1, help=argparse.SUPPRESS)
57
58 _group.add_argument('--normalize', action='store_true',
59                     help="""
60 Normalize the manifest by re-ordering files and streams after writing
61 data.
62 """)
63
64 _group.add_argument('--dry-run', action='store_true', default=False,
65                     help="""
66 Don't actually upload files; only check whether any file would be
67 uploaded. Exit with code=2 when files are pending for upload.
68 """)
69
70 _group = upload_opts.add_mutually_exclusive_group()
71
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
73                     help="""
74 Synonym for --stream.
75 """)
76
77 _group.add_argument('--stream', action='store_true',
78                     help="""
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
81 """)
82
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
84                     help="""
85 Synonym for --manifest.
86 """)
87
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
89                     help="""
90 Synonym for --manifest.
91 """)
92
93 _group.add_argument('--manifest', action='store_true',
94                     help="""
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
98 """)
99
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
101                     help="""
102 Synonym for --raw.
103 """)
104
105 _group.add_argument('--raw', action='store_true',
106                     help="""
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
109 manifest.
110 """)
111
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113                          dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
116 """)
117
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119                          dest='filename', help="""
120 Synonym for --filename.
121 """)
122
123 upload_opts.add_argument('--filename', type=str, default=None,
124                          help="""
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
129 """)
130
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
132                          help="""
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
135 """)
136
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
138                          help="""
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
142 """)
143
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify a comma-separated list of storage classes to be used when saving data to Keep.
146 """)
147
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
149                          help="""
150 Set the number of upload threads to be used. Note that using many
151 threads will increase the RAM requirements. Default is to use 2
152 threads.
153 On high-latency installations, using a greater number will improve
154 overall throughput.
155 """)
156
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158                       action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside the
161 'subdir' directory, relative to the provided input dirs, will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
167 """)
168
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171                     dest='follow_links', help="""
172 Follow file and directory symlinks (default).
173 """)
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
175                     help="""
176 Ignore file and directory symlinks. Even paths given explicitly on the
177 command line will be skipped if they are symlinks.
178 """)
179
180
181 run_opts = argparse.ArgumentParser(add_help=False)
182
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
185 project.
186 """)
187
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
190 """)
191
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
194                     help="""
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
197 stderr is a tty.
198 """)
199
200 _group.add_argument('--no-progress', action='store_true',
201                     help="""
202 Do not display human-readable progress on stderr, even if stderr is a
203 tty.
204 """)
205
206 _group.add_argument('--batch-progress', action='store_true',
207                     help="""
208 Display machine-readable progress on stderr (bytes and, if known,
209 total data size).
210 """)
211
212 run_opts.add_argument('--silent', action='store_true',
213                       help="""
214 Do not print any debug messages to console. (Any error messages will
215 still be displayed.)
216 """)
217
218 run_opts.add_argument('--batch', action='store_true', default=False,
219                       help="""
220 Retry with '--no-resume --no-cache' if the cached state contains invalid or
221 expired block signatures.
222 """)
223
224 _group = run_opts.add_mutually_exclusive_group()
225 _group.add_argument('--resume', action='store_true', default=True,
226                     help="""
227 Continue interrupted uploads from cached state (default).
228 """)
229 _group.add_argument('--no-resume', action='store_false', dest='resume',
230                     help="""
231 Do not continue interrupted uploads from cached state.
232 """)
233
234 _group = run_opts.add_mutually_exclusive_group()
235 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
236                     help="""
237 Save upload state in a cache file for resuming (default).
238 """)
239 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
240                     help="""
241 Do not save upload state in a cache file for resuming.
242 """)
243
244 _group = upload_opts.add_mutually_exclusive_group()
245 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
246                     help="""
247 Set the trash date of the resulting collection to an absolute date in the future.
248 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.
249 Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
250 """)
251 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
252                     help="""
253 Set the trash date of the resulting collection to a number of days after
254 the date/time that the upload process finishes.
255 """)
256
257 arg_parser = argparse.ArgumentParser(
258     description='Copy data from the local filesystem to Keep.',
259     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
260
261 def parse_arguments(arguments):
262     args = arg_parser.parse_args(arguments)
263
264     if len(args.paths) == 0:
265         args.paths = ['-']
266
267     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
268
269     if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
270         arg_parser.error("""
271     --filename argument cannot be used when storing a directory or
272     multiple files.
273     """)
274
275     # Turn on --progress by default if stderr is a tty.
276     if (not (args.batch_progress or args.no_progress or args.silent)
277         and os.isatty(sys.stderr.fileno())):
278         args.progress = True
279
280     # Turn off --resume (default) if --no-cache is used.
281     if not args.use_cache:
282         args.resume = False
283
284     if args.paths == ['-']:
285         if args.update_collection:
286             arg_parser.error("""
287     --update-collection cannot be used when reading from stdin.
288     """)
289         args.resume = False
290         args.use_cache = False
291         if not args.filename:
292             args.filename = 'stdin'
293
294     # Remove duplicated patterns, if any
295     if len(args.exclude) > 0:
296         args.exclude = list(set(args.exclude))
297
298     return args
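
# A quick sketch (hypothetical invocation) of how parse_arguments() resolves
# defaults: with no paths given, input comes from stdin, which disables the
# resume cache and implies a default --filename.
#
#     args = parse_arguments(['--no-progress'])
#     assert args.paths == ['-']
#     assert args.filename == 'stdin'
#     assert args.use_cache is False and args.resume is False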
299
300
301 class PathDoesNotExistError(Exception):
302     pass
303
304
305 class CollectionUpdateError(Exception):
306     pass
307
308
309 class ResumeCacheConflict(Exception):
310     pass
311
312
313 class ResumeCacheInvalidError(Exception):
314     pass
315
316 class ArvPutArgumentConflict(Exception):
317     pass
318
319
320 class ArvPutUploadIsPending(Exception):
321     pass
322
323
324 class ArvPutUploadNotPending(Exception):
325     pass
326
327
328 class FileUploadList(list):
329     def __init__(self, dry_run=False):
330         list.__init__(self)
331         self.dry_run = dry_run
332
333     def append(self, other):
334         if self.dry_run:
335             raise ArvPutUploadIsPending()
336         super(FileUploadList, self).append(other)
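
# Dry-run sketch: a FileUploadList created with dry_run=True raises as soon
# as anything would be scheduled for upload.
#
#     pending = FileUploadList(dry_run=True)
#     pending.append(('file.txt', 0, 'file.txt'))  # raises ArvPutUploadIsPending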
337
338
339 # Appends the X-Request-Id to the first log message whose level is ERROR or DEBUG
340 class ArvPutLogFormatter(logging.Formatter):
341     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
342     err_fmtr = None
343     request_id_informed = False
344
345     def __init__(self, request_id):
346         self.err_fmtr = logging.Formatter(
347             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
348             arvados.log_date_format)
349
350     def format(self, record):
351         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
352             self.request_id_informed = True
353             return self.err_fmtr.format(record)
354         return self.std_fmtr.format(record)
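
# Illustrative wiring (hypothetical request id): the X-Request-Id suffix is
# emitted only for the first DEBUG/ERROR record, then the standard format is
# used again.
#
#     handler = logging.StreamHandler()
#     handler.setFormatter(ArvPutLogFormatter('req-0123456789abcdefghij'))
#     logging.getLogger('arvados.arv_put').addHandler(handler)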
355
356
357 class ResumeCache(object):
358     CACHE_DIR = '.cache/arvados/arv-put'
359
360     def __init__(self, file_spec):
361         self.cache_file = open(file_spec, 'a+')
362         self._lock_file(self.cache_file)
363         self.filename = self.cache_file.name
364
365     @classmethod
366     def make_path(cls, args):
367         md5 = hashlib.md5()
368         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
369         realpaths = sorted(os.path.realpath(path) for path in args.paths)
370         md5.update(b'\0'.join([p.encode() for p in realpaths]))
371         if any(os.path.isdir(path) for path in realpaths):
372             md5.update(b'-1')
373         elif args.filename:
374             md5.update(args.filename.encode())
375         return os.path.join(
376             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
377             md5.hexdigest())
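
    # The cache path mixes the API host with the sorted real paths of the
    # inputs, so repeating the same invocation maps to the same cache file.
    # A hypothetical result:
    #
    #     ResumeCache.make_path(args)
    #     # -> '~/.cache/arvados/arv-put/d41d8cd98f00b204e9800998ecf8427e'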
378
379     def _lock_file(self, fileobj):
380         try:
381             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
382         except IOError:
383             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
384
385     def load(self):
386         self.cache_file.seek(0)
387         return json.load(self.cache_file)
388
389     def check_cache(self, api_client=None, num_retries=0):
390         try:
391             state = self.load()
392             locator = None
393             try:
394                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
395                     locator = state["_finished_streams"][0][1][0]
396                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
397                     locator = state["_current_stream_locators"][0]
398                 if locator is not None:
399                     kc = arvados.keep.KeepClient(api_client=api_client)
400                     kc.head(locator, num_retries=num_retries)
401             except Exception:
402                 self.restart()
403         except ValueError:
404             pass
405
406     def save(self, data):
407         try:
408             new_cache_fd, new_cache_name = tempfile.mkstemp(
409                 dir=os.path.dirname(self.filename))
410             self._lock_file(new_cache_fd)
411             new_cache = os.fdopen(new_cache_fd, 'r+')
412             json.dump(data, new_cache)
413             os.rename(new_cache_name, self.filename)
414         except (IOError, OSError, ResumeCacheConflict):
415             try:
416                 os.unlink(new_cache_name)
417             except NameError:  # mkstemp failed.
418                 pass
419         else:
420             self.cache_file.close()
421             self.cache_file = new_cache
422
423     def close(self):
424         self.cache_file.close()
425
426     def destroy(self):
427         try:
428             os.unlink(self.filename)
429         except OSError as error:
430             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
431                 raise
432         self.close()
433
434     def restart(self):
435         self.destroy()
436         self.__init__(self.filename)
437
438
439 class ArvPutUploadJob(object):
440     CACHE_DIR = '.cache/arvados/arv-put'
441     EMPTY_STATE = {
442         'manifest' : None, # Last saved manifest checkpoint
443         'files' : {} # Previous run file list: {path : {size, mtime}}
444     }
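    # A hypothetical example of a populated state, as serialized to the JSON
    # cache file:
    #
    #     {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #      "files": {"/home/me/empty.txt": {"mtime": 1600000000.0, "size": 0}}}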
445
446     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
447                  name=None, owner_uuid=None, api_client=None,
448                  ensure_unique_name=False, num_retries=None,
449                  put_threads=None, replication_desired=None, filename=None,
450                  update_time=60.0, update_collection=None, storage_classes=None,
451                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
452                  follow_links=True, exclude_paths=[], exclude_names=None,
453                  trash_at=None):
454         self.paths = paths
455         self.resume = resume
456         self.use_cache = use_cache
457         self.update = False
458         self.reporter = reporter
459         # This will be set to 0 before we start counting, if no special
460         # files are going to be read.
461         self.bytes_expected = None
462         self.bytes_written = 0
463         self.bytes_skipped = 0
464         self.name = name
465         self.owner_uuid = owner_uuid
466         self.ensure_unique_name = ensure_unique_name
467         self.num_retries = num_retries
468         self.replication_desired = replication_desired
469         self.put_threads = put_threads
470         self.filename = filename
471         self.storage_classes = storage_classes
472         self._api_client = api_client
473         self._state_lock = threading.Lock()
474         self._state = None # Previous run state (file list & manifest)
475         self._current_files = [] # Current run file list
476         self._cache_file = None
477         self._collection_lock = threading.Lock()
478         self._remote_collection = None # Collection being updated (if asked)
479         self._local_collection = None # Collection from previous run manifest
480         self._file_paths = set() # Files to be updated in remote collection
481         self._stop_checkpointer = threading.Event()
482         self._checkpointer = threading.Thread(target=self._update_task)
483         self._checkpointer.daemon = True
484         self._update_task_time = update_time  # How many seconds to wait between update runs
485         self._files_to_upload = FileUploadList(dry_run=dry_run)
486         self._upload_started = False
487         self.logger = logger
488         self.dry_run = dry_run
489         self._checkpoint_before_quit = True
490         self.follow_links = follow_links
491         self.exclude_paths = exclude_paths
492         self.exclude_names = exclude_names
493         self._trash_at = trash_at
494
495         if self._trash_at is not None:
496             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
497                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
498             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
499                 raise TypeError('provided trash_at datetime should be timezone-naive')
500
501         if not self.use_cache and self.resume:
502             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
503
504         # Check for obvious dry-run responses
505         if self.dry_run and (not self.use_cache or not self.resume):
506             raise ArvPutUploadIsPending()
507
508         # Load cached data if any and if needed
509         self._setup_state(update_collection)
510
511         # Build the upload file list, excluding requested files and counting the
512         # bytes expected to be uploaded.
513         self._build_upload_list()
514
515     def _build_upload_list(self):
516         """
517         Scan the requested paths to count file sizes, excluding requested files
518         and dirs and building the upload file list.
519         """
520         # If there are no special files to be read, reset the total bytes
521         # count to zero to start counting.
522         if not any([p for p in self.paths
523                     if not (os.path.isfile(p) or os.path.isdir(p))]):
524             self.bytes_expected = 0
525
526         for path in self.paths:
527             # Test for stdin first, in case some file named '-' exists
528             if path == '-':
529                 if self.dry_run:
530                     raise ArvPutUploadIsPending()
531                 self._write_stdin(self.filename or 'stdin')
532             elif not os.path.exists(path):
533                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
534             elif (not self.follow_links) and os.path.islink(path):
535                 self.logger.warning("Skipping symlink '{}'".format(path))
536                 continue
537             elif os.path.isdir(path):
538                 # Use absolute paths on cache index so CWD doesn't interfere
539                 # with the caching logic.
540                 orig_path = path
541                 path = os.path.abspath(path)
542                 if orig_path[-1:] == os.sep:
543                     # When passing a directory reference with a trailing slash,
544                     # its contents should be uploaded directly to the
545                     # collection's root.
546                     prefixdir = path
547                 else:
548                     # When passing a directory reference with no trailing slash,
549                     # upload the directory to the collection's root.
550                     prefixdir = os.path.dirname(path)
551                 prefixdir += os.sep
552                 for root, dirs, files in os.walk(path,
553                                                  followlinks=self.follow_links):
554                     root_relpath = os.path.relpath(root, path)
555                     if root_relpath == '.':
556                         root_relpath = ''
557                     # Exclude files/dirs by full path matching pattern
558                     if self.exclude_paths:
559                         dirs[:] = [d for d in dirs
560                                    if not any(pathname_match(
561                                            os.path.join(root_relpath, d), pat)
562                                               for pat in self.exclude_paths)]
563                         files = [f for f in files
564                                  if not any(pathname_match(
565                                          os.path.join(root_relpath, f), pat)
566                                             for pat in self.exclude_paths)]
567                     # Exclude files/dirs by name matching pattern
568                     if self.exclude_names is not None:
569                         dirs[:] = [d for d in dirs
570                                    if not self.exclude_names.match(d)]
571                         files = [f for f in files
572                                  if not self.exclude_names.match(f)]
573                     # Make os.walk()'s directory traversal order deterministic
574                     dirs.sort()
575                     files.sort()
576                     for f in files:
577                         filepath = os.path.join(root, f)
578                         # Add its size to the total bytes count (if applicable)
579                         if self.follow_links or (not os.path.islink(filepath)):
580                             if self.bytes_expected is not None:
581                                 self.bytes_expected += os.path.getsize(filepath)
582                         self._check_file(filepath,
583                                          os.path.join(root[len(prefixdir):], f))
584             else:
585                 filepath = os.path.abspath(path)
586                 # Add its size to the total bytes count (if applicable)
587                 if self.follow_links or (not os.path.islink(filepath)):
588                     if self.bytes_expected is not None:
589                         self.bytes_expected += os.path.getsize(filepath)
590                 self._check_file(filepath,
591                                  self.filename or os.path.basename(path))
592         # If dry-run mode is on and we got up to this point, notify that
593         # there aren't any files to upload.
594         if self.dry_run:
595             raise ArvPutUploadNotPending()
596         # Remove local_collection's files that don't exist locally anymore, so the
597         # bytes_written count is correct.
598         for f in self.collection_file_paths(self._local_collection,
599                                             path_prefix=""):
600             if f != 'stdin' and f != self.filename and f not in self._file_paths:
601                 self._local_collection.remove(f)
602
603     def start(self, save_collection):
604         """
605         Start supporting thread & file uploading
606         """
607         self._checkpointer.start()
608         try:
609             # Update bytes_written from current local collection and
610             # report initial progress.
611             self._update()
612             # Actual file upload
613             self._upload_started = True # Used by the update thread to start checkpointing
614             self._upload_files()
615         except (SystemExit, Exception) as e:
616             self._checkpoint_before_quit = False
617             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
618             # Note: We're expecting SystemExit instead of
619             # KeyboardInterrupt because we have a custom signal
620             # handler in place that raises SystemExit with the caught
621             # signal's code.
622             if isinstance(e, PathDoesNotExistError):
623                 # We aren't interested in the traceback for this case
624                 pass
625             elif not isinstance(e, SystemExit) or e.code != -2:
626                 self.logger.warning("Abnormal termination:\n{}".format(
627                     traceback.format_exc()))
628             raise
629         finally:
630             if not self.dry_run:
631                 # Stop the thread before doing anything else
632                 self._stop_checkpointer.set()
633                 self._checkpointer.join()
634                 if self._checkpoint_before_quit:
635                     # Commit all pending blocks & one last _update()
636                     self._local_collection.manifest_text()
637                     self._update(final=True)
638                     if save_collection:
639                         self.save_collection()
640             if self.use_cache:
641                 self._cache_file.close()
642
643     def _collection_trash_at(self):
644         """
645         Returns the trash date that the collection should use at save time.
646         Takes into account absolute/relative trash_at values requested
647         by the user.
648         """
649         if type(self._trash_at) == datetime.timedelta:
650             # Get an absolute datetime for trash_at
651             return datetime.datetime.utcnow() + self._trash_at
652         return self._trash_at
653
654     def save_collection(self):
655         if self.update:
656             # Check if files should be updated on the remote collection.
657             for fp in self._file_paths:
658                 remote_file = self._remote_collection.find(fp)
659                 if not remote_file:
660                     # File doesn't exist on the remote collection, copy it.
661                     self._remote_collection.copy(fp, fp, self._local_collection)
662                 elif remote_file != self._local_collection.find(fp):
663                     # A different file exists on the remote collection, overwrite it.
664                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
665                 else:
666                     # The file already exists on the remote collection, skip it.
667                     pass
668             self._remote_collection.save(num_retries=self.num_retries,
669                                          trash_at=self._collection_trash_at())
670         else:
671             if len(self._local_collection) == 0:
672                 self.logger.warning("No files were uploaded, skipping collection creation.")
673                 return
674             self._local_collection.save_new(
675                 name=self.name, owner_uuid=self.owner_uuid,
676                 ensure_unique_name=self.ensure_unique_name,
677                 num_retries=self.num_retries,
678                 trash_at=self._collection_trash_at())
679
680     def destroy_cache(self):
681         if self.use_cache:
682             try:
683                 os.unlink(self._cache_filename)
684             except OSError as error:
685                 # That's what we wanted anyway.
686                 if error.errno != errno.ENOENT:
687                     raise
688             self._cache_file.close()
689
690     def _collection_size(self, collection):
691         """
692         Recursively get the total size of the collection
693         """
694         size = 0
695         for item in listvalues(collection):
696             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
697                 size += self._collection_size(item)
698             else:
699                 size += item.size()
700         return size
701
702     def _update_task(self):
703         """
704         Periodically called support task. File uploading is
705         asynchronous so we poll status from the collection.
706         """
707         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
708             self._update()
709
710     def _update(self, final=False):
711         """
712         Update cached manifest text and report progress.
713         """
714         if self._upload_started:
715             with self._collection_lock:
716                 self.bytes_written = self._collection_size(self._local_collection)
717                 if self.use_cache:
718                     if final:
719                         manifest = self._local_collection.manifest_text()
720                     else:
721                         # Get the manifest text without committing pending blocks
722                         manifest = self._local_collection.manifest_text(strip=False,
723                                                                         normalize=False,
724                                                                         only_committed=True)
725                     # Update cache
726                     with self._state_lock:
727                         self._state['manifest'] = manifest
728             if self.use_cache:
729                 try:
730                     self._save_state()
731                 except Exception as e:
732                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
733             # Keep remote collection's trash_at attribute synced when using relative expire dates
734             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
735                 try:
736                     self._api_client.collections().update(
737                         uuid=self._remote_collection.manifest_locator(),
738                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
739                     ).execute(num_retries=self.num_retries)
740                 except Exception as e:
741                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
742         else:
743             self.bytes_written = self.bytes_skipped
744         # Call the reporter, if any
745         self.report_progress()
746
747     def report_progress(self):
748         if self.reporter is not None:
749             self.reporter(self.bytes_written, self.bytes_expected)
750
751     def _write_stdin(self, filename):
752         output = self._local_collection.open(filename, 'wb')
753         self._write(sys.stdin.buffer, output)
754         output.close()
755
756     def _check_file(self, source, filename):
757         """
758         Check if this file needs to be uploaded
759         """
760         # Ignore symlinks when requested
761         if (not self.follow_links) and os.path.islink(source):
762             return
763         resume_offset = 0
764         should_upload = False
765         new_file_in_cache = False
766         # Record file path for updating the remote collection before exiting
767         self._file_paths.add(filename)
768
769         with self._state_lock:
770             # If there's no previous cached data on this file, store it
771             # for a possible repeated run.
772             if source not in self._state['files']:
773                 self._state['files'][source] = {
774                     'mtime': os.path.getmtime(source),
775                     'size' : os.path.getsize(source)
776                 }
777                 new_file_in_cache = True
778             cached_file_data = self._state['files'][source]
779
780         # Check if file was already uploaded (at least partially)
781         file_in_local_collection = self._local_collection.find(filename)
782
783         # If not resuming, upload the full file.
784         if not self.resume:
785             should_upload = True
786         # New file detected from last run, upload it.
787         elif new_file_in_cache:
788             should_upload = True
789         # Local file didn't change from last run.
790         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
791             if not file_in_local_collection:
792                 # File not uploaded yet, upload it completely
793                 should_upload = True
794             elif file_in_local_collection.permission_expired():
795                 # Permission token expired, re-upload file. This will change
796                 # once we have an API for refreshing tokens.
797                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
798                 should_upload = True
799                 self._local_collection.remove(filename)
800             elif cached_file_data['size'] == file_in_local_collection.size():
801                 # File already there, skip it.
802                 self.bytes_skipped += cached_file_data['size']
803             elif cached_file_data['size'] > file_in_local_collection.size():
804                 # File partially uploaded, resume!
805                 resume_offset = file_in_local_collection.size()
806                 self.bytes_skipped += resume_offset
807                 should_upload = True
808             else:
809                 # Inconsistent cache, re-upload the file
810                 should_upload = True
811                 self._local_collection.remove(filename)
812                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
813         # Local file differs from cached data, re-upload it.
814         else:
815             if file_in_local_collection:
816                 self._local_collection.remove(filename)
817             should_upload = True
818
819         if should_upload:
820             try:
821                 self._files_to_upload.append((source, resume_offset, filename))
822             except ArvPutUploadIsPending:
823                 # This could happen when running in dry-run mode; close the
824                 # cache file to avoid locking issues.
825                 self._cache_file.close()
826                 raise
827
828     def _upload_files(self):
829         for source, resume_offset, filename in self._files_to_upload:
830             with open(source, 'rb') as source_fd:
831                 with self._state_lock:
832                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
833                     self._state['files'][source]['size'] = os.path.getsize(source)
834                 if resume_offset > 0:
835                     # Start upload where we left off
836                     output = self._local_collection.open(filename, 'ab')
837                     source_fd.seek(resume_offset)
838                 else:
839                     # Start from scratch
840                     output = self._local_collection.open(filename, 'wb')
841                 self._write(source_fd, output)
842                 output.close(flush=False)
843
844     def _write(self, source_fd, output):
845         while True:
846             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
847             if not data:
848                 break
849             output.write(data)
850
851     def _my_collection(self):
852         return self._remote_collection if self.update else self._local_collection
853
854     def _get_cache_filepath(self):
855         # Set up cache file name from input paths.
856         md5 = hashlib.md5()
857         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
858         realpaths = sorted(os.path.realpath(path) for path in self.paths)
859         md5.update(b'\0'.join([p.encode() for p in realpaths]))
860         if self.filename:
861             md5.update(self.filename.encode())
862         cache_filename = md5.hexdigest()
863         cache_filepath = os.path.join(
864             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
865             cache_filename)
866         return cache_filepath
867
868     def _setup_state(self, update_collection):
869         """
870         Create a new cache file or load a previously existing one.
871         """
872         # Load an already existing collection for update
873         if update_collection and re.match(arvados.util.collection_uuid_pattern,
874                                           update_collection):
875             try:
876                 self._remote_collection = arvados.collection.Collection(
877                     update_collection,
878                     api_client=self._api_client,
879                     storage_classes_desired=self.storage_classes,
880                     num_retries=self.num_retries)
881             except arvados.errors.ApiError as error:
882                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
883             else:
884                 self.update = True
885         elif update_collection:
886             # Collection locator provided, but unknown format
887             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
888
889         if self.use_cache:
890             cache_filepath = self._get_cache_filepath()
891             if self.resume and os.path.exists(cache_filepath):
892                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
893                 self._cache_file = open(cache_filepath, 'a+')
894             else:
895                 # --no-resume means start with an empty cache file.
896                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
897                 self._cache_file = open(cache_filepath, 'w+')
898             self._cache_filename = self._cache_file.name
899             self._lock_file(self._cache_file)
900             self._cache_file.seek(0)
901
902         with self._state_lock:
903             if self.use_cache:
904                 try:
905                     self._state = json.load(self._cache_file)
906                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
907                         # Cache is at least partially incomplete, set up a new cache
908                         self._state = copy.deepcopy(self.EMPTY_STATE)
909                 except ValueError:
910                     # Cache file empty, set up new cache
911                     self._state = copy.deepcopy(self.EMPTY_STATE)
912             else:
913                 self.logger.info("No cache usage requested for this run.")
914                 # No cache file, set empty state
915                 self._state = copy.deepcopy(self.EMPTY_STATE)
916             if not self._cached_manifest_valid():
917                 raise ResumeCacheInvalidError()
918             # Load the previous manifest so we can check if files were modified remotely.
919             self._local_collection = arvados.collection.Collection(
920                 self._state['manifest'],
921                 replication_desired=self.replication_desired,
922                 storage_classes_desired=(self.storage_classes or ['default']),
923                 put_threads=self.put_threads,
924                 api_client=self._api_client,
925                 num_retries=self.num_retries)
926
927     def _cached_manifest_valid(self):
928         """
929         Validate the oldest non-expired block signature to check if the cached
930         manifest is usable, i.e. that it was not created with a different
931         Arvados account's credentials.
932         """
933         if self._state.get('manifest', None) is None:
934             # No cached manifest yet, all good.
935             return True
936         now = datetime.datetime.utcnow()
937         oldest_exp = None
938         oldest_loc = None
939         block_found = False
940         for m in keep_locator_pattern.finditer(self._state['manifest']):
941             loc = m.group(0)
942             try:
943                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
944             except IndexError:
945                 # Locator without signature
946                 continue
947             block_found = True
948             if exp > now and (oldest_exp is None or exp < oldest_exp):
949                 oldest_exp = exp
950                 oldest_loc = loc
951         if not block_found:
952             # No block signatures found => no invalid block signatures.
953             return True
954         if oldest_loc is None:
955             # Locator signatures found, but all have expired.
956             # Reset the cache and move on.
957             self.logger.info('Cache expired, starting from scratch.')
958             self._state['manifest'] = ''
959             return True
960         kc = arvados.KeepClient(api_client=self._api_client,
961                                 num_retries=self.num_retries)
962         try:
963             kc.head(oldest_loc)
964         except arvados.errors.KeepRequestError:
965             # Something is wrong, cached manifest is not valid.
966             return False
967         return True
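
    # For reference, a signed block locator looks like this (hypothetical
    # signature and expiry):
    #
    #     acbd18db4cc2f85cedef654fccc4a4d8+3+A0f1e2d...@64fe1b2a
    #
    # where the hex timestamp after '@' is the signature expiry checked above.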
968
969     def collection_file_paths(self, col, path_prefix='.'):
970         """Return a list of file paths by recursively going through the entire collection `col`."""
971         file_paths = []
972         for name, item in listitems(col):
973             if isinstance(item, arvados.arvfile.ArvadosFile):
974                 file_paths.append(os.path.join(path_prefix, name))
975             elif isinstance(item, arvados.collection.Subcollection):
976                 new_prefix = os.path.join(path_prefix, name)
977                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
978         return file_paths
979
980     def _lock_file(self, fileobj):
981         try:
982             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
983         except IOError:
984             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
985
986     def _save_state(self):
987         """
988         Atomically save current state into cache.
989         """
990         with self._state_lock:
991             # We're not using copy.deepcopy() here because it's a lot slower
992             # than json.dumps(), and we already need the state in JSON format
993             # to save it on disk.
994             state = json.dumps(self._state)
995         try:
996             new_cache = tempfile.NamedTemporaryFile(
997                 mode='w+',
998                 dir=os.path.dirname(self._cache_filename), delete=False)
999             self._lock_file(new_cache)
1000             new_cache.write(state)
1001             new_cache.flush()
1002             os.fsync(new_cache)
1003             os.rename(new_cache.name, self._cache_filename)
1004         except (IOError, OSError, ResumeCacheConflict) as error:
1005             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1006             try:
1007                 os.unlink(new_cache.name)
1008             except NameError:  # NamedTemporaryFile() itself failed.
1009                 pass
1010         else:
1011             self._cache_file.close()
1012             self._cache_file = new_cache
1013
1014     def collection_name(self):
1015         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1016
1017     def collection_trash_at(self):
1018         return self._my_collection().get_trash_at()
1019
1020     def manifest_locator(self):
1021         return self._my_collection().manifest_locator()
1022
1023     def portable_data_hash(self):
1024         pdh = self._my_collection().portable_data_hash()
1025         m = self._my_collection().stripped_manifest().encode()
1026         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1027         if pdh != local_pdh:
1028             self.logger.warning("\n".join([
1029                 "arv-put: API server provided PDH differs from local manifest.",
1030                 "         This should not happen; showing API server version."]))
1031         return pdh
1032
1033     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1034         return self._my_collection().manifest_text(stream_name, strip, normalize)
1035
1036     def _datablocks_on_item(self, item):
1037         """
1038         Return a list of datablock locators, recursively navigating
1039         through subcollections
1040         """
1041         if isinstance(item, arvados.arvfile.ArvadosFile):
1042             if item.size() == 0:
1043                 # Empty file locator
1044                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1045             else:
1046                 locators = []
1047                 for segment in item.segments():
1048                     loc = segment.locator
1049                     locators.append(loc)
1050                 return locators
1051         elif isinstance(item, arvados.collection.Collection):
1052             l = [self._datablocks_on_item(x) for x in listvalues(item)]
1053             # Fast list flattener method taken from:
1054             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1055             return [loc for sublist in l for loc in sublist]
1056         else:
1057             return None
1058
1059     def data_locators(self):
1060         with self._collection_lock:
1061             # Make sure all datablocks are flushed before getting the locators
1062             self._my_collection().manifest_text()
1063             datablocks = self._datablocks_on_item(self._my_collection())
1064         return datablocks
1065
1066 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1067                                                             os.getpid())
1068
1069 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1070 # Note: fnmatch() doesn't work correctly when used with pathnames. For example, the
1071 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1072 # so instead we're using it on every path component.
1073 def pathname_match(pathname, pattern):
1074     name = pathname.split(os.sep)
1075     # Fix patterns like 'some/subdir/' or 'some//subdir'
1076     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1077     if len(name) != len(pat):
1078         return False
1079     for i in range(len(name)):
1080         if not fnmatch.fnmatch(name[i], pat[i]):
1081             return False
1082     return True
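
# Per-component matching in practice:
#
#     pathname_match('tests/run_test.py', 'tests/*.py')          # -> True
#     pathname_match('tests/subdir/run_test.py', 'tests/*.py')   # -> False
#     pathname_match('tests/run_test.py', './tests/*.py')        # -> True ('.' is dropped)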
1083
1084 def machine_progress(bytes_written, bytes_expected):
1085     return _machine_format.format(
1086         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1087
1088 def human_progress(bytes_written, bytes_expected):
1089     if bytes_expected:
1090         return "\r{}M / {}M {:.1%} ".format(
1091             bytes_written >> 20, bytes_expected >> 20,
1092             float(bytes_written) / bytes_expected)
1093     else:
1094         return "\r{} ".format(bytes_written)
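
# Sample reporter output for 512 MiB written out of 1 GiB expected
# (hypothetical argv[0] and pid):
#
#     machine_progress(536870912, 1073741824)
#     # -> 'arv-put 1234: 536870912 written 1073741824 total\n'
#     human_progress(536870912, 1073741824)
#     # -> '\r512M / 1024M 50.0% '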
1095
1096 def progress_writer(progress_func, outfile=sys.stderr):
1097     def write_progress(bytes_written, bytes_expected):
1098         outfile.write(progress_func(bytes_written, bytes_expected))
1099     return write_progress
1100
1101 def desired_project_uuid(api_client, project_uuid, num_retries):
1102     if not project_uuid:
1103         query = api_client.users().current()
1104     elif arvados.util.user_uuid_pattern.match(project_uuid):
1105         query = api_client.users().get(uuid=project_uuid)
1106     elif arvados.util.group_uuid_pattern.match(project_uuid):
1107         query = api_client.groups().get(uuid=project_uuid)
1108     else:
1109         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1110     return query.execute(num_retries=num_retries)['uuid']
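
# A sketch with a hypothetical group UUID: the destination project is
# validated against the API before any upload starts, and an empty
# --project-uuid falls back to the current user's home project.
#
#     desired_project_uuid(api_client, 'zzzzz-j7d0g-0123456789abcde', 3)
#     # -> 'zzzzz-j7d0g-0123456789abcde' (or raises if it doesn't exist)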
1111
1112 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1113          install_sig_handlers=True):
1114     global api_client
1115
1116     args = parse_arguments(arguments)
1117     logger = logging.getLogger('arvados.arv_put')
1118     if args.silent:
1119         logger.setLevel(logging.WARNING)
1120     else:
1121         logger.setLevel(logging.INFO)
1122     status = 0
1123
1124     request_id = arvados.util.new_request_id()
1125
1126     formatter = ArvPutLogFormatter(request_id)
1127     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1128
1129     if api_client is None:
1130         api_client = arvados.api('v1', request_id=request_id)
1131
1132     if install_sig_handlers:
1133         arv_cmd.install_signal_handlers()
1134
1135     # Trash arguments validation
1136     trash_at = None
1137     if args.trash_at is not None:
1138         # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1139         # make sure the user provides a complete YYYY-MM-DD date.
1140         if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1141             logger.error("--trash-at argument format invalid, use --help to see examples.")
1142             sys.exit(1)
1143         # Check if no time information was provided. In that case, assume end-of-day.
1144         if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1145             args.trash_at += 'T23:59:59'
1146         try:
1147             trash_at = ciso8601.parse_datetime(args.trash_at)
1148         except:
1149             logger.error("--trash-at argument format invalid, use --help to see examples.")
1150             sys.exit(1)
1151         else:
1152             if trash_at.tzinfo is not None:
1153                 # Timezone-aware datetime provided.
1154                 utcoffset = -trash_at.utcoffset()
1155             else:
1156                 # Timezone-naive datetime provided. Assume it is local.
1157                 if time.daylight:
1158                     utcoffset = datetime.timedelta(seconds=time.altzone)
1159                 else:
1160                     utcoffset = datetime.timedelta(seconds=time.timezone)
1161             # Convert to a timezone-naive UTC datetime.
1162             trash_at = trash_at.replace(tzinfo=None) + utcoffset
1163
1164         if trash_at <= datetime.datetime.utcnow():
1165             logger.error("--trash-at argument must be set in the future")
1166             sys.exit(1)
1167     if args.trash_after is not None:
1168         if args.trash_after < 1:
1169             logger.error("--trash-after argument must be >= 1")
1170             sys.exit(1)
1171         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
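
    # e.g. (hypothetical): --trash-at 2009-01-03 is completed to
    # 2009-01-03T23:59:59, parsed as local time and converted to naive UTC;
    # --trash-after 2 becomes datetime.timedelta(seconds=172800), resolved
    # to an absolute date at save time by _collection_trash_at().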
1172
1173     # Determine the name to use
1174     if args.name:
1175         if args.stream or args.raw:
1176             logger.error("Cannot use --name with --stream or --raw")
1177             sys.exit(1)
1178         elif args.update_collection:
1179             logger.error("Cannot use --name with --update-collection")
1180             sys.exit(1)
1181         collection_name = args.name
1182     else:
1183         collection_name = "Saved at {} by {}@{}".format(
1184             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1185             pwd.getpwuid(os.getuid()).pw_name,
1186             socket.gethostname())
1187
1188     if args.project_uuid and (args.stream or args.raw):
1189         logger.error("Cannot use --project-uuid with --stream or --raw")
1190         sys.exit(1)
1191
1192     # Determine the parent project
1193     try:
1194         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1195                                             args.retries)
1196     except (apiclient_errors.Error, ValueError) as error:
1197         logger.error(error)
1198         sys.exit(1)
1199
1200     if args.progress:
1201         reporter = progress_writer(human_progress)
1202     elif args.batch_progress:
1203         reporter = progress_writer(machine_progress)
1204     else:
1205         reporter = None
1206
1207     # Split the storage-classes argument
1208     storage_classes = None
1209     if args.storage_classes:
1210         storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
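        # e.g. (hypothetical): --storage-classes ' hot, archive ' -> ['hot', 'archive']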
1211
1212     # Set up the exclude patterns from all the --exclude arguments provided
1213     name_patterns = []
1214     exclude_paths = []
1215     exclude_names = None
1216     if len(args.exclude) > 0:
1217         # We're supporting 2 kinds of exclusion patterns:
1218         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1219         #                            the name, wherever the file is on the tree)
1220         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1221         #                            entire path, and should be relative to
1222         #                            any input dir argument)
1223         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1224         #                            placed directly underneath the input dir)
1225         for p in args.exclude:
1226             # Only relative path patterns are allowed
1227             if p.startswith(os.sep):
1228                 logger.error("Cannot use absolute paths with --exclude")
1229                 sys.exit(1)
1230             if os.path.dirname(p):
1231                 # We don't support path patterns with '..'
1232                 p_parts = p.split(os.sep)
1233                 if '..' in p_parts:
1234                     logger.error(
1235                         "Cannot use path patterns that include '..'")
1236                     sys.exit(1)
1237                 # Path search pattern
1238                 exclude_paths.append(p)
1239             else:
1240                 # Name-only search pattern
1241                 name_patterns.append(p)
1242         # For name-only matching, we can combine all patterns into a single
1243         # regexp, for better performance.
1244         exclude_names = re.compile('|'.join(
1245             [fnmatch.translate(p) for p in name_patterns]
1246         )) if len(name_patterns) > 0 else None
1247         # Show the user the patterns to be used, just in case they weren't
1248         # specified inside quotes and got changed by the shell expansion.
1249         logger.info("Exclude patterns: {}".format(args.exclude))
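        # e.g. (hypothetical): --exclude '*.tmp' --exclude 'docs/old' makes
        # exclude_names match any '*.tmp' anywhere in the tree, while
        # 'docs/old' goes to exclude_paths and is matched per path component
        # by pathname_match() during the walk.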
1250
1251     # If this is used by a human, and there's at least one directory to be
1252     # uploaded, the expected bytes calculation can take a moment.
1253     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1254         logger.info("Calculating upload size, this could take some time...")
1255     try:
1256         writer = ArvPutUploadJob(paths = args.paths,
1257                                  resume = args.resume,
1258                                  use_cache = args.use_cache,
1259                                  filename = args.filename,
1260                                  reporter = reporter,
1261                                  api_client = api_client,
1262                                  num_retries = args.retries,
1263                                  replication_desired = args.replication,
1264                                  put_threads = args.threads,
1265                                  name = collection_name,
1266                                  owner_uuid = project_uuid,
1267                                  ensure_unique_name = True,
1268                                  update_collection = args.update_collection,
1269                                  storage_classes=storage_classes,
1270                                  logger=logger,
1271                                  dry_run=args.dry_run,
1272                                  follow_links=args.follow_links,
1273                                  exclude_paths=exclude_paths,
1274                                  exclude_names=exclude_names,
1275                                  trash_at=trash_at)
1276     except ResumeCacheConflict:
1277         logger.error("\n".join([
1278             "arv-put: Another process is already uploading this data.",
1279             "         Use --no-cache if this is really what you want."]))
1280         sys.exit(1)
1281     except ResumeCacheInvalidError:
1282         logger.error("\n".join([
1283             "arv-put: Resume cache contains invalid signature: it may have expired",
1284             "         or been created with another Arvados user's credentials.",
1285             "         Switch user or use one of the following options to restart upload:",
1286             "         --no-resume to start a new resume cache.",
1287             "         --no-cache to disable resume cache."]))
1288         sys.exit(1)
1289     except (CollectionUpdateError, PathDoesNotExistError) as error:
1290         logger.error("\n".join([
1291             "arv-put: %s" % str(error)]))
1292         sys.exit(1)
1293     except ArvPutUploadIsPending:
1294         # Dry run check successful, return proper exit code.
1295         sys.exit(2)
1296     except ArvPutUploadNotPending:
1297         # No files pending for upload
1298         sys.exit(0)
1299
1300     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1301         logger.warning("\n".join([
1302             "arv-put: Resuming previous upload from last checkpoint.",
1303             "         Use the --no-resume option to start over."]))
1304
1305     if not args.dry_run:
1306         writer.report_progress()
1307     output = None
1308     try:
1309         writer.start(save_collection=not(args.stream or args.raw))
1310     except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1311         logger.error("\n".join([
1312             "arv-put: %s" % str(error)]))
1313         sys.exit(1)
1314
1315     if args.progress:  # Print newline to split stderr from stdout for humans.
1316         logger.info("\n")
1317
1318     if args.stream:
1319         if args.normalize:
1320             output = writer.manifest_text(normalize=True)
1321         else:
1322             output = writer.manifest_text()
1323     elif args.raw:
1324         output = ','.join(writer.data_locators())
1325     elif writer.manifest_locator() is not None:
1326         try:
1327             expiration_notice = ""
1328             if writer.collection_trash_at() is not None:
1329                 # Get the local timezone-naive version, and log it with timezone information.
1330                 if time.daylight:
1331                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1332                 else:
1333                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1334                 expiration_notice = ". It will expire on {} {}.".format(
1335                     local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1336             if args.update_collection:
1337                 logger.info(u"Collection updated: '{}'{}".format(
1338                     writer.collection_name(), expiration_notice))
1339             else:
1340                 logger.info(u"Collection saved as '{}'{}".format(
1341                     writer.collection_name(), expiration_notice))
1342             if args.portable_data_hash:
1343                 output = writer.portable_data_hash()
1344             else:
1345                 output = writer.manifest_locator()
1346         except apiclient_errors.Error as error:
1347             logger.error(
1348                 "arv-put: Error creating Collection on project: {}.".format(
1349                     error))
1350             status = 1
1351     else:
1352         status = 1
1353
1354     # Print the locator (uuid) of the new collection.
1355     if output is None:
1356         status = status or 1
1357     elif not args.silent:
1358         stdout.write(output)
1359         if not output.endswith('\n'):
1360             stdout.write('\n')
1361
1362     if install_sig_handlers:
1363         arv_cmd.restore_signal_handlers()
1364
1365     if status != 0:
1366         sys.exit(status)
1367
1368     # Success!
1369     return output
1370
1371
1372 if __name__ == '__main__':
1373     main()