libfuse/test/test_examples.py
HereThereBeDragons c0a344e379 Test for fuse_lowlevel_notify_expire_entry.
This test is too simple to check for all functionalities of notify_expire as it always successfully passes when libfuse supports the function (even if kernel does not support it -  it just takes it as notify_inval)
2023-01-06 18:35:52 +00:00

765 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
if __name__ == '__main__':
import pytest
import sys
sys.exit(pytest.main([__file__] + sys.argv[1:]))
import subprocess
import os
import sys
import py
import pytest
import stat
import shutil
import filecmp
import tempfile
import time
import errno
import sys
import platform
from distutils.version import LooseVersion
from tempfile import NamedTemporaryFile
from contextlib import contextmanager
from util import (wait_for_mount, umount, cleanup, base_cmdline,
safe_sleep, basename, fuse_test_marker, test_printcap,
fuse_proto, powerset)
from os.path import join as pjoin
pytestmark = fuse_test_marker()
TEST_FILE = __file__
with open(TEST_FILE, 'rb') as fh:
TEST_DATA = fh.read()
def name_generator(__ctr=[0]):
__ctr[0] += 1
return 'testfile_%d' % __ctr[0]
options = []
if sys.platform == 'linux':
options.append('clone_fd')
def invoke_directly(mnt_dir, name, options):
cmdline = base_cmdline + [ pjoin(basename, 'example', name),
'-f', mnt_dir, '-o', ','.join(options) ]
if name == 'hello_ll':
# supports single-threading only
cmdline.append('-s')
return cmdline
def invoke_mount_fuse(mnt_dir, name, options):
return base_cmdline + [ pjoin(basename, 'util', 'mount.fuse3'),
name, mnt_dir, '-o', ','.join(options) ]
def invoke_mount_fuse_drop_privileges(mnt_dir, name, options):
if os.getuid() != 0:
pytest.skip('drop_privileges requires root, skipping.')
return invoke_mount_fuse(mnt_dir, name, options + ('drop_privileges',))
class raii_tmpdir:
def __init__(self):
self.d = tempfile.mkdtemp()
def __str__(self):
return str(self.d)
def mkdir(self, path):
return py.path.local(str(self.d)).mkdir(path)
@pytest.fixture
def short_tmpdir():
return raii_tmpdir()
def readdir_inode(dir):
cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ]
with subprocess.Popen(cmd, stdout=subprocess.PIPE,
universal_newlines=True) as proc:
lines = proc.communicate()[0].splitlines()
lines.sort()
return lines
@pytest.mark.parametrize("cmdline_builder", (invoke_directly, invoke_mount_fuse,
invoke_mount_fuse_drop_privileges))
@pytest.mark.parametrize("options", powerset(options))
@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
def test_hello(tmpdir, name, options, cmdline_builder, output_checker):
mnt_dir = str(tmpdir)
mount_process = subprocess.Popen(
cmdline_builder(mnt_dir, name, options),
stdout=output_checker.fd, stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
assert os.listdir(mnt_dir) == [ 'hello' ]
filename = pjoin(mnt_dir, 'hello')
with open(filename, 'r') as fh:
assert fh.read() == 'Hello World!\n'
with pytest.raises(IOError) as exc_info:
open(filename, 'r+')
assert exc_info.value.errno == errno.EACCES
with pytest.raises(IOError) as exc_info:
open(filename + 'does-not-exist', 'r+')
assert exc_info.value.errno == errno.ENOENT
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("writeback", (False, True))
@pytest.mark.parametrize("name", ('passthrough', 'passthrough_plus',
'passthrough_fh', 'passthrough_ll'))
@pytest.mark.parametrize("debug", (False, True))
def test_passthrough(short_tmpdir, name, debug, output_checker, writeback):
# Avoid false positives from libfuse debug messages
if debug:
output_checker.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
count=0)
# test_syscalls prints "No error" under FreeBSD
output_checker.register_output(r"^ \d\d \[[^\]]+ message: 'No error: 0'\]",
count=0)
mnt_dir = str(short_tmpdir.mkdir('mnt'))
src_dir = str(short_tmpdir.mkdir('src'))
if name == 'passthrough_plus':
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'passthrough'),
'--plus', '-f', mnt_dir ]
else:
cmdline = base_cmdline + \
[ pjoin(basename, 'example', name),
'-f', mnt_dir ]
if debug:
cmdline.append('-d')
if writeback:
if name != 'passthrough_ll':
pytest.skip('example does not support writeback caching')
cmdline.append('-o')
cmdline.append('writeback')
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
work_dir = mnt_dir + src_dir
tst_statvfs(work_dir)
tst_readdir(src_dir, work_dir)
tst_readdir_big(src_dir, work_dir)
tst_open_read(src_dir, work_dir)
tst_open_write(src_dir, work_dir)
tst_create(work_dir)
tst_passthrough(src_dir, work_dir)
tst_append(src_dir, work_dir)
tst_seek(src_dir, work_dir)
tst_mkdir(work_dir)
tst_rmdir(work_dir, src_dir)
tst_unlink(work_dir, src_dir)
tst_symlink(work_dir)
if os.getuid() == 0:
tst_chown(work_dir)
# Underlying fs may not have full nanosecond resolution
tst_utimens(work_dir, ns_tol=1000)
tst_link(work_dir)
tst_truncate_path(work_dir)
tst_truncate_fd(work_dir)
tst_open_unlink(work_dir)
syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
work_dir, ':' + src_dir ]
if writeback:
# When writeback caching is enabled, kernel has to open files for
# reading even when userspace opens with O_WDONLY. This fails if the
# filesystem process doesn't have special permission.
syscall_test_cmd.append('-53')
subprocess.check_call(syscall_test_cmd)
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.parametrize("cache", (False, True))
def test_passthrough_hp(short_tmpdir, cache, output_checker):
mnt_dir = str(short_tmpdir.mkdir('mnt'))
src_dir = str(short_tmpdir.mkdir('src'))
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'passthrough_hp'),
src_dir, mnt_dir ]
if not cache:
cmdline.append('--nocache')
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
tst_statvfs(mnt_dir)
tst_readdir(src_dir, mnt_dir)
tst_readdir_big(src_dir, mnt_dir)
tst_open_read(src_dir, mnt_dir)
tst_open_write(src_dir, mnt_dir)
tst_create(mnt_dir)
if not cache:
tst_passthrough(src_dir, mnt_dir)
tst_append(src_dir, mnt_dir)
tst_seek(src_dir, mnt_dir)
tst_mkdir(mnt_dir)
if cache:
# if cache is enabled, no operations should go through
# src_dir as the cache will become stale.
tst_rmdir(mnt_dir)
tst_unlink(mnt_dir)
else:
tst_rmdir(mnt_dir, src_dir)
tst_unlink(mnt_dir, src_dir)
tst_symlink(mnt_dir)
if os.getuid() == 0:
tst_chown(mnt_dir)
# Underlying fs may not have full nanosecond resolution
tst_utimens(mnt_dir, ns_tol=1000)
tst_link(mnt_dir)
tst_truncate_path(mnt_dir)
tst_truncate_fd(mnt_dir)
tst_open_unlink(mnt_dir)
# test_syscalls assumes that changes in source directory
# will be reflected immediately in mountpoint, so we
# can't use it.
if not cache:
syscall_test_cmd = [ os.path.join(basename, 'test', 'test_syscalls'),
mnt_dir, ':' + src_dir ]
# unlinked testfiles check fails without kernel fix
# "fuse: fix illegal access to inode with reused nodeid"
# so opt-in for this test from kernel 5.14
if LooseVersion(platform.release()) >= '5.14':
syscall_test_cmd.append('-u')
subprocess.check_call(syscall_test_cmd)
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.skipif(fuse_proto < (7,11),
reason='not supported by running kernel')
def test_ioctl(tmpdir, output_checker):
progname = pjoin(basename, 'example', 'ioctl')
if not os.path.exists(progname):
pytest.skip('%s not built' % os.path.basename(progname))
mnt_dir = str(tmpdir)
testfile = pjoin(mnt_dir, 'fioc')
cmdline = base_cmdline + [progname, '-f', mnt_dir ]
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'ioctl_client'),
testfile ]
assert subprocess.check_output(cmdline) == b'0\n'
with open(testfile, 'wb') as fh:
fh.write(b'foobar')
assert subprocess.check_output(cmdline) == b'6\n'
subprocess.check_call(cmdline + [ '3' ])
with open(testfile, 'rb') as fh:
assert fh.read()== b'foo'
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
def test_poll(tmpdir, output_checker):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + [pjoin(basename, 'example', 'poll'),
'-f', mnt_dir ]
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'poll_client') ]
subprocess.check_call(cmdline, cwd=mnt_dir)
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
def test_null(tmpdir, output_checker):
progname = pjoin(basename, 'example', 'null')
if not os.path.exists(progname):
pytest.skip('%s not built' % os.path.basename(progname))
mnt_file = str(tmpdir) + '/file'
with open(mnt_file, 'w') as fh:
fh.write('dummy')
cmdline = base_cmdline + [ progname, '-f', mnt_file ]
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
def test_fn(name):
return os.stat(name).st_size > 4000
try:
wait_for_mount(mount_process, mnt_file, test_fn)
with open(mnt_file, 'rb') as fh:
assert fh.read(382) == b'\0' * 382
with open(mnt_file, 'wb') as fh:
fh.write(b'whatever')
except:
cleanup(mount_process, mnt_file)
raise
else:
umount(mount_process, mnt_file)
@pytest.mark.skipif(fuse_proto < (7,12),
reason='not supported by running kernel')
@pytest.mark.parametrize("only_expire", ("invalidate_entries", "expire_entries"))
@pytest.mark.parametrize("notify", (True, False))
def test_notify_inval_entry(tmpdir, only_expire, notify, output_checker):
mnt_dir = str(tmpdir)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'notify_inval_entry'),
'-f', '--update-interval=1',
'--timeout=5', mnt_dir ]
if not notify:
cmdline.append('--no-notify')
if only_expire == "expire_entries":
cmdline.append('--only-expire')
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
try:
wait_for_mount(mount_process, mnt_dir)
fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
try:
os.stat(fname)
except FileNotFoundError:
# We may have hit a race condition and issued
# readdir just before the name changed
fname = pjoin(mnt_dir, os.listdir(mnt_dir)[0])
os.stat(fname)
safe_sleep(2)
if not notify:
os.stat(fname)
safe_sleep(5)
with pytest.raises(FileNotFoundError):
os.stat(fname)
except:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
@pytest.mark.skipif(os.getuid() != 0,
reason='needs to run as root')
def test_cuse(output_checker):
# Valgrind warns about unknown ioctls, that's ok
output_checker.register_output(r'^==([0-9]+).+unhandled ioctl.+\n'
r'==\1== \s{3}.+\n'
r'==\1== \s{3}.+$', count=0)
devname = 'cuse-test-%d' % os.getpid()
devpath = '/dev/%s' % devname
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'cuse'),
'-f', '--name=%s' % devname ]
mount_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
stderr=output_checker.fd)
cmdline = base_cmdline + \
[ pjoin(basename, 'example', 'cuse_client'),
devpath ]
try:
wait_for_mount(mount_process, devpath,
test_fn=os.path.exists)
assert subprocess.check_output(cmdline + ['s']) == b'0\n'
data = b'some test data'
off = 5
proc = subprocess.Popen(cmdline + [ 'w', str(len(data)), str(off) ],
stdin=subprocess.PIPE)
proc.stdin.write(data)
proc.stdin.close()
assert proc.wait(timeout=10) == 0
size = str(off + len(data)).encode() + b'\n'
assert subprocess.check_output(cmdline + ['s']) == size
out = subprocess.check_output(
cmdline + [ 'r', str(off + len(data) + 2), '0' ])
assert out == (b'\0' * off) + data
finally:
mount_process.terminate()
@contextmanager
def os_open(name, flags):
fd = os.open(name, flags)
try:
yield fd
finally:
os.close(fd)
def os_create(name):
os.close(os.open(name, os.O_CREAT | os.O_RDWR))
def tst_unlink(mnt_dir, src_dir=None):
name = name_generator()
fullname = mnt_dir + "/" + name
srcname = fullname
if src_dir is not None:
srcname = pjoin(src_dir, name)
with open(srcname, 'wb') as fh:
fh.write(b'hello')
assert name in os.listdir(mnt_dir)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
def tst_mkdir(mnt_dir):
dirname = name_generator()
fullname = mnt_dir + "/" + dirname
os.mkdir(fullname)
fstat = os.stat(fullname)
assert stat.S_ISDIR(fstat.st_mode)
assert os.listdir(fullname) == []
# Some filesystem (e.g. BTRFS) don't track st_nlink for directories
assert fstat.st_nlink in (1,2)
assert dirname in os.listdir(mnt_dir)
def tst_rmdir(mnt_dir, src_dir=None):
name = name_generator()
fullname = mnt_dir + "/" + name
srcname = fullname
if src_dir is not None:
srcname = pjoin(src_dir, name)
os.mkdir(srcname)
assert name in os.listdir(mnt_dir)
os.rmdir(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
def tst_symlink(mnt_dir):
linkname = name_generator()
fullname = mnt_dir + "/" + linkname
os.symlink("/imaginary/dest", fullname)
fstat = os.lstat(fullname)
assert stat.S_ISLNK(fstat.st_mode)
assert os.readlink(fullname) == "/imaginary/dest"
assert fstat.st_nlink == 1
assert linkname in os.listdir(mnt_dir)
def tst_create(mnt_dir):
name = name_generator()
fullname = pjoin(mnt_dir, name)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
os.close(fd)
assert name in os.listdir(mnt_dir)
fstat = os.lstat(fullname)
assert stat.S_ISREG(fstat.st_mode)
assert fstat.st_nlink == 1
assert fstat.st_size == 0
def tst_chown(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
uid = fstat.st_uid
gid = fstat.st_gid
uid_new = uid + 1
os.chown(filename, uid_new, -1)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid
gid_new = gid + 1
os.chown(filename, -1, gid_new)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid_new
def tst_open_read(src_dir, mnt_dir):
name = name_generator()
with open(pjoin(src_dir, name), 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
def tst_open_write(src_dir, mnt_dir):
name = name_generator()
os_create(pjoin(src_dir, name))
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(fullname, TEST_FILE, False)
def tst_append(src_dir, mnt_dir):
name = name_generator()
os_create(pjoin(src_dir, name))
fullname = pjoin(mnt_dir, name)
with os_open(fullname, os.O_WRONLY) as fd:
os.write(fd, b'foo\n')
with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
os.write(fd, b'bar\n')
with open(fullname, 'rb') as fh:
assert fh.read() == b'foo\nbar\n'
def tst_seek(src_dir, mnt_dir):
name = name_generator()
os_create(pjoin(src_dir, name))
fullname = pjoin(mnt_dir, name)
with os_open(fullname, os.O_WRONLY) as fd:
os.lseek(fd, 1, os.SEEK_SET)
os.write(fd, b'foobar\n')
with os_open(fullname, os.O_WRONLY) as fd:
os.lseek(fd, 4, os.SEEK_SET)
os.write(fd, b'com')
with open(fullname, 'rb') as fh:
assert fh.read() == b'\0foocom\n'
def tst_open_unlink(mnt_dir):
name = pjoin(mnt_dir, name_generator())
data1 = b'foo'
data2 = b'bar'
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb+', buffering=0) as fh:
fh.write(data1)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
fh.write(data2)
fh.seek(0)
assert fh.read() == data1+data2
def tst_statvfs(mnt_dir):
os.statvfs(mnt_dir)
def tst_link(mnt_dir):
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
shutil.copyfile(TEST_FILE, name1)
assert filecmp.cmp(name1, TEST_FILE, False)
fstat1 = os.lstat(name1)
assert fstat1.st_nlink == 1
os.link(name1, name2)
fstat1 = os.lstat(name1)
fstat2 = os.lstat(name2)
assert fstat1 == fstat2
assert fstat1.st_nlink == 2
assert os.path.basename(name2) in os.listdir(mnt_dir)
assert filecmp.cmp(name1, name2, False)
# Since RELEASE requests are asynchronous, it is possible that
# libfuse still considers the file to be open at this point
# and (since -o hard_remove is not used) renames it instead of
# deleting it. In that case, the following lstat() call will
# still report an st_nlink value of 2 (cf. issue #157).
os.unlink(name2)
assert os.path.basename(name2) not in os.listdir(mnt_dir)
with pytest.raises(FileNotFoundError):
os.lstat(name2)
# See above, we may have to wait until RELEASE has been
# received before the st_nlink value is correct.
maxwait = time.time() + 2
fstat1 = os.lstat(name1)
while fstat1.st_nlink == 2 and time.time() < maxwait:
fstat1 = os.lstat(name1)
time.sleep(0.1)
assert fstat1.st_nlink == 1
os.unlink(name1)
def tst_readdir(src_dir, mnt_dir):
newdir = name_generator()
src_newdir = pjoin(src_dir, newdir)
mnt_newdir = pjoin(mnt_dir, newdir)
file_ = src_newdir + "/" + name_generator()
subdir = src_newdir + "/" + name_generator()
subfile = subdir + "/" + name_generator()
os.mkdir(src_newdir)
shutil.copyfile(TEST_FILE, file_)
os.mkdir(subdir)
shutil.copyfile(TEST_FILE, subfile)
listdir_is = os.listdir(mnt_newdir)
listdir_is.sort()
listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
listdir_should.sort()
assert listdir_is == listdir_should
inodes_is = readdir_inode(mnt_newdir)
inodes_should = readdir_inode(src_newdir)
assert inodes_is == inodes_should
os.unlink(file_)
os.unlink(subfile)
os.rmdir(subdir)
os.rmdir(src_newdir)
def tst_readdir_big(src_dir, mnt_dir):
# Add enough entries so that readdir needs to be called
# multiple times.
fnames = []
for i in range(500):
fname = ('A rather long filename to make sure that we '
'fill up the buffer - ' * 3) + str(i)
with open(pjoin(src_dir, fname), 'w') as fh:
fh.write('File %d' % i)
fnames.append(fname)
listdir_is = sorted(os.listdir(mnt_dir))
listdir_should = sorted(os.listdir(src_dir))
assert listdir_is == listdir_should
inodes_is = readdir_inode(mnt_dir)
inodes_should = readdir_inode(src_dir)
assert inodes_is == inodes_should
for fname in fnames:
stat_src = os.stat(pjoin(src_dir, fname))
stat_mnt = os.stat(pjoin(mnt_dir, fname))
assert stat_src.st_ino == stat_mnt.st_ino
assert stat_src.st_mtime == stat_mnt.st_mtime
assert stat_src.st_ctime == stat_mnt.st_ctime
assert stat_src.st_size == stat_mnt.st_size
os.unlink(pjoin(src_dir, fname))
def tst_truncate_path(mnt_dir):
assert len(TEST_DATA) > 1024
filename = pjoin(mnt_dir, name_generator())
with open(filename, 'wb') as fh:
fh.write(TEST_DATA)
fstat = os.stat(filename)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.truncate(filename, size + 1024)
assert os.stat(filename).st_size == size + 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.truncate(filename, size - 1024)
assert os.stat(filename).st_size == size - 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA[:size-1024]
os.unlink(filename)
def tst_truncate_fd(mnt_dir):
assert len(TEST_DATA) > 1024
with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
fd = fh.fileno()
fh.write(TEST_DATA)
fstat = os.fstat(fd)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.ftruncate(fd, size + 1024)
assert os.fstat(fd).st_size == size + 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.ftruncate(fd, size - 1024)
assert os.fstat(fd).st_size == size - 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA[:size-1024]
def tst_utimens(mnt_dir, ns_tol=0):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
atime = fstat.st_atime + 42.28
mtime = fstat.st_mtime - 42.23
if sys.version_info < (3,3):
os.utime(filename, (atime, mtime))
else:
atime_ns = fstat.st_atime_ns + int(42.28*1e9)
mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
os.utime(filename, None, ns=(atime_ns, mtime_ns))
fstat = os.lstat(filename)
assert abs(fstat.st_atime - atime) < 1
assert abs(fstat.st_mtime - mtime) < 1
if sys.version_info >= (3,3):
assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
def tst_passthrough(src_dir, mnt_dir):
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(src_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(mnt_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
# avoid warning about unused import
test_printcap