# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import apiclient
import ciso8601
import copy
import datetime
import json
import logging
import multiprocessing
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid

import pytest
from functools import partial
from pathlib import Path
from unittest import mock

import arvados
import arvados.commands.put as arv_put
import arvados.util
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

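    # Compute the resume cache path that arv-put would pick for this
    # argument list.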
    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


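# pytest-style tests: ResumeCache.make_path should place the cache file under
# CACHE_DIR, whether it is the default, a relative path, or an absolute path.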
class TestArvadosPutResumeCacheDir:
    @pytest.fixture
    def args(self, tmp_path):
        return arv_put.parse_arguments([str(tmp_path)])

    @pytest.mark.parametrize('cache_dir', [None, 'test-put'])
    def test_cache_subdir(self, tmp_path, monkeypatch, cache_dir, args):
        if cache_dir is None:
            cache_dir = arv_put.ResumeCache.CACHE_DIR
        else:
            monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', cache_dir)
        monkeypatch.setattr(arvados.util._BaseDirectories, 'storage_path', tmp_path.__truediv__)
        actual = arv_put.ResumeCache.make_path(args)
        assert isinstance(actual, str)
        assert Path(actual).parent == (tmp_path / cache_dir)

    def test_cache_relative_dir(self, tmp_path, monkeypatch, args):
        expected = Path('rel', 'dir')
        monkeypatch.setattr(Path, 'home', lambda: tmp_path)
        monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', str(expected))
        actual = arv_put.ResumeCache.make_path(args)
        assert isinstance(actual, str)
        parent = Path(actual).parent
        assert parent == (tmp_path / expected)
        assert parent.is_dir()

    def test_cache_absolute_dir(self, tmp_path, monkeypatch, args):
        expected = tmp_path / 'arv-put'
        monkeypatch.setattr(Path, 'home', lambda: tmp_path / 'home')
        monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', str(expected))
        actual = arv_put.ResumeCache.make_path(args)
        assert isinstance(actual, str)
        parent = Path(actual).parent
        assert parent == expected
        assert parent.is_dir()


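# The same CACHE_DIR resolution checks, this time for ArvPutUploadJob's
# own cache file.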
class TestArvadosPutUploadJobCacheDir:
    @pytest.mark.parametrize('cache_dir', [None, 'test-put'])
    def test_cache_subdir(self, tmp_path, monkeypatch, cache_dir):
        def storage_path(self, subdir='.', mode=0o700):
            path = tmp_path / subdir
            path.mkdir(mode=mode)
            return path
        if cache_dir is None:
            cache_dir = arv_put.ArvPutUploadJob.CACHE_DIR
        else:
            monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', cache_dir)
        monkeypatch.setattr(arvados.util._BaseDirectories, 'storage_path', storage_path)
        job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        job.destroy_cache()
        assert Path(job._cache_filename).parent == (tmp_path / cache_dir)

    def test_cache_relative_dir(self, tmp_path, monkeypatch):
        expected = Path('rel', 'dir')
        monkeypatch.setattr(Path, 'home', lambda: tmp_path)
        monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', str(expected))
        job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        job.destroy_cache()
        assert Path(job._cache_filename).parent == (tmp_path / expected)

    def test_cache_absolute_dir(self, tmp_path, monkeypatch):
        expected = tmp_path / 'arv-put'
        monkeypatch.setattr(Path, 'home', lambda: tmp_path / 'home')
        monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', str(expected))
        job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        job.destroy_cache()
        assert Path(job._cache_filename).parent == expected


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files and directories for the tests below
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        if producer.exitcode is None:
            # If the producer is still running, kill it. Do this before any
            # assertion that may fail.
            producer.terminate()
            producer.join(1)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

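    # Build a progression list plus a reporter callback that appends
    # (written, expected) tuples to it.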
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should not exit with an exception from trying to commit
            # the collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing it from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer


class CachedManifestValidationTest(ArvadosBaseTestCase):
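    # Stub of ArvPutUploadJob that skips the real __init__ so that
    # _cached_manifest_valid() can be exercised in isolation.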
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

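    # Keep block signatures carry their expiry as a hex Unix timestamp
    # (the "@%s" slots in self.template below); mirror that encoding here.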
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

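        # Stand-in for KeepClient.head: return True for blocks marked valid,
        # raise KeepRequestError for the rest.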
        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)


class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        arv_put.api_client = None
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

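    # Same hex Unix-timestamp helper as in CachedManifestValidationTest,
    # used below to rewrite signature expiries in cached manifests.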
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                         0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

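    # Run arv-put with the given stdin text, then look up and return the
    # collection record it reported creating or updating.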
    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_in_cache(self):
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                       err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
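            # The substituted signature is well-formed (40 hex digits) but was
            # never issued by this cluster, so resuming from this cache should
            # fail signature validation rather than just look expired.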
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file in its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
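        # Both tiny files get packed into one data block, so the manifest
        # should contain two streams referencing it, e.g. (illustrative):
        #   . <locator> 0:3:foofile.txt
        #   ./bar <locator> 3:3:barfile.txt
        # which lets barfile.txt's signature be altered independently below.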
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make just barfile.txt's signature appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect just the expired file to be re-uploaded
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
        self.assertEqual(p.returncode, 0)
        # Confirm that the resulting cache is different from the last run.
        with open(cache_filepath, 'r') as c2:
            new_cache = json.load(c2)
        self.assertNotEqual(cache['manifest'], new_cache['manifest'])

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
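        # ("44" is file2's byte count in the manifest's pos:size:name tokens.)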

    def test_put_collection_with_utc_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
                         ciso8601.parse_datetime(c['trash_at']))
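        # (The trailing "Z" marks the timestamp as UTC, so both sides parse to
        # the same timezone-aware datetime.)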

    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
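        # (-0300 means local time three hours behind UTC, hence adding
        # timedelta(hours=3) when normalizing both sides to naive UTC.)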

    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
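        # A naive --trash-at is interpreted as local time, so add the local
        # UTC offset (time.altzone or time.timezone, in seconds west of UTC)
        # before comparing with the server's UTC trash_at.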
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_expiring_date_only(self):
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
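        # A date-only --trash-at is taken as the end of that day (23:59:59)
        # in the local timezone; the offset math below converts the expected
        # value to UTC for comparison.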
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        cases = ['2100', '210010', '2100-10', '2100-Oct']
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])

    def test_put_collection_with_relative_expiring_datetime(self):
        expire_after = 7
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
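        # --trash-after N sets trash_at to N days from the time of the upload,
        # so bracket the expected value between timestamps taken just before
        # and just after the run.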
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertTrue(dt_before < trash_at)
        self.assertTrue(dt_after > trash_at)

    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        expire_after = 0  # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection, while 'bar'
        # was uploaded directly into the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
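        # (In manifest text, "." names the root stream and "./<name>" a
        # subdirectory stream; compare the trailing-slash variant below.)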

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection(
            "Test named collection",
            ['--portable-data-hash',
             '--name', link_name,
             '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_with_multiple_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
        self.assertEqual(len(collection['storage_classes_desired']), 3)
        self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
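        # (--storage-classes splits on commas and strips surrounding
        # whitespace from each class name, as asserted above.)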

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
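        # Bare patterns like '*2.txt' are matched against basenames anywhere
        # in the tree, including inside subdir/.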
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
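        # Patterns containing a path separator (or a leading './') are matched
        # against each file's relative path, not just its basename.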
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur on normal operations
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_avoid_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent',
                                                  '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should be displayed even in silent mode
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()