@_FileLikeObjectBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
- for segment in self.readall(size, num_retries):
+ for segment in self.readall(size, num_retries=num_retries):
data = decompress(segment)
if data:
yield data
@retry_method
def writelines(self, seq, num_retries=None):
for s in seq:
- self.write(s, num_retries)
+ self.write(s, num_retries=num_retries)
@_FileLikeObjectBase._before_close
def truncate(self, size=None):
"""
@functools.wraps(orig_func)
def num_retries_setter(self, *args, **kwargs):
- arg_vals = inspect.getcallargs(orig_func, self, *args, **kwargs)
- if arg_vals['num_retries'] is None:
+ if kwargs.get('num_retries') is None:
kwargs['num_retries'] = self.num_retries
return orig_func(self, *args, **kwargs)
return num_retries_setter
return (a, num_retries, z)
- def test_positional_arg_passed(self):
- self.assertEqual((3, 2, 0), self.Tester().check(3, 2))
+ def test_positional_arg_raises(self):
+     # Unsupported use -- make sure we raise rather than ignore.
+     # The call itself is the statement expected to raise, so it is not
+     # wrapped in assertEqual: a comparison here could never execute and
+     # would wrongly suggest that a return value is being checked.
+     with self.assertRaises(TypeError):
+         self.Tester().check(3, 2)
def test_keyword_arg_passed(self):
self.assertEqual((4, 3, 0), self.Tester().check(num_retries=3, a=4))