Merge branch '21773-keep-service-discovery'
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6
7 import apiclient
8 import ciso8601
9 import copy
10 import datetime
11 import json
12 import logging
13 import multiprocessing
14 import os
15 import pwd
16 import random
17 import re
18 import select
19 import shutil
20 import signal
21 import subprocess
22 import sys
23 import tempfile
24 import time
25 import unittest
26 import uuid
27
28 import pytest
29 from functools import partial
30 from pathlib import Path
31 from unittest import mock
32
33 import arvados
34 import arvados.commands.put as arv_put
35 import arvados.util
36 from arvados._internal import basedirs
37
38 from . import arvados_testutil as tutil
39 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
40 from . import run_test_server
41
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for ``arv_put.ResumeCache``: cache path naming, storage, locking.

    Tests that create a cache store it in ``self.last_cache`` so that
    ``tearDown`` destroys it.
    """

    # Argument lists used by the cache naming tests.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        """Destroy the cache registered in ``self.last_cache``, if any."""
        super().tearDown()
        # Only the "no cache was registered" case is tolerated; an error
        # raised by destroy() itself is allowed to surface.
        cache = getattr(self, 'last_cache', None)
        if cache is not None:
            cache.destroy()

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache computes for `arglist`."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def _save_and_reopen(self, state):
        """Save `state` to a new cache, close it, and reopen the same file.

        The reopened cache replaces ``self.last_cache`` so that tearDown
        destroys it, and is also returned.
        """
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(state)
        self.last_cache.close()
        reopened = arv_put.ResumeCache(self.last_cache.filename)
        self.last_cache = reopened
        return reopened

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Put the shared settings back exactly as they were found.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)
        # Must not raise even though every HEAD request fails.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Registered in last_cache so tearDown releases the handle.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, self.last_cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Registered in last_cache so tearDown releases the handle.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                # Registered in last_cache so tearDown releases the handle.
                self.last_cache = arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            # delete=False leaves both the handle and the file to us.
            cachefile.close()
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        # Registered in last_cache so tearDown releases the handle.
        self.last_cache = cache
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
250
251
class TestArvadosPutResumeCacheDir:
    """Check which directory ``ResumeCache.make_path`` puts cache files in."""

    @pytest.fixture
    def args(self, tmp_path):
        """Parsed arv-put arguments whose only path is `tmp_path`."""
        arglist = [str(tmp_path)]
        return arv_put.parse_arguments(arglist)

    @pytest.mark.parametrize('cache_dir', [None, 'test-put'])
    def test_cache_subdir(self, tmp_path, monkeypatch, cache_dir, args):
        # None means "use the class default"; anything else overrides it.
        if cache_dir is not None:
            monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', cache_dir)
        else:
            cache_dir = arv_put.ResumeCache.CACHE_DIR
        # Resolve storage paths relative to the per-test temp directory.
        monkeypatch.setattr(
            basedirs.BaseDirectories, 'storage_path', tmp_path.__truediv__)
        cache_path = arv_put.ResumeCache.make_path(args)
        assert isinstance(cache_path, str)
        assert Path(cache_path).parent == (tmp_path / cache_dir)

    def test_cache_relative_dir(self, tmp_path, monkeypatch, args):
        rel_dir = Path('rel', 'dir')
        monkeypatch.setattr(Path, 'home', lambda: tmp_path)
        monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', str(rel_dir))
        cache_path = arv_put.ResumeCache.make_path(args)
        assert isinstance(cache_path, str)
        cache_parent = Path(cache_path).parent
        # A relative CACHE_DIR is expected to land under the patched home.
        assert cache_parent == (tmp_path / rel_dir)
        assert cache_parent.is_dir()

    def test_cache_absolute_dir(self, tmp_path, monkeypatch, args):
        abs_dir = tmp_path / 'arv-put'
        monkeypatch.setattr(Path, 'home', lambda: tmp_path / 'home')
        monkeypatch.setattr(arv_put.ResumeCache, 'CACHE_DIR', str(abs_dir))
        cache_path = arv_put.ResumeCache.make_path(args)
        assert isinstance(cache_path, str)
        cache_parent = Path(cache_path).parent
        # An absolute CACHE_DIR is expected to be used as given.
        assert cache_parent == abs_dir
        assert cache_parent.is_dir()
287
288
class TestArvadosPutUploadJobCacheDir:
    """Check which directory ``ArvPutUploadJob`` puts its cache file in."""

    @pytest.mark.parametrize('cache_dir', [None, 'test-put'])
    def test_cache_subdir(self, tmp_path, monkeypatch, cache_dir):
        def fake_storage_path(self, subdir='.', mode=0o700):
            # Create and return the requested subdirectory under tmp_path.
            target = tmp_path / subdir
            target.mkdir(mode=mode)
            return target

        # None means "use the class default"; anything else overrides it.
        if cache_dir is not None:
            monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', cache_dir)
        else:
            cache_dir = arv_put.ArvPutUploadJob.CACHE_DIR
        monkeypatch.setattr(
            basedirs.BaseDirectories, 'storage_path', fake_storage_path)
        upload_job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        # Clean up before asserting so a failure doesn't leave a cache behind.
        upload_job.destroy_cache()
        cache_parent = Path(upload_job._cache_filename).parent
        assert cache_parent == (tmp_path / cache_dir)

    def test_cache_relative_dir(self, tmp_path, monkeypatch):
        rel_dir = Path('rel', 'dir')
        monkeypatch.setattr(Path, 'home', lambda: tmp_path)
        monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', str(rel_dir))
        upload_job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        upload_job.destroy_cache()
        cache_parent = Path(upload_job._cache_filename).parent
        assert cache_parent == (tmp_path / rel_dir)

    def test_cache_absolute_dir(self, tmp_path, monkeypatch):
        abs_dir = tmp_path / 'arv-put'
        monkeypatch.setattr(Path, 'home', lambda: tmp_path / 'home')
        monkeypatch.setattr(arv_put.ArvPutUploadJob, 'CACHE_DIR', str(abs_dir))
        upload_job = arv_put.ArvPutUploadJob([str(tmp_path)], use_cache=True)
        upload_job.destroy_cache()
        cache_parent = Path(upload_job._cache_filename).parent
        assert cache_parent == abs_dir
320
321
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for ``arv_put.ArvPutUploadJob`` using files created in setUp."""

    def setUp(self):
        """Authorize as 'active' and create the on-disk fixtures.

        Fixtures:
        * ``self.tempdir``: files '1'..'4' holding 1-4 KiB of data, plus
          'subdir/otherfile' holding 5 KiB.
        * ``self.large_file_name``: a file a little larger than one Keep
          block, used by the resume tests.
        * ``self.small_files_dir``: 69 files of slightly over 1 MiB each.
        * ``self.tempdir_with_symlink``: a directory holding the symlinks
          'linkeddir' (to ``self.tempdir``) and 'linkedfile' (to its '1').
        """
        super().setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  mkstemp() hands back an open
        # descriptor; write through it so it gets closed instead of leaked.
        large_fd, self.large_file_name = tempfile.mkstemp()
        with os.fdopen(large_fd, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep the unpatched write method so wrappers can delegate to it.
        self.arvfile_write = arvados.arvfile.ArvadosFileWriter.write
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        """Remove every fixture created by setUp."""
        super().tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def _interrupt_large_upload(self, **update_kwargs):
        """Start uploading the large file and abort it on the last block.

        ``ArvadosFileWriter.write`` is patched so that the first write
        shorter than a full Keep block calls the job's ``_update()`` with
        `update_kwargs` (simulating a checkpoint) and then raises
        SystemExit.  Asserts that the upload was only partial and returns
        the interrupted job.
        """
        def interrupting_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                writer._update(**update_kwargs)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = interrupting_write
            # `writer` is read by interrupting_write through its closure.
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        try:
            cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
            cwriter.start(save_collection=False)
        finally:
            # If the producer is still running, kill it. This must happen
            # even when the upload itself raises, and before any assertion
            # that may fail.
            if producer.exitcode is None:
                producer.terminate()
                producer.join(1)
        manifest = cwriter.manifest_text()
        cwriter.destroy_cache()
        self.assertIn('linkeddir', manifest)
        self.assertNotIn(fifo_filename, manifest)

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return ``(progression, record_func)``.

        ``record_func(written, expected)`` appends that pair to the
        ``progression`` list.
        """
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # final=True ensures the block is committed at the checkpoint.
        writer = self._interrupt_large_upload(final=True)
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._interrupt_large_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        self._interrupt_large_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        self._interrupt_large_upload()
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        # Don't leave the completed upload's cache behind.
        writer3.destroy_cache()
642
class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Check ArvPutUploadJob._cached_manifest_valid() against mocked state.

    The cached manifests used here carry two block locators whose
    signature suffix ('@<hex>') is filled in per test case.
    """

    class MockedPut(arv_put.ArvPutUploadJob):
        """Upload job whose constructor only sets the attributes below.

        The parent constructor is intentionally not called.
        """

        def __init__(self, cached_manifest=None):
            state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            state['manifest'] = cached_manifest
            self._state = state
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        """Return dt as whole epoch seconds in hexadecimal, without '0x'.

        The conversion goes through time.mktime(), which interprets the
        time tuple as local time.
        """
        epoch_seconds = int(time.mktime(dt.timetuple()))
        return hex(epoch_seconds)[2:]

    def setUp(self):
        super().setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        # Two-block manifest; each '%s' receives one hex expiration stamp.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        # Both a missing (None) and an empty-string manifest are accepted.
        put_mock = self.MockedPut()
        self.assertIsNone(put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        one_day = datetime.timedelta(days=1)
        one_week = datetime.timedelta(days=7)
        yesterday = now - one_day
        lastweek = now - one_week
        tomorrow = now + one_day
        nextweek = now + one_week

        def mocked_head(blocks={}, loc=None):
            # Stand-in for KeepClient.head: succeed only for block hashes
            # mapped to a true value in 'blocks'.
            digest = loc.split('+', 1)[0]
            if not blocks.get(digest):
                raise arvados.errors.KeepRequestError("mocked error - block invalid")
            return True

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            (b1_expiration, b2_expiration,
             b1_valid, b2_valid, outcome) = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            stamps = (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(self.template % stamps)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(
                    outcome,
                    arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome),
                )
                any_unexpired = max(b1_expiration, b2_expiration) > now
                if not any_unexpired:
                    head_mock.assert_not_called()
                else:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
720
721
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Check ArvPutUploadJob.bytes_expected for files, trees and devices."""

    # On-disk size of this test module; it is the only regular file the
    # tests below upload, so every expectation is a multiple of it.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        job = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE, job.bytes_expected)

    def test_expected_bytes_for_tree(self):
        # Populate a temporary directory with two copies of this module.
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        tree_job = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2, tree_job.bytes_expected)
        tree_and_file_job = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3, tree_and_file_job.bytes_expected)

    def test_expected_bytes_for_device(self):
        # A device node among the inputs makes the expected size None,
        # whether it is the only path or listed next to a regular file.
        device_job = arv_put.ArvPutUploadJob(
            ['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(device_job.bytes_expected)
        mixed_job = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(mixed_job.bytes_expected)
747
748
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Check the text produced by arv-put's progress reporters."""

    def test_machine_progress(self):
        # An unknown total (None) is expected to be reported as -1.
        samples = [(0, 1), (0, None), (1, None), (235, 9283)]
        for count, total in samples:
            shown_total = -1 if (total is None) else total
            expect = ": {} written {} total\n".format(count, shown_total)
            report = arv_put.machine_progress(count, total)
            self.assertTrue(report.endswith(expect))

    def test_known_human_progress(self):
        # With a known total the report starts with a carriage return and
        # contains the completion percentage with one decimal place.
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        # Without a total, the byte count must appear as a separate word.
        for count in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(count)
            found = re.search(pattern, arv_put.human_progress(count, None))
            self.assertTrue(found)
768
769
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    """Check when ArvPutLogFormatter appends the X-Request-Id to a record.

    Each test logs through the root logger into an in-memory stream that
    uses the formatter under test, then inspects the captured lines.
    """

    # Pattern of the request-id suffix the formatter may append.
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        # Remember the root logger's level so tearDown can restore it;
        # otherwise the DEBUG level set below leaks into later tests.
        self.orig_logger_level = self.logger.level
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.logger.setLevel(self.orig_logger_level)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        # Only the first ERROR record carries the request id.
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        # Drop the empty string that follows the final newline.
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        # Only the first DEBUG record carries the request id.
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        # INFO records never carry the request id.
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)
810
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Run arv_put.main() in-process and check its output and exit behavior.

    stdout and stderr of main() are captured in per-test StringIO buffers
    (self.main_stdout / self.main_stderr).
    """

    # NOTE(review): presumably tells TestCaseWithServers to start the API
    # server with default options -- confirm in run_test_server.
    MAIN_SERVER = {}
    # UUID used by the tests that expect a project lookup to fail.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Reset the captured output buffers and return arv_put.main(args, ...)."""
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a temporary test file with '--stream --no-progress'.

        args: optional iterable of extra command-line arguments inserted
            before the file path.  Defaults to no extra arguments.

        Asserts afterwards that a file named after the expected block
        hash exists under $KEEP_LOCAL_STORE.
        """
        # None instead of a mutable [] default; list() also accepts tuples.
        extra_args = [] if args is None else list(args)
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(
                ['--stream', '--no-progress'] + extra_args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        # Route root-logger records into main_stderr through arv-put's
        # formatter so tests can inspect logged errors.
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        # The upload must still succeed when the cache directory itself
        # has no permissions at all.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the class attribute and make the directory
            # accessible again.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        # Same as above, but the cache directory is a not-yet-existing
        # child of a directory without permissions.
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        arv_put.api_client = None
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # The 'copies' keyword of each put call must match the
            # requested replication level, in order.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        # Hold both temporary files in a 'with' block so they are closed
        # once the upload has finished, instead of lingering until
        # garbage collection.
        with self.make_test_file() as testfile1, \
                self.make_test_file() as testfile2:
            test_paths = [testfile1.name, testfile2.name]
            # Reverse-sort the paths, so normalization must change their order.
            test_paths.sort(reverse=True)
            self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                     test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        # A 403 from Collection.save_new must end in a positive exit
        # status with nothing written to stdout.
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        # The captured stderr must contain the request id followed by a
        # newline when saving the collection fails.
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)
955
956
957 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
958                             ArvadosBaseTestCase):
959     MAIN_SERVER = {}
960     KEEP_SERVER = {'blob_signing': True}
961     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
962
963     @classmethod
964     def setUpClass(cls):
965         super(ArvPutIntegrationTest, cls).setUpClass()
966         cls.ENVIRON = os.environ.copy()
967         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
968
969     def datetime_to_hex(self, dt):
970         return hex(int(time.mktime(dt.timetuple())))[2:]
971
972     def setUp(self):
973         super(ArvPutIntegrationTest, self).setUp()
974         arv_put.api_client = None
975
976     def authorize_with(self, token_name):
977         run_test_server.authorize_with(token_name)
978         for v in ["ARVADOS_API_HOST",
979                   "ARVADOS_API_HOST_INSECURE",
980                   "ARVADOS_API_TOKEN"]:
981             self.ENVIRON[v] = arvados.config.settings()[v]
982         arv_put.api_client = arvados.api('v1')
983
984     def current_user(self):
985         return arv_put.api_client.users().current().execute()
986
987     def test_check_real_project_found(self):
988         self.authorize_with('active')
989         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
990                         "did not correctly find test fixture project")
991
992     def test_check_error_finding_nonexistent_uuid(self):
993         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
994         self.authorize_with('active')
995         try:
996             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
997                                                   0)
998         except ValueError as error:
999             self.assertIn(BAD_UUID, str(error))
1000         else:
1001             self.assertFalse(result, "incorrectly found nonexistent project")
1002
1003     def test_check_error_finding_nonexistent_project(self):
1004         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
1005         self.authorize_with('active')
1006         with self.assertRaises(apiclient.errors.HttpError):
1007             arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
1008                                                   0)
1009
1010     def test_short_put_from_stdin(self):
1011         # Have to run this as an integration test since arv-put can't
1012         # read from the tests' stdin.
1013         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
1014         # case, because the /proc entry is already gone by the time it tries.
1015         pipe = subprocess.Popen(
1016             [sys.executable, arv_put.__file__, '--stream'],
1017             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1018             stderr=subprocess.STDOUT, env=self.ENVIRON)
1019         pipe.stdin.write(b'stdin test\xa6\n')
1020         pipe.stdin.close()
1021         deadline = time.time() + 5
1022         while (pipe.poll() is None) and (time.time() < deadline):
1023             time.sleep(.1)
1024         returncode = pipe.poll()
1025         if returncode is None:
1026             pipe.terminate()
1027             self.fail("arv-put did not PUT from stdin within 5 seconds")
1028         elif returncode != 0:
1029             sys.stdout.write(pipe.stdout.read())
1030             self.fail("arv-put returned exit code {}".format(returncode))
1031         self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
1032                       pipe.stdout.read().decode())
1033
1034     def test_sigint_logs_request_id(self):
1035         # Start arv-put, give it a chance to start up, send SIGINT,
1036         # and check that its output includes the X-Request-Id.
1037         input_stream = subprocess.Popen(
1038             ['sleep', '10'],
1039             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
1040         pipe = subprocess.Popen(
1041             [sys.executable, arv_put.__file__, '--stream'],
1042             stdin=input_stream.stdout, stdout=subprocess.PIPE,
1043             stderr=subprocess.STDOUT, env=self.ENVIRON)
1044         # Wait for arv-put child process to print something (i.e., a
1045         # log message) so we know its signal handler is installed.
1046         select.select([pipe.stdout], [], [], 10)
1047         pipe.send_signal(signal.SIGINT)
1048         deadline = time.time() + 5
1049         while (pipe.poll() is None) and (time.time() < deadline):
1050             time.sleep(.1)
1051         returncode = pipe.poll()
1052         input_stream.terminate()
1053         if returncode is None:
1054             pipe.terminate()
1055             self.fail("arv-put did not exit within 5 seconds")
1056         self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
1057
1058     def test_ArvPutSignedManifest(self):
1059         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
1060         # the newly created manifest from the API server, testing to confirm
1061         # that the block locators in the returned manifest are signed.
1062         self.authorize_with('active')
1063
1064         # Before doing anything, demonstrate that the collection
1065         # we're about to create is not present in our test fixture.
1066         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
1067         with self.assertRaises(apiclient.errors.HttpError):
1068             arv_put.api_client.collections().get(
1069                 uuid=manifest_uuid).execute()
1070
1071         datadir = self.make_tmpdir()
1072         with open(os.path.join(datadir, "foo"), "w") as f:
1073             f.write("The quick brown fox jumped over the lazy dog")
1074         p = subprocess.Popen([sys.executable, arv_put.__file__,
1075                               os.path.join(datadir, 'foo')],
1076                              stdout=subprocess.PIPE,
1077                              stderr=subprocess.PIPE,
1078                              env=self.ENVIRON)
1079         (_, err) = p.communicate()
1080         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
1081         self.assertEqual(p.returncode, 0)
1082
1083         # The manifest text stored in the API server under the same
1084         # manifest UUID must use signed locators.
1085         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
1086         self.assertRegex(
1087             c['manifest_text'],
1088             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
1089
1090         os.remove(os.path.join(datadir, "foo"))
1091         os.rmdir(datadir)
1092
1093     def run_and_find_collection(self, text, extra_args=[]):
1094         self.authorize_with('active')
1095         pipe = subprocess.Popen(
1096             [sys.executable, arv_put.__file__] + extra_args,
1097             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1098             stderr=subprocess.PIPE, env=self.ENVIRON)
1099         stdout, stderr = pipe.communicate(text.encode())
1100         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
1101         search_key = ('portable_data_hash'
1102                       if '--portable-data-hash' in extra_args else 'uuid')
1103         collection_list = arvados.api('v1').collections().list(
1104             filters=[[search_key, '=', stdout.decode().strip()]]
1105         ).execute().get('items', [])
1106         self.assertEqual(1, len(collection_list))
1107         return collection_list[0]
1108
1109     def test_all_expired_signatures_invalidates_cache(self):
1110         self.authorize_with('active')
1111         tmpdir = self.make_tmpdir()
1112         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1113             f.write('foo')
1114         # Upload a directory and get the cache file name
1115         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1116                              stdout=subprocess.PIPE,
1117                              stderr=subprocess.PIPE,
1118                              env=self.ENVIRON)
1119         (_, err) = p.communicate()
1120         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1121         self.assertEqual(p.returncode, 0)
1122         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1123                                    err.decode()).groups()[0]
1124         self.assertTrue(os.path.isfile(cache_filepath))
1125         # Load the cache file contents and modify the manifest to simulate
1126         # an expired access token
1127         with open(cache_filepath, 'r') as c:
1128             cache = json.load(c)
1129         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1130         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1131         cache['manifest'] = re.sub(
1132             r'\@.*? ',
1133             "@{} ".format(self.datetime_to_hex(a_month_ago)),
1134             cache['manifest'])
1135         with open(cache_filepath, 'w') as c:
1136             c.write(json.dumps(cache))
1137         # Re-run the upload and expect to get an invalid cache message
1138         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1139                              stdout=subprocess.PIPE,
1140                              stderr=subprocess.PIPE,
1141                              env=self.ENVIRON)
1142         (_, err) = p.communicate()
1143         self.assertRegex(
1144             err.decode(),
1145             r'INFO: Cache expired, starting from scratch.*')
1146         self.assertEqual(p.returncode, 0)
1147
1148     def test_invalid_signature_in_cache(self):
1149         for batch_mode in [False, True]:
1150             self.authorize_with('active')
1151             tmpdir = self.make_tmpdir()
1152             with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1153                 f.write('foo')
1154             # Upload a directory and get the cache file name
1155             arv_put_args = [tmpdir]
1156             if batch_mode:
1157                 arv_put_args = ['--batch'] + arv_put_args
1158             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1159                                 stdout=subprocess.PIPE,
1160                                 stderr=subprocess.PIPE,
1161                                 env=self.ENVIRON)
1162             (_, err) = p.communicate()
1163             self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1164             self.assertEqual(p.returncode, 0)
1165             cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1166                                     err.decode()).groups()[0]
1167             self.assertTrue(os.path.isfile(cache_filepath))
1168             # Load the cache file contents and modify the manifest to simulate
1169             # an invalid access token
1170             with open(cache_filepath, 'r') as c:
1171                 cache = json.load(c)
1172             self.assertRegex(cache['manifest'], r'\+A\S+\@')
1173             cache['manifest'] = re.sub(
1174                 r'\+A.*\@',
1175                 "+Aabcdef0123456789abcdef0123456789abcdef01@",
1176                 cache['manifest'])
1177             with open(cache_filepath, 'w') as c:
1178                 c.write(json.dumps(cache))
1179             # Re-run the upload and expect to get an invalid cache message
1180             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1181                                 stdout=subprocess.PIPE,
1182                                 stderr=subprocess.PIPE,
1183                                 env=self.ENVIRON)
1184             (_, err) = p.communicate()
1185             if not batch_mode:
1186                 self.assertRegex(
1187                     err.decode(),
1188                     r'ERROR: arv-put: Resume cache contains invalid signature.*')
1189                 self.assertEqual(p.returncode, 1)
1190             else:
1191                 self.assertRegex(
1192                     err.decode(),
1193                     r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
1194                 self.assertEqual(p.returncode, 0)
1195
1196     def test_single_expired_signature_reuploads_file(self):
1197         self.authorize_with('active')
1198         tmpdir = self.make_tmpdir()
1199         with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
1200             f.write('foo')
1201         # Write a second file on its own subdir to force a new stream
1202         os.mkdir(os.path.join(tmpdir, 'bar'))
1203         with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
1204             f.write('bar')
1205         # Upload a directory and get the cache file name
1206         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1207                              stdout=subprocess.PIPE,
1208                              stderr=subprocess.PIPE,
1209                              env=self.ENVIRON)
1210         (_, err) = p.communicate()
1211         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1212         self.assertEqual(p.returncode, 0)
1213         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1214                                    err.decode()).groups()[0]
1215         self.assertTrue(os.path.isfile(cache_filepath))
1216         # Load the cache file contents and modify the manifest to simulate
1217         # an expired access token
1218         with open(cache_filepath, 'r') as c:
1219             cache = json.load(c)
1220         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1221         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1222         # Make one of the signatures appear to have expired
1223         cache['manifest'] = re.sub(
1224             r'\@.*? 3:3:barfile.txt',
1225             "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
1226             cache['manifest'])
1227         with open(cache_filepath, 'w') as c:
1228             c.write(json.dumps(cache))
1229         # Re-run the upload and expect to get an invalid cache message
1230         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1231                              stdout=subprocess.PIPE,
1232                              stderr=subprocess.PIPE,
1233                              env=self.ENVIRON)
1234         (_, err) = p.communicate()
1235         self.assertRegex(
1236             err.decode(),
1237             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1238         self.assertEqual(p.returncode, 0)
1239         # Confirm that the resulting cache is different from the last run.
1240         with open(cache_filepath, 'r') as c2:
1241             new_cache = json.load(c2)
1242         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1243
1244     def test_put_collection_with_later_update(self):
1245         tmpdir = self.make_tmpdir()
1246         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1247             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1248         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1249         self.assertNotEqual(None, col['uuid'])
1250         # Add a new file to the directory
1251         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1252             f.write('The quick brown fox jumped over the lazy dog')
1253         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1254         self.assertEqual(col['uuid'], updated_col['uuid'])
1255         # Get the manifest and check that the new file is being included
1256         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1257         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1258
1259     def test_put_collection_with_utc_expiring_datetime(self):
1260         tmpdir = self.make_tmpdir()
1261         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1262         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1263             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1264         col = self.run_and_find_collection(
1265             "",
1266             ['--no-progress', '--trash-at', trash_at, tmpdir])
1267         self.assertNotEqual(None, col['uuid'])
1268         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1269         self.assertEqual(ciso8601.parse_datetime(trash_at),
1270             ciso8601.parse_datetime(c['trash_at']))
1271
1272     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1273         tmpdir = self.make_tmpdir()
1274         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1275         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1276             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1277         col = self.run_and_find_collection(
1278             "",
1279             ['--no-progress', '--trash-at', trash_at, tmpdir])
1280         self.assertNotEqual(None, col['uuid'])
1281         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1282         self.assertEqual(
1283             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1284             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1285
1286     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1287         tmpdir = self.make_tmpdir()
1288         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1289         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1290             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1291         col = self.run_and_find_collection(
1292             "",
1293             ['--no-progress', '--trash-at', trash_at, tmpdir])
1294         self.assertNotEqual(None, col['uuid'])
1295         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1296         if time.daylight:
1297             offset = datetime.timedelta(seconds=time.altzone)
1298         else:
1299             offset = datetime.timedelta(seconds=time.timezone)
1300         self.assertEqual(
1301             ciso8601.parse_datetime(trash_at) + offset,
1302             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1303
1304     def test_put_collection_with_expiring_date_only(self):
1305         tmpdir = self.make_tmpdir()
1306         trash_at = '2140-01-01'
1307         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1308         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1309             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1310         col = self.run_and_find_collection(
1311             "",
1312             ['--no-progress', '--trash-at', trash_at, tmpdir])
1313         self.assertNotEqual(None, col['uuid'])
1314         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1315         if time.daylight:
1316             offset = datetime.timedelta(seconds=time.altzone)
1317         else:
1318             offset = datetime.timedelta(seconds=time.timezone)
1319         self.assertEqual(
1320             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1321             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1322
1323     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1324         cases = ['2100', '210010','2100-10', '2100-Oct']
1325         tmpdir = self.make_tmpdir()
1326         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1327             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1328         for test_datetime in cases:
1329             with self.assertRaises(AssertionError):
1330                 self.run_and_find_collection(
1331                     "",
1332                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1333
1334     def test_put_collection_with_relative_expiring_datetime(self):
1335         expire_after = 7
1336         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1337         tmpdir = self.make_tmpdir()
1338         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1339             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1340         col = self.run_and_find_collection(
1341             "",
1342             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1343         self.assertNotEqual(None, col['uuid'])
1344         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1345         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1346         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
1347         self.assertTrue(dt_before < trash_at)
1348         self.assertTrue(dt_after > trash_at)
1349
1350     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1351         expire_after = 0 # Must be >= 1
1352         tmpdir = self.make_tmpdir()
1353         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1354             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1355         with self.assertRaises(AssertionError):
1356             self.run_and_find_collection(
1357                 "",
1358                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1359
1360     def test_upload_directory_reference_without_trailing_slash(self):
1361         tmpdir1 = self.make_tmpdir()
1362         tmpdir2 = self.make_tmpdir()
1363         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1364             f.write('This is foo')
1365         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1366             f.write('This is not foo')
1367         # Upload one directory and one file
1368         col = self.run_and_find_collection("", ['--no-progress',
1369                                                 tmpdir1,
1370                                                 os.path.join(tmpdir2, 'bar')])
1371         self.assertNotEqual(None, col['uuid'])
1372         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1373         # Check that 'foo' was written inside a subcollection
1374         # OTOH, 'bar' should have been directly uploaded on the root collection
1375         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1376
1377     def test_upload_directory_reference_with_trailing_slash(self):
1378         tmpdir1 = self.make_tmpdir()
1379         tmpdir2 = self.make_tmpdir()
1380         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1381             f.write('This is foo')
1382         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1383             f.write('This is not foo')
1384         # Upload one directory (with trailing slash) and one file
1385         col = self.run_and_find_collection("", ['--no-progress',
1386                                                 tmpdir1 + os.sep,
1387                                                 os.path.join(tmpdir2, 'bar')])
1388         self.assertNotEqual(None, col['uuid'])
1389         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1390         # Check that 'foo' and 'bar' were written at the same level
1391         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1392
1393     def test_put_collection_with_high_redundancy(self):
1394         # Write empty data: we're not testing CollectionWriter, just
1395         # making sure collections.create tells the API server what our
1396         # desired replication level is.
1397         collection = self.run_and_find_collection("", ['--replication', '4'])
1398         self.assertEqual(4, collection['replication_desired'])
1399
1400     def test_put_collection_with_default_redundancy(self):
1401         collection = self.run_and_find_collection("")
1402         self.assertEqual(None, collection['replication_desired'])
1403
1404     def test_put_collection_with_unnamed_project_link(self):
1405         link = self.run_and_find_collection(
1406             "Test unnamed collection",
1407             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1408         username = pwd.getpwuid(os.getuid()).pw_name
1409         self.assertRegex(
1410             link['name'],
1411             r'^Saved at .* by {}@'.format(re.escape(username)))
1412
1413     def test_put_collection_with_name_and_no_project(self):
1414         link_name = 'Test Collection Link in home project'
1415         collection = self.run_and_find_collection(
1416             "Test named collection in home project",
1417             ['--portable-data-hash', '--name', link_name])
1418         self.assertEqual(link_name, collection['name'])
1419         my_user_uuid = self.current_user()['uuid']
1420         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1421
1422     def test_put_collection_with_named_project_link(self):
1423         link_name = 'Test auto Collection Link'
1424         collection = self.run_and_find_collection("Test named collection",
1425                                       ['--portable-data-hash',
1426                                        '--name', link_name,
1427                                        '--project-uuid', self.PROJECT_UUID])
1428         self.assertEqual(link_name, collection['name'])
1429
1430     def test_put_collection_with_storage_classes_specified(self):
1431         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1432         self.assertEqual(len(collection['storage_classes_desired']), 1)
1433         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1434
1435     def test_put_collection_with_multiple_storage_classes_specified(self):
1436         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1437         self.assertEqual(len(collection['storage_classes_desired']), 3)
1438         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1439
1440     def test_put_collection_without_storage_classes_specified(self):
1441         collection = self.run_and_find_collection("")
1442         self.assertEqual(len(collection['storage_classes_desired']), 1)
1443         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1444
1445     def test_exclude_filename_pattern(self):
1446         tmpdir = self.make_tmpdir()
1447         tmpsubdir = os.path.join(tmpdir, 'subdir')
1448         os.mkdir(tmpsubdir)
1449         for fname in ['file1', 'file2', 'file3']:
1450             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1451                 f.write("This is %s" % fname)
1452             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1453                 f.write("This is %s" % fname)
1454         col = self.run_and_find_collection("", ['--no-progress',
1455                                                 '--exclude', '*2.txt',
1456                                                 '--exclude', 'file3.*',
1457                                                  tmpdir])
1458         self.assertNotEqual(None, col['uuid'])
1459         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1460         # None of the file2.txt & file3.txt should have been uploaded
1461         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1462         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1463         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1464
1465     def test_exclude_filepath_pattern(self):
1466         tmpdir = self.make_tmpdir()
1467         tmpsubdir = os.path.join(tmpdir, 'subdir')
1468         os.mkdir(tmpsubdir)
1469         for fname in ['file1', 'file2', 'file3']:
1470             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1471                 f.write("This is %s" % fname)
1472             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1473                 f.write("This is %s" % fname)
1474         col = self.run_and_find_collection("", ['--no-progress',
1475                                                 '--exclude', 'subdir/*2.txt',
1476                                                 '--exclude', './file1.*',
1477                                                  tmpdir])
1478         self.assertNotEqual(None, col['uuid'])
1479         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1480         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1481         self.assertNotRegex(c['manifest_text'],
1482                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1483         self.assertNotRegex(c['manifest_text'],
1484                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1485         self.assertRegex(c['manifest_text'],
1486                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1487         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1488
1489     def test_unicode_on_filename(self):
1490         tmpdir = self.make_tmpdir()
1491         fname = u"iā¤arvados.txt"
1492         with open(os.path.join(tmpdir, fname), 'w') as f:
1493             f.write("This is a unicode named file")
1494         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1495         self.assertNotEqual(None, col['uuid'])
1496         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1497         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1498
1499     def test_silent_mode_no_errors(self):
1500         self.authorize_with('active')
1501         tmpdir = self.make_tmpdir()
1502         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1503             f.write('hello world')
1504         pipe = subprocess.Popen(
1505             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1506             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1507             stderr=subprocess.PIPE, env=self.ENVIRON)
1508         stdout, stderr = pipe.communicate()
1509         # No console output should occur on normal operations
1510         self.assertNotRegex(stderr.decode(), r'.+')
1511         self.assertNotRegex(stdout.decode(), r'.+')
1512
1513     def test_silent_mode_does_not_avoid_error_messages(self):
1514         self.authorize_with('active')
1515         pipe = subprocess.Popen(
1516             [sys.executable, arv_put.__file__] + ['--silent',
1517                                                   '/path/not/existant'],
1518             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1519             stderr=subprocess.PIPE, env=self.ENVIRON)
1520         stdout, stderr = pipe.communicate()
1521         # Error message should be displayed when errors happen
1522         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1523         self.assertNotRegex(stdout.decode(), r'.+')
1524
1525
# Run this module's tests through unittest when the file is executed
# directly as a script.
if __name__ == '__main__':
    unittest.main()