# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import ciso8601
import datetime
import json
import logging
import mock
import multiprocessing
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

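# ResumeCache persists upload state on disk so an interrupted arv-put run
# can pick up where it left off.  The tests below pin down how cache file
# paths are derived from the command line; the exact recipe (presumably a
# hash over the API host, credentials, and the normalized argument list)
# is an implementation detail of ResumeCache.make_path().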
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

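    # The locators used below follow Keep's "<md5>+<size>" format, e.g.
    # 098f6bcd4621d373cade4e832627b4f6+4 is the unsigned locator for the
    # 4-byte block "test".  KeepClient.head is mocked out, so no Keep
    # server is contacted.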
    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_non_regular_files_are_ignored(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir, fifo_filename)
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        producer.join()
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

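    # The resume tests below share a pattern: wrap ArvadosFileWriter.write
    # so the upload checkpoints its state and dies on the short final block,
    # then start a fresh ArvPutUploadJob on the same file and check how much
    # data it skips versus re-uploads.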
    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should not exit with an exception from trying to commit
            # the collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

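# A cached manifest carries signed locators of the form
# "<md5>+<size>+A<signature>@<expiry>", where <expiry> is a hex Unix
# timestamp.  These tests exercise _cached_manifest_valid(), which decides
# whether such a cache can still be trusted; the cases table in
# test_signature_cases spells out the expected rules.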
class CachedManifestValidationTest(ArvadosBaseTestCase):
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

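        # Expired signatures never trigger a HEAD request: if everything has
        # expired, the cache is simply reset (still considered "valid").
        # Any non-expired locator is verified with a single HEAD call, and
        # the block must still exist in Keep.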
        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


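# ArvPutLogFormatter is expected to append an "(X-Request-Id: ...)" suffix
# to only the first error- or debug-level message of a run, so the request
# id appears once without repeating on every line (and never on plain
# info-level messages).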
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

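# These tests drive arv_put.main() in-process, with stdout/stderr captured
# in StringIO buffers.  KEEP_LOCAL_STORE points Keep writes at the local
# filesystem, letting call_main_on_test_file() check the stored block.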
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


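# Unlike the in-process tests above, these integration tests run arv-put
# as a subprocess (with PYTHONPATH propagated via cls.ENVIRON) against a
# real test API server, plus a Keep server with blob signing enabled.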
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

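    # Helper: run arv-put on the given stdin text with extra_args, then
    # look up the resulting collection through the API -- by portable data
    # hash when --portable-data-hash was passed, by uuid otherwise.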
    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

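    # The cache-expiry tests below rewrite the "+A<signature>@<expiry>"
    # permission hints inside the saved cache file to fake expired or
    # invalid tokens, then re-run arv-put and check how it reacts.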
    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_in_cache(self):
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                       err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file on its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the expired file to be re-uploaded
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])

1167     def test_put_collection_with_later_update(self):
1168         tmpdir = self.make_tmpdir()
1169         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1170             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1171         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1172         self.assertNotEqual(None, col['uuid'])
1173         # Add a new file to the directory
1174         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1175             f.write('The quick brown fox jumped over the lazy dog')
1176         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1177         self.assertEqual(col['uuid'], updated_col['uuid'])
1178         # Get the manifest and check that the new file is included
1179         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1180         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1181
1182     def test_put_collection_with_utc_expiring_datetime(self):
1183         tmpdir = self.make_tmpdir()
1184         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1185         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1186             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1187         col = self.run_and_find_collection(
1188             "",
1189             ['--no-progress', '--trash-at', trash_at, tmpdir])
1190         self.assertNotEqual(None, col['uuid'])
1191         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
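             # With an explicit UTC ('Z') suffix there is no timezone
             # ambiguity: the stored trash_at should parse to exactly the
             # requested time.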
1192         self.assertEqual(ciso8601.parse_datetime(trash_at),
1193             ciso8601.parse_datetime(c['trash_at']))
1194
1195     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1196         tmpdir = self.make_tmpdir()
1197         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1198         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1199             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1200         col = self.run_and_find_collection(
1201             "",
1202             ['--no-progress', '--trash-at', trash_at, tmpdir])
1203         self.assertNotEqual(None, col['uuid'])
1204         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
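             # The '-0300' offset means the UTC equivalent is three hours
             # later, which is what the server stores; both sides are compared
             # as naive datetimes after adjusting for that offset.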
1205         self.assertEqual(
1206             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1207             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1208
1209     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1210         tmpdir = self.make_tmpdir()
1211         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1212         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1213             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1214         col = self.run_and_find_collection(
1215             "",
1216             ['--no-progress', '--trash-at', trash_at, tmpdir])
1217         self.assertNotEqual(None, col['uuid'])
1218         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
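             # A timezone-naive --trash-at is interpreted in the local
             # timezone, so add the local UTC offset before comparing against
             # the server's UTC trash_at.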
1219         if time.daylight:
1220             offset = datetime.timedelta(seconds=time.altzone)
1221         else:
1222             offset = datetime.timedelta(seconds=time.timezone)
1223         self.assertEqual(
1224             ciso8601.parse_datetime(trash_at) + offset,
1225             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1226
1227     def test_put_collection_with_expiring_date_only(self):
1228         tmpdir = self.make_tmpdir()
1229         trash_at = '2140-01-01'
1230         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1231         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1232             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1233         col = self.run_and_find_collection(
1234             "",
1235             ['--no-progress', '--trash-at', trash_at, tmpdir])
1236         self.assertNotEqual(None, col['uuid'])
1237         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
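             # A date-only --trash-at is taken as the end of that day in local
             # time, hence the end_of_day and local UTC offset adjustments.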
1238         if time.daylight:
1239             offset = datetime.timedelta(seconds=time.altzone)
1240         else:
1241             offset = datetime.timedelta(seconds=time.timezone)
1242         self.assertEqual(
1243             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1244             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1245
1246     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1247         cases = ['2100', '210010', '2100-10', '2100-Oct']
1248         tmpdir = self.make_tmpdir()
1249         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1250             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1251         for test_datetime in cases:
1252             with self.assertRaises(AssertionError):
1253                 self.run_and_find_collection(
1254                     "",
1255                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1256
1257     def test_put_collection_with_relative_expiring_datetime(self):
1258         expire_after = 7
1259         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1260         tmpdir = self.make_tmpdir()
1261         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1262             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1263         col = self.run_and_find_collection(
1264             "",
1265             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1266         self.assertNotEqual(None, col['uuid'])
1267         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1268         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1269         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
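             # The server computed trash_at while the upload was running, so
             # it must fall between the timestamps captured just before and
             # just after the run.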
1270         self.assertTrue(dt_before < trash_at)
1271         self.assertTrue(dt_after > trash_at)
1272
1273     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1274         expire_after = 0 # Must be >= 1
1275         tmpdir = self.make_tmpdir()
1276         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1277             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1278         with self.assertRaises(AssertionError):
1279             self.run_and_find_collection(
1280                 "",
1281                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1282
1283     def test_upload_directory_reference_without_trailing_slash(self):
1284         tmpdir1 = self.make_tmpdir()
1285         tmpdir2 = self.make_tmpdir()
1286         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1287             f.write('This is foo')
1288         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1289             f.write('This is not foo')
1290         # Upload one directory and one file
1291         col = self.run_and_find_collection("", ['--no-progress',
1292                                                 tmpdir1,
1293                                                 os.path.join(tmpdir2, 'bar')])
1294         self.assertNotEqual(None, col['uuid'])
1295         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1296         # Check that 'foo' was written inside a subcollection, while 'bar'
1297         # should have been uploaded directly into the root collection
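             # Each manifest line reads '<stream> <locator>... <pos:size:name>':
             # '.' is the root stream, './<name>' a subdirectory stream.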
1298         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1299
1300     def test_upload_directory_reference_with_trailing_slash(self):
1301         tmpdir1 = self.make_tmpdir()
1302         tmpdir2 = self.make_tmpdir()
1303         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1304             f.write('This is foo')
1305         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1306             f.write('This is not foo')
1307         # Upload one directory (with trailing slash) and one file
1308         col = self.run_and_find_collection("", ['--no-progress',
1309                                                 tmpdir1 + os.sep,
1310                                                 os.path.join(tmpdir2, 'bar')])
1311         self.assertNotEqual(None, col['uuid'])
1312         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1313         # Check that 'foo' and 'bar' were written at the same level
1314         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1315
1316     def test_put_collection_with_high_redundancy(self):
1317         # Write empty data: we're not testing CollectionWriter, just
1318         # making sure collections.create tells the API server what our
1319         # desired replication level is.
1320         collection = self.run_and_find_collection("", ['--replication', '4'])
1321         self.assertEqual(4, collection['replication_desired'])
1322
1323     def test_put_collection_with_default_redundancy(self):
1324         collection = self.run_and_find_collection("")
1325         self.assertEqual(None, collection['replication_desired'])
1326
1327     def test_put_collection_with_unnamed_project_link(self):
1328         link = self.run_and_find_collection(
1329             "Test unnamed collection",
1330             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1331         username = pwd.getpwuid(os.getuid()).pw_name
1332         self.assertRegex(
1333             link['name'],
1334             r'^Saved at .* by {}@'.format(re.escape(username)))
1335
1336     def test_put_collection_with_name_and_no_project(self):
1337         link_name = 'Test Collection Link in home project'
1338         collection = self.run_and_find_collection(
1339             "Test named collection in home project",
1340             ['--portable-data-hash', '--name', link_name])
1341         self.assertEqual(link_name, collection['name'])
1342         my_user_uuid = self.current_user()['uuid']
1343         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1344
1345     def test_put_collection_with_named_project_link(self):
1346         link_name = 'Test auto Collection Link'
1347         collection = self.run_and_find_collection("Test named collection",
1348                                       ['--portable-data-hash',
1349                                        '--name', link_name,
1350                                        '--project-uuid', self.PROJECT_UUID])
1351         self.assertEqual(link_name, collection['name'])
1352
1353     def test_put_collection_with_storage_classes_specified(self):
1354         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1355         self.assertEqual(len(collection['storage_classes_desired']), 1)
1356         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1357
1358     def test_put_collection_with_multiple_storage_classes_specified(self):
1359         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1360         self.assertEqual(len(collection['storage_classes_desired']), 3)
1361         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1362
1363     def test_put_collection_without_storage_classes_specified(self):
1364         collection = self.run_and_find_collection("")
1365         self.assertEqual(len(collection['storage_classes_desired']), 1)
1366         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1367
1368     def test_exclude_filename_pattern(self):
1369         tmpdir = self.make_tmpdir()
1370         tmpsubdir = os.path.join(tmpdir, 'subdir')
1371         os.mkdir(tmpsubdir)
1372         for fname in ['file1', 'file2', 'file3']:
1373             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1374                 f.write("This is %s" % fname)
1375             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1376                 f.write("This is %s" % fname)
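             # Exclude patterns without a path separator match bare file names
             # anywhere in the tree, so the '*2.txt' and 'file3.*' patterns
             # below apply to both tmpdir and tmpdir/subdir.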
1377         col = self.run_and_find_collection("", ['--no-progress',
1378                                                 '--exclude', '*2.txt',
1379                                                 '--exclude', 'file3.*',
1380                                                 tmpdir])
1381         self.assertNotEqual(None, col['uuid'])
1382         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1383         # Neither file2.txt nor file3.txt should have been uploaded
1384         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1385         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1386         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1387
1388     def test_exclude_filepath_pattern(self):
1389         tmpdir = self.make_tmpdir()
1390         tmpsubdir = os.path.join(tmpdir, 'subdir')
1391         os.mkdir(tmpsubdir)
1392         for fname in ['file1', 'file2', 'file3']:
1393             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1394                 f.write("This is %s" % fname)
1395             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1396                 f.write("This is %s" % fname)
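             # Exclude patterns with a path component (or a leading './') are
             # matched against paths relative to each upload directory, so the
             # patterns below each exclude a single specific file.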
1397         col = self.run_and_find_collection("", ['--no-progress',
1398                                                 '--exclude', 'subdir/*2.txt',
1399                                                 '--exclude', './file1.*',
1400                                                 tmpdir])
1401         self.assertNotEqual(None, col['uuid'])
1402         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1403         # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have been excluded
1404         self.assertNotRegex(c['manifest_text'],
1405                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1406         self.assertNotRegex(c['manifest_text'],
1407                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1408         self.assertRegex(c['manifest_text'],
1409                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1410         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1411
1412     def test_unicode_on_filename(self):
1413         tmpdir = self.make_tmpdir()
1414         fname = u"i❤arvados.txt"
1415         with open(os.path.join(tmpdir, fname), 'w') as f:
1416             f.write("This is a unicode named file")
1417         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1418         self.assertNotEqual(None, col['uuid'])
1419         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1420         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1421
1422     def test_silent_mode_no_errors(self):
1423         self.authorize_with('active')
1424         tmpdir = self.make_tmpdir()
1425         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1426             f.write('hello world')
1427         pipe = subprocess.Popen(
1428             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1429             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1430             stderr=subprocess.PIPE, env=self.ENVIRON)
1431         stdout, stderr = pipe.communicate()
1432         # No console output should occur during normal operation
1433         self.assertNotRegex(stderr.decode(), r'.+')
1434         self.assertNotRegex(stdout.decode(), r'.+')
1435
1436     def test_silent_mode_does_not_avoid_error_messages(self):
1437         self.authorize_with('active')
1438         pipe = subprocess.Popen(
1439             [sys.executable, arv_put.__file__] + ['--silent',
1440                                                   '/path/not/existent'],
1441             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1442             stderr=subprocess.PIPE, env=self.ENVIRON)
1443         stdout, stderr = pipe.communicate()
1444         # Error messages should still be displayed when errors happen
1445         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1446         self.assertNotRegex(stdout.decode(), r'.+')
1447
1448
1449 if __name__ == '__main__':
1450     unittest.main()