Merge branch '21146-pysdk-new-websockets'
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6
7 from __future__ import absolute_import
8 from __future__ import division
9 from future import standard_library
10 standard_library.install_aliases()
11 from builtins import str
12 from builtins import range
13 from functools import partial
14 import apiclient
15 import ciso8601
16 import datetime
17 import json
18 import logging
19 import mock
20 import multiprocessing
21 import os
22 import pwd
23 import random
24 import re
25 import select
26 import shutil
27 import signal
28 import subprocess
29 import sys
30 import tempfile
31 import time
32 import unittest
33 import uuid
34
35 import arvados
36 import arvados.commands.put as arv_put
37 from . import arvados_testutil as tutil
38
39 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
40 from . import run_test_server
41
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Exercise arv_put.ResumeCache: cache path naming, storage and locking.

    A test that wants its cache destroyed afterwards stores it in
    ``self.last_cache``; tearDown() destroys it.  Any other cache a test
    opens is closed through addCleanup() so no locked file handle outlives
    the test.
    """

    # Argument lists used by the naming tests; each one is expected to map
    # to its own cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        # Look the attribute up explicitly rather than catching
        # AttributeError around destroy(): that would also swallow an
        # AttributeError raised from inside destroy() itself.
        last_cache = getattr(self, 'last_cache', None)
        if last_cache is not None:
            last_cache.destroy()

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache.make_path() gives for arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def _open_last_cache(self):
        """Open a ResumeCache on a temporary path and keep it in last_cache.

        The NamedTemporaryFile only provides the path; it is closed (and
        therefore deleted) as soon as the cache has been opened.
        Returns the new cache.
        """
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        return self.last_cache

    def _reopen_saved_cache(self, state):
        """Save state in a new cache, close it and reopen the same file.

        Returns the reopened ResumeCache, which is closed automatically at
        the end of the test.
        """
        first_cache = self._open_last_cache()
        first_cache.save(state)
        first_cache.close()
        reopened = arv_put.ResumeCache(first_cache.filename)
        self.addCleanup(reopened.close)
        return reopened

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        seen_paths = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, seen_paths)
            seen_paths.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Put the shared settings dict back the way it was found.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {
            '_current_stream_locators': [
                '098f6bcd4621d373cade4e832627b4f6+4',
                '1f253c60a2306e0ee12fb6ce0c587904+6',
            ],
        }
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {
            '_finished_streams': [
                ['.', ['098f6bcd4621d373cade4e832627b4f6+4',
                       '1f253c60a2306e0ee12fb6ce0c587904+6']],
            ],
        }
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {
            '_finished_streams': [
                ['.', ['098f6bcd4621d373cade4e832627b4f6+4',
                       '1f253c60a2306e0ee12fb6ce0c587904+6']],
            ],
        }
        resume_cache = self._reopen_saved_cache(thing)
        self.assertIsNotNone(resume_cache)
        # The test passes as long as check_cache() itself does not raise.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        cache = self._open_last_cache()
        cache.save(thing)
        self.assertEqual(thing, cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        # Nothing was saved, so there is no file to destroy; just release
        # the handle the cache still holds.
        self.addCleanup(cache.close)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        cache = self._open_last_cache()
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        cache.save(['long', 'long', 'list'])
        cache.save(thing)
        self.assertEqual(thing, cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            lock_holder = arv_put.ResumeCache(cachefile.name)
            self.addCleanup(lock_holder.close)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        self.addCleanup(cachefile.close)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                reopened = arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            else:
                self.addCleanup(reopened.close)
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        # Registered as a bound method so that whatever file the cache
        # holds after restart() is the one that gets closed.
        self.addCleanup(cache.close)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
250
251
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob run as the 'active' test user.

    setUp() creates the fixtures the tests share:

    * ``self.tempdir``: files '1'..'4' of 1..4 KiB plus 'subdir/otherfile'
      of 5 KiB.
    * ``self.large_file_name``: a file slightly larger than one Keep block.
    * ``self.small_files_dir``: 69 files of a little over 1 MiB each.
    * ``self.tempdir_with_symlink``: a directory holding 'linkeddir' and
      'linkedfile', symlinks into ``self.tempdir``.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  mkstemp() hands back an open
        # descriptor: wrap it instead of discarding it and reopening the
        # path, so it gets closed rather than leaked.
        large_fd, self.large_file_name = tempfile.mkstemp()
        with os.fdopen(large_fd, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the real write method before any test patches it.
        self.arvfile_write = arvados.arvfile.ArvadosFileWriter.write
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def _interrupt_large_file_upload(self, **update_kwargs):
        """Start uploading the large file and abort it on its last chunk.

        ArvadosFileWriter.write is patched so that the first write shorter
        than KEEP_BLOCK_SIZE calls ``writer._update(**update_kwargs)`` (to
        simulate a checkpoint) and then raises SystemExit.  The method
        asserts that the upload was interrupted after writing some, but not
        all, of the file.

        Returns the interrupted ArvPutUploadJob.
        """
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.  ``writer`` is
                # bound below, before any write can happen.
                writer._update(**update_kwargs)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        try:
            cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
            cwriter.start(save_collection=False)
        finally:
            # If the producer is still running, kill it.  Doing this in a
            # finally clause guarantees it happens even when the upload
            # itself raises, and before any assertion that may fail.
            if producer.exitcode is None:
                producer.terminate()
                producer.join(1)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, reporter).

        ``reporter(written, expected)`` appends each call's arguments, as a
        tuple, to the ``progression`` list.
        """
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # final=True on the simulated checkpoint ensures block commit.
        writer = self._interrupt_large_file_upload(final=True)
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._interrupt_large_file_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        self._interrupt_large_file_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        self._interrupt_large_file_upload()
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
572
class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Tests for ArvPutUploadJob._cached_manifest_valid()."""

    class MockedPut(arv_put.ArvPutUploadJob):
        """ArvPutUploadJob stand-in that skips the real constructor.

        Only the attributes set here are initialised: the state dict
        (holding ``cached_manifest``), a mocked API client, a mocked logger
        and ``num_retries``.
        """
        def __init__(self, cached_manifest=None):
            import copy
            # Work on a private copy: assigning into EMPTY_STATE itself
            # would modify the class-level dict shared by every job and
            # leak this test's manifest into later tests.
            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        """Return the Unix timestamp of the naive UTC datetime dt as hex.

        The result has no '0x' prefix.  calendar.timegm() is used because
        the callers build ``dt`` from datetime.utcnow(); time.mktime()
        would interpret the same fields as local time and shift the result
        by the machine's UTC offset.
        """
        import calendar
        return hex(int(calendar.timegm(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        # The two %s placeholders take the hex expiry of block1 and block2.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks, loc=None):
            # ``blocks`` maps a block hash to whether HEAD should succeed;
            # it is always supplied through functools.partial below.
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()
650
651
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Check ArvPutUploadJob.bytes_expected for files, trees and devices."""

    # Size of this test module on disk; it doubles as the upload fixture.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        job = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE, job.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        # Each entry: (paths to upload, number of copies of this file in them).
        scenarios = (
            ([tree], 2),
            ([tree, __file__], 3),
        )
        for paths, copies in scenarios:
            job = arv_put.ArvPutUploadJob(paths)
            self.assertEqual(self.TEST_SIZE * copies, job.bytes_expected)

    def test_expected_bytes_for_device(self):
        device_only = arv_put.ArvPutUploadJob(
            ['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(device_only.bytes_expected)
        file_and_device = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(file_and_device.bytes_expected)
677
678
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Checks on the progress strings produced by arv-put's reporters."""

    def test_machine_progress(self):
        # The machine-readable report ends with the written/total counters,
        # with -1 standing in for an unknown total.
        samples = [(0, 1), (0, None), (1, None), (235, 9283)]
        for written, total in samples:
            shown_total = -1 if (total is None) else total
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        # With a known total the report starts with a carriage return and
        # contains the completion percentage with one decimal.
        samples = [(0, 1), (2, 4), (45, 60)]
        for written, total in samples:
            percentage = '{:.1%}'.format(1.0*written/total)
            report = arv_put.human_progress(written, total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        # Without a total the raw byte count must appear as a whole word.
        for written in [1, 20, 300, 4000, 50000]:
            report = arv_put.human_progress(written, None)
            whole_word = r'\b{}\b'.format(written)
            self.assertTrue(re.search(whole_word, report))
698
699
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    """Checks on when ArvPutLogFormatter appends the X-Request-Id suffix."""

    # Pattern of the request-id suffix looked for in formatted log lines.
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        # The root logger is process-wide state: remember its level so that
        # tearDown can restore it instead of leaking DEBUG into later tests.
        self._previous_level = self.logger.level
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.logger.setLevel(self._previous_level)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def _logged_lines(self):
        """Return the captured log output as a list of lines.

        The element after the final newline (an empty string) is dropped.
        """
        return self.stderr.getvalue().split('\n')[:-1]

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self._logged_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self._logged_lines()
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self._logged_lines()
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)
740
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Run ``arv_put.main()`` in-process and check its output and exit paths."""

    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Reset the captured stdout/stderr buffers and run arv_put.main(args)."""
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a temporary test file in --stream mode.

        ``args`` is an optional sequence of extra command line arguments,
        inserted before the file path.  Afterwards the method asserts that
        the expected block file exists under $KEEP_LOCAL_STORE.
        """
        # None instead of a shared mutable list as the default value.
        extra_args = [] if args is None else list(args)
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(
                ['--stream', '--no-progress'] + extra_args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        # Route root-logger output into the captured stderr buffer.
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # The upload must still succeed when the cache directory itself
        # has all permission bits cleared.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the class attribute and the directory permissions.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same as above, but the cache directory is a not-yet-existing
        # child of a directory with all permission bits cleared.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        arv_put.api_client = None
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # The keyword arguments of each put call carry the copy count.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        # Keep both temporary files open only for the duration of the
        # upload, and close them deterministically afterwards.
        with self.make_test_file() as testfile1, \
                self.make_test_file() as testfile2:
            test_paths = [testfile1.name, testfile2.name]
            # Reverse-sort the paths, so normalization must change their order.
            test_paths.sort(reverse=True)
            self.call_main_with_args(
                ['--stream', '--no-progress', '--normalize'] + test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        # Make Collection.save_new fail with a 403 API error and check that
        # main() exits with a positive status without printing to stdout.
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        # Same failure as above; the captured stderr must contain the
        # request id suffix followed by a newline.
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)
885
886
887 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
888                             ArvadosBaseTestCase):
889     MAIN_SERVER = {}
890     KEEP_SERVER = {'blob_signing': True}
891     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
892
893     @classmethod
894     def setUpClass(cls):
895         super(ArvPutIntegrationTest, cls).setUpClass()
896         cls.ENVIRON = os.environ.copy()
897         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
898
899     def datetime_to_hex(self, dt):
900         return hex(int(time.mktime(dt.timetuple())))[2:]
901
902     def setUp(self):
903         super(ArvPutIntegrationTest, self).setUp()
904         arv_put.api_client = None
905
906     def authorize_with(self, token_name):
907         run_test_server.authorize_with(token_name)
908         for v in ["ARVADOS_API_HOST",
909                   "ARVADOS_API_HOST_INSECURE",
910                   "ARVADOS_API_TOKEN"]:
911             self.ENVIRON[v] = arvados.config.settings()[v]
912         arv_put.api_client = arvados.api('v1')
913
914     def current_user(self):
915         return arv_put.api_client.users().current().execute()
916
917     def test_check_real_project_found(self):
918         self.authorize_with('active')
919         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
920                         "did not correctly find test fixture project")
921
922     def test_check_error_finding_nonexistent_uuid(self):
923         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
924         self.authorize_with('active')
925         try:
926             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
927                                                   0)
928         except ValueError as error:
929             self.assertIn(BAD_UUID, str(error))
930         else:
931             self.assertFalse(result, "incorrectly found nonexistent project")
932
933     def test_check_error_finding_nonexistent_project(self):
934         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
935         self.authorize_with('active')
936         with self.assertRaises(apiclient.errors.HttpError):
937             arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
938                                                   0)
939
940     def test_short_put_from_stdin(self):
941         # Have to run this as an integration test since arv-put can't
942         # read from the tests' stdin.
943         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
944         # case, because the /proc entry is already gone by the time it tries.
945         pipe = subprocess.Popen(
946             [sys.executable, arv_put.__file__, '--stream'],
947             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
948             stderr=subprocess.STDOUT, env=self.ENVIRON)
949         pipe.stdin.write(b'stdin test\xa6\n')
950         pipe.stdin.close()
951         deadline = time.time() + 5
952         while (pipe.poll() is None) and (time.time() < deadline):
953             time.sleep(.1)
954         returncode = pipe.poll()
955         if returncode is None:
956             pipe.terminate()
957             self.fail("arv-put did not PUT from stdin within 5 seconds")
958         elif returncode != 0:
959             sys.stdout.write(pipe.stdout.read())
960             self.fail("arv-put returned exit code {}".format(returncode))
961         self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
962                       pipe.stdout.read().decode())
963
964     def test_sigint_logs_request_id(self):
965         # Start arv-put, give it a chance to start up, send SIGINT,
966         # and check that its output includes the X-Request-Id.
967         input_stream = subprocess.Popen(
968             ['sleep', '10'],
969             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
970         pipe = subprocess.Popen(
971             [sys.executable, arv_put.__file__, '--stream'],
972             stdin=input_stream.stdout, stdout=subprocess.PIPE,
973             stderr=subprocess.STDOUT, env=self.ENVIRON)
974         # Wait for arv-put child process to print something (i.e., a
975         # log message) so we know its signal handler is installed.
976         select.select([pipe.stdout], [], [], 10)
977         pipe.send_signal(signal.SIGINT)
978         deadline = time.time() + 5
979         while (pipe.poll() is None) and (time.time() < deadline):
980             time.sleep(.1)
981         returncode = pipe.poll()
982         input_stream.terminate()
983         if returncode is None:
984             pipe.terminate()
985             self.fail("arv-put did not exit within 5 seconds")
986         self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
987
988     def test_ArvPutSignedManifest(self):
989         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
990         # the newly created manifest from the API server, testing to confirm
991         # that the block locators in the returned manifest are signed.
992         self.authorize_with('active')
993
994         # Before doing anything, demonstrate that the collection
995         # we're about to create is not present in our test fixture.
996         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
997         with self.assertRaises(apiclient.errors.HttpError):
998             arv_put.api_client.collections().get(
999                 uuid=manifest_uuid).execute()
1000
1001         datadir = self.make_tmpdir()
1002         with open(os.path.join(datadir, "foo"), "w") as f:
1003             f.write("The quick brown fox jumped over the lazy dog")
1004         p = subprocess.Popen([sys.executable, arv_put.__file__,
1005                               os.path.join(datadir, 'foo')],
1006                              stdout=subprocess.PIPE,
1007                              stderr=subprocess.PIPE,
1008                              env=self.ENVIRON)
1009         (_, err) = p.communicate()
1010         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
1011         self.assertEqual(p.returncode, 0)
1012
1013         # The manifest text stored in the API server under the same
1014         # manifest UUID must use signed locators.
1015         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
1016         self.assertRegex(
1017             c['manifest_text'],
1018             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
1019
1020         os.remove(os.path.join(datadir, "foo"))
1021         os.rmdir(datadir)
1022
1023     def run_and_find_collection(self, text, extra_args=[]):
1024         self.authorize_with('active')
1025         pipe = subprocess.Popen(
1026             [sys.executable, arv_put.__file__] + extra_args,
1027             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1028             stderr=subprocess.PIPE, env=self.ENVIRON)
1029         stdout, stderr = pipe.communicate(text.encode())
1030         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
1031         search_key = ('portable_data_hash'
1032                       if '--portable-data-hash' in extra_args else 'uuid')
1033         collection_list = arvados.api('v1').collections().list(
1034             filters=[[search_key, '=', stdout.decode().strip()]]
1035         ).execute().get('items', [])
1036         self.assertEqual(1, len(collection_list))
1037         return collection_list[0]
1038
1039     def test_all_expired_signatures_invalidates_cache(self):
1040         self.authorize_with('active')
1041         tmpdir = self.make_tmpdir()
1042         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1043             f.write('foo')
1044         # Upload a directory and get the cache file name
1045         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1046                              stdout=subprocess.PIPE,
1047                              stderr=subprocess.PIPE,
1048                              env=self.ENVIRON)
1049         (_, err) = p.communicate()
1050         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1051         self.assertEqual(p.returncode, 0)
1052         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1053                                    err.decode()).groups()[0]
1054         self.assertTrue(os.path.isfile(cache_filepath))
1055         # Load the cache file contents and modify the manifest to simulate
1056         # an expired access token
1057         with open(cache_filepath, 'r') as c:
1058             cache = json.load(c)
1059         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1060         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1061         cache['manifest'] = re.sub(
1062             r'\@.*? ',
1063             "@{} ".format(self.datetime_to_hex(a_month_ago)),
1064             cache['manifest'])
1065         with open(cache_filepath, 'w') as c:
1066             c.write(json.dumps(cache))
1067         # Re-run the upload and expect to get an invalid cache message
1068         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1069                              stdout=subprocess.PIPE,
1070                              stderr=subprocess.PIPE,
1071                              env=self.ENVIRON)
1072         (_, err) = p.communicate()
1073         self.assertRegex(
1074             err.decode(),
1075             r'INFO: Cache expired, starting from scratch.*')
1076         self.assertEqual(p.returncode, 0)
1077
1078     def test_invalid_signature_in_cache(self):
1079         for batch_mode in [False, True]:
1080             self.authorize_with('active')
1081             tmpdir = self.make_tmpdir()
1082             with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1083                 f.write('foo')
1084             # Upload a directory and get the cache file name
1085             arv_put_args = [tmpdir]
1086             if batch_mode:
1087                 arv_put_args = ['--batch'] + arv_put_args
1088             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1089                                 stdout=subprocess.PIPE,
1090                                 stderr=subprocess.PIPE,
1091                                 env=self.ENVIRON)
1092             (_, err) = p.communicate()
1093             self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1094             self.assertEqual(p.returncode, 0)
1095             cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1096                                     err.decode()).groups()[0]
1097             self.assertTrue(os.path.isfile(cache_filepath))
1098             # Load the cache file contents and modify the manifest to simulate
1099             # an invalid access token
1100             with open(cache_filepath, 'r') as c:
1101                 cache = json.load(c)
1102             self.assertRegex(cache['manifest'], r'\+A\S+\@')
1103             cache['manifest'] = re.sub(
1104                 r'\+A.*\@',
1105                 "+Aabcdef0123456789abcdef0123456789abcdef01@",
1106                 cache['manifest'])
1107             with open(cache_filepath, 'w') as c:
1108                 c.write(json.dumps(cache))
1109             # Re-run the upload and expect to get an invalid cache message
1110             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1111                                 stdout=subprocess.PIPE,
1112                                 stderr=subprocess.PIPE,
1113                                 env=self.ENVIRON)
1114             (_, err) = p.communicate()
1115             if not batch_mode:
1116                 self.assertRegex(
1117                     err.decode(),
1118                     r'ERROR: arv-put: Resume cache contains invalid signature.*')
1119                 self.assertEqual(p.returncode, 1)
1120             else:
1121                 self.assertRegex(
1122                     err.decode(),
1123                     r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
1124                 self.assertEqual(p.returncode, 0)
1125
1126     def test_single_expired_signature_reuploads_file(self):
1127         self.authorize_with('active')
1128         tmpdir = self.make_tmpdir()
1129         with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
1130             f.write('foo')
1131         # Write a second file on its own subdir to force a new stream
1132         os.mkdir(os.path.join(tmpdir, 'bar'))
1133         with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
1134             f.write('bar')
1135         # Upload a directory and get the cache file name
1136         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1137                              stdout=subprocess.PIPE,
1138                              stderr=subprocess.PIPE,
1139                              env=self.ENVIRON)
1140         (_, err) = p.communicate()
1141         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1142         self.assertEqual(p.returncode, 0)
1143         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1144                                    err.decode()).groups()[0]
1145         self.assertTrue(os.path.isfile(cache_filepath))
1146         # Load the cache file contents and modify the manifest to simulate
1147         # an expired access token
1148         with open(cache_filepath, 'r') as c:
1149             cache = json.load(c)
1150         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1151         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1152         # Make one of the signatures appear to have expired
1153         cache['manifest'] = re.sub(
1154             r'\@.*? 3:3:barfile.txt',
1155             "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
1156             cache['manifest'])
1157         with open(cache_filepath, 'w') as c:
1158             c.write(json.dumps(cache))
1159         # Re-run the upload and expect to get an invalid cache message
1160         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1161                              stdout=subprocess.PIPE,
1162                              stderr=subprocess.PIPE,
1163                              env=self.ENVIRON)
1164         (_, err) = p.communicate()
1165         self.assertRegex(
1166             err.decode(),
1167             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1168         self.assertEqual(p.returncode, 0)
1169         # Confirm that the resulting cache is different from the last run.
1170         with open(cache_filepath, 'r') as c2:
1171             new_cache = json.load(c2)
1172         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1173
1174     def test_put_collection_with_later_update(self):
1175         tmpdir = self.make_tmpdir()
1176         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1177             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1178         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1179         self.assertNotEqual(None, col['uuid'])
1180         # Add a new file to the directory
1181         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1182             f.write('The quick brown fox jumped over the lazy dog')
1183         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1184         self.assertEqual(col['uuid'], updated_col['uuid'])
1185         # Get the manifest and check that the new file is being included
1186         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1187         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1188
1189     def test_put_collection_with_utc_expiring_datetime(self):
1190         tmpdir = self.make_tmpdir()
1191         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1192         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1193             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1194         col = self.run_and_find_collection(
1195             "",
1196             ['--no-progress', '--trash-at', trash_at, tmpdir])
1197         self.assertNotEqual(None, col['uuid'])
1198         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1199         self.assertEqual(ciso8601.parse_datetime(trash_at),
1200             ciso8601.parse_datetime(c['trash_at']))
1201
1202     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1203         tmpdir = self.make_tmpdir()
1204         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1205         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1206             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1207         col = self.run_and_find_collection(
1208             "",
1209             ['--no-progress', '--trash-at', trash_at, tmpdir])
1210         self.assertNotEqual(None, col['uuid'])
1211         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1212         self.assertEqual(
1213             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1214             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1215
1216     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1217         tmpdir = self.make_tmpdir()
1218         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1219         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1220             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1221         col = self.run_and_find_collection(
1222             "",
1223             ['--no-progress', '--trash-at', trash_at, tmpdir])
1224         self.assertNotEqual(None, col['uuid'])
1225         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1226         if time.daylight:
1227             offset = datetime.timedelta(seconds=time.altzone)
1228         else:
1229             offset = datetime.timedelta(seconds=time.timezone)
1230         self.assertEqual(
1231             ciso8601.parse_datetime(trash_at) + offset,
1232             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1233
1234     def test_put_collection_with_expiring_date_only(self):
1235         tmpdir = self.make_tmpdir()
1236         trash_at = '2140-01-01'
1237         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1238         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1239             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1240         col = self.run_and_find_collection(
1241             "",
1242             ['--no-progress', '--trash-at', trash_at, tmpdir])
1243         self.assertNotEqual(None, col['uuid'])
1244         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1245         if time.daylight:
1246             offset = datetime.timedelta(seconds=time.altzone)
1247         else:
1248             offset = datetime.timedelta(seconds=time.timezone)
1249         self.assertEqual(
1250             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1251             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1252
1253     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1254         cases = ['2100', '210010','2100-10', '2100-Oct']
1255         tmpdir = self.make_tmpdir()
1256         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1257             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1258         for test_datetime in cases:
1259             with self.assertRaises(AssertionError):
1260                 self.run_and_find_collection(
1261                     "",
1262                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1263
1264     def test_put_collection_with_relative_expiring_datetime(self):
1265         expire_after = 7
1266         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1267         tmpdir = self.make_tmpdir()
1268         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1269             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1270         col = self.run_and_find_collection(
1271             "",
1272             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1273         self.assertNotEqual(None, col['uuid'])
1274         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1275         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1276         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
1277         self.assertTrue(dt_before < trash_at)
1278         self.assertTrue(dt_after > trash_at)
1279
1280     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1281         expire_after = 0 # Must be >= 1
1282         tmpdir = self.make_tmpdir()
1283         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1284             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1285         with self.assertRaises(AssertionError):
1286             self.run_and_find_collection(
1287                 "",
1288                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1289
1290     def test_upload_directory_reference_without_trailing_slash(self):
1291         tmpdir1 = self.make_tmpdir()
1292         tmpdir2 = self.make_tmpdir()
1293         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1294             f.write('This is foo')
1295         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1296             f.write('This is not foo')
1297         # Upload one directory and one file
1298         col = self.run_and_find_collection("", ['--no-progress',
1299                                                 tmpdir1,
1300                                                 os.path.join(tmpdir2, 'bar')])
1301         self.assertNotEqual(None, col['uuid'])
1302         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1303         # Check that 'foo' was written inside a subcollection
1304         # OTOH, 'bar' should have been directly uploaded on the root collection
1305         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1306
1307     def test_upload_directory_reference_with_trailing_slash(self):
1308         tmpdir1 = self.make_tmpdir()
1309         tmpdir2 = self.make_tmpdir()
1310         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1311             f.write('This is foo')
1312         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1313             f.write('This is not foo')
1314         # Upload one directory (with trailing slash) and one file
1315         col = self.run_and_find_collection("", ['--no-progress',
1316                                                 tmpdir1 + os.sep,
1317                                                 os.path.join(tmpdir2, 'bar')])
1318         self.assertNotEqual(None, col['uuid'])
1319         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1320         # Check that 'foo' and 'bar' were written at the same level
1321         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1322
1323     def test_put_collection_with_high_redundancy(self):
1324         # Write empty data: we're not testing CollectionWriter, just
1325         # making sure collections.create tells the API server what our
1326         # desired replication level is.
1327         collection = self.run_and_find_collection("", ['--replication', '4'])
1328         self.assertEqual(4, collection['replication_desired'])
1329
1330     def test_put_collection_with_default_redundancy(self):
1331         collection = self.run_and_find_collection("")
1332         self.assertEqual(None, collection['replication_desired'])
1333
1334     def test_put_collection_with_unnamed_project_link(self):
1335         link = self.run_and_find_collection(
1336             "Test unnamed collection",
1337             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1338         username = pwd.getpwuid(os.getuid()).pw_name
1339         self.assertRegex(
1340             link['name'],
1341             r'^Saved at .* by {}@'.format(re.escape(username)))
1342
1343     def test_put_collection_with_name_and_no_project(self):
1344         link_name = 'Test Collection Link in home project'
1345         collection = self.run_and_find_collection(
1346             "Test named collection in home project",
1347             ['--portable-data-hash', '--name', link_name])
1348         self.assertEqual(link_name, collection['name'])
1349         my_user_uuid = self.current_user()['uuid']
1350         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1351
1352     def test_put_collection_with_named_project_link(self):
1353         link_name = 'Test auto Collection Link'
1354         collection = self.run_and_find_collection("Test named collection",
1355                                       ['--portable-data-hash',
1356                                        '--name', link_name,
1357                                        '--project-uuid', self.PROJECT_UUID])
1358         self.assertEqual(link_name, collection['name'])
1359
1360     def test_put_collection_with_storage_classes_specified(self):
1361         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1362         self.assertEqual(len(collection['storage_classes_desired']), 1)
1363         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1364
1365     def test_put_collection_with_multiple_storage_classes_specified(self):
1366         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1367         self.assertEqual(len(collection['storage_classes_desired']), 3)
1368         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1369
1370     def test_put_collection_without_storage_classes_specified(self):
1371         collection = self.run_and_find_collection("")
1372         self.assertEqual(len(collection['storage_classes_desired']), 1)
1373         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1374
1375     def test_exclude_filename_pattern(self):
1376         tmpdir = self.make_tmpdir()
1377         tmpsubdir = os.path.join(tmpdir, 'subdir')
1378         os.mkdir(tmpsubdir)
1379         for fname in ['file1', 'file2', 'file3']:
1380             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1381                 f.write("This is %s" % fname)
1382             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1383                 f.write("This is %s" % fname)
1384         col = self.run_and_find_collection("", ['--no-progress',
1385                                                 '--exclude', '*2.txt',
1386                                                 '--exclude', 'file3.*',
1387                                                  tmpdir])
1388         self.assertNotEqual(None, col['uuid'])
1389         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1390         # None of the file2.txt & file3.txt should have been uploaded
1391         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1392         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1393         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1394
1395     def test_exclude_filepath_pattern(self):
1396         tmpdir = self.make_tmpdir()
1397         tmpsubdir = os.path.join(tmpdir, 'subdir')
1398         os.mkdir(tmpsubdir)
1399         for fname in ['file1', 'file2', 'file3']:
1400             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1401                 f.write("This is %s" % fname)
1402             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1403                 f.write("This is %s" % fname)
1404         col = self.run_and_find_collection("", ['--no-progress',
1405                                                 '--exclude', 'subdir/*2.txt',
1406                                                 '--exclude', './file1.*',
1407                                                  tmpdir])
1408         self.assertNotEqual(None, col['uuid'])
1409         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1410         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1411         self.assertNotRegex(c['manifest_text'],
1412                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1413         self.assertNotRegex(c['manifest_text'],
1414                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1415         self.assertRegex(c['manifest_text'],
1416                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1417         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1418
1419     def test_unicode_on_filename(self):
1420         tmpdir = self.make_tmpdir()
1421         fname = u"iā¤arvados.txt"
1422         with open(os.path.join(tmpdir, fname), 'w') as f:
1423             f.write("This is a unicode named file")
1424         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1425         self.assertNotEqual(None, col['uuid'])
1426         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1427         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1428
1429     def test_silent_mode_no_errors(self):
1430         self.authorize_with('active')
1431         tmpdir = self.make_tmpdir()
1432         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1433             f.write('hello world')
1434         pipe = subprocess.Popen(
1435             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1436             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1437             stderr=subprocess.PIPE, env=self.ENVIRON)
1438         stdout, stderr = pipe.communicate()
1439         # No console output should occur on normal operations
1440         self.assertNotRegex(stderr.decode(), r'.+')
1441         self.assertNotRegex(stdout.decode(), r'.+')
1442
1443     def test_silent_mode_does_not_avoid_error_messages(self):
1444         self.authorize_with('active')
1445         pipe = subprocess.Popen(
1446             [sys.executable, arv_put.__file__] + ['--silent',
1447                                                   '/path/not/existant'],
1448             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1449             stderr=subprocess.PIPE, env=self.ENVIRON)
1450         stdout, stderr = pipe.communicate()
1451         # Error message should be displayed when errors happen
1452         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1453         self.assertNotRegex(stdout.decode(), r'.+')
1454
1455
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()