arch-image-builder/builder/lib/loop.py
BigfootACA e6c6ab6bf7 Initial commit
Signed-off-by: BigfootACA <bigfoot@classfun.cn>
2024-05-17 23:04:34 +08:00

230 lines
5.8 KiB
Python

import io
import os
import stat
import fcntl
import ctypes
from builder.lib import utils
LO_NAME_SIZE = 64
LO_KEY_SIZE = 32
LO_FLAGS_READ_ONLY = 1
LO_FLAGS_AUTOCLEAR = 4
LO_FLAGS_PARTSCAN = 8
LO_FLAGS_DIRECT_IO = 16
LO_CRYPT_NONE = 0
LO_CRYPT_XOR = 1
LO_CRYPT_DES = 2
LO_CRYPT_FISH2 = 3
LO_CRYPT_BLOW = 4
LO_CRYPT_CAST128 = 5
LO_CRYPT_IDEA = 6
LO_CRYPT_DUMMY = 9
LO_CRYPT_SKIPJACK = 10
LO_CRYPT_CRYPTOAPI = 18
MAX_LO_CRYPT = 20
LOOP_SET_FD = 0x4C00
LOOP_CLR_FD = 0x4C01
LOOP_SET_STATUS = 0x4C02
LOOP_GET_STATUS = 0x4C03
LOOP_SET_STATUS64 = 0x4C04
LOOP_GET_STATUS64 = 0x4C05
LOOP_CHANGE_FD = 0x4C06
LOOP_SET_CAPACITY = 0x4C07
LOOP_SET_DIRECT_IO = 0x4C08
LOOP_SET_BLOCK_SIZE = 0x4C09
LOOP_CONFIGURE = 0x4C0A
LOOP_CTL_ADD = 0x4C80
LOOP_CTL_REMOVE = 0x4C81
LOOP_CTL_GET_FREE = 0x4C82
LOOP_SET_STATUS_SETTABLE_FLAGS = LO_FLAGS_AUTOCLEAR | LO_FLAGS_PARTSCAN
LOOP_SET_STATUS_CLEARABLE_FLAGS = LO_FLAGS_AUTOCLEAR
LOOP_CONFIGURE_SETTABLE_FLAGS = LO_FLAGS_READ_ONLY | LO_FLAGS_AUTOCLEAR | LO_FLAGS_PARTSCAN | LO_FLAGS_DIRECT_IO
class LoopInfo64(ctypes.Structure):
_fields_ = [
("lo_device", ctypes.c_uint64),
("lo_inode", ctypes.c_uint64),
("lo_rdevice", ctypes.c_uint64),
("lo_offset", ctypes.c_uint64),
("lo_sizelimit", ctypes.c_uint64),
("lo_number", ctypes.c_uint32),
("lo_encrypt_type", ctypes.c_uint32),
("lo_encrypt_key_size", ctypes.c_uint32),
("lo_flags", ctypes.c_uint32),
("lo_file_name", ctypes.c_char * LO_NAME_SIZE),
("lo_crypt_name", ctypes.c_char * LO_NAME_SIZE),
("lo_encrypt_key", ctypes.c_byte * LO_KEY_SIZE),
("lo_init", ctypes.c_uint64 * 2),
]
class LoopConfig(ctypes.Structure):
_fields_ = [
("fd", ctypes.c_uint32),
("block_size", ctypes.c_uint32),
("info", LoopInfo64),
("__reserved", ctypes.c_uint64 * 8),
]
def loop_get_free_no() -> int:
ctrl = os.open("/dev/loop-control", os.O_RDWR)
try:
no = fcntl.ioctl(ctrl, LOOP_CTL_GET_FREE)
if no < 0: raise OSError("LOOP_CTL_GET_FREE failed")
finally: os.close(ctrl)
return no
def loop_get_free() -> str:
no = loop_get_free_no()
return f"/dev/loop{no}"
def loop_create_dev(no: int, dev: str = None) -> str:
if dev is None:
dev = f"/dev/loop{no}"
if not os.path.exists(dev):
if no < 0: raise ValueError("no loop number set")
a_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IFBLK
a_dev = os.makedev(7, no)
os.mknod(dev, a_mode, a_dev)
return dev
def loop_detach(dev: str):
loop = os.open(dev, os.O_RDWR)
try:
ret = fcntl.ioctl(loop, LOOP_CLR_FD)
if ret != 0: raise OSError(f"detach loop device {dev} failed")
finally: os.close(loop)
def loop_setup(
path: str = None,
fio: io.FileIO = None,
fd: int = -1,
dev: str = None,
no: int = -1,
offset: int = 0,
size: int = 0,
block_size: int = 512,
read_only: bool = False,
part_scan: bool = False,
auto_clear: bool = False,
direct_io: bool = False,
) -> str:
if path is None and fio is None and fd < 0:
raise ValueError("no source file set")
if no < 0:
if dev is None:
dev = loop_get_free()
else:
fn = os.path.basename(dev)
if fn.startswith("loop"): no = int(fn[4:])
loop_create_dev(no=no, dev=dev)
opened, loop = -1, -1
if fio:
if fd < 0: fd = fio.fileno()
if path is None: path = fio.name
elif fd >= 0:
if path is None: path = utils.fd_get_path(fd)
if path is None: raise OSError("bad fd for loop")
elif path:
path = os.path.realpath(path)
opened = os.open(path, os.O_RDWR)
if opened < 0: raise OSError(f"open {path} failed")
fd = opened
else: raise ValueError("no source file set")
flags = 0
if part_scan: flags |= LO_FLAGS_PARTSCAN
if direct_io: flags |= LO_FLAGS_DIRECT_IO
if read_only: flags |= LO_FLAGS_READ_ONLY
if auto_clear: flags |= LO_FLAGS_AUTOCLEAR
try:
file_name = path[0:63].encode()
li = LoopInfo64(
lo_flags=flags,
lo_offset=offset,
lo_sizelimit=size,
lo_file_name=file_name,
)
lc = LoopConfig(fd=fd, block_size=block_size, info=li)
loop = os.open(dev, os.O_RDWR)
if loop < 0: raise OSError(f"open loop device {dev} failed")
ret = fcntl.ioctl(loop, LOOP_CONFIGURE, lc)
if ret != 0: raise OSError(f"configure loop device {dev} with {path} failed")
finally:
if loop >= 0: os.close(loop)
if opened >= 0: os.close(opened)
return dev
def loop_get_sysfs(dev: str) -> str:
st = os.stat(dev)
if not stat.S_ISBLK(st.st_mode):
raise ValueError(f"device {dev} is not block")
major = os.major(st.st_rdev)
minor = os.minor(st.st_rdev)
if major != 7:
raise ValueError(f"device {dev} is not loop")
sysfs = f"/sys/dev/block/{major}:{minor}"
if not os.path.exists(sysfs):
raise RuntimeError("get sysfs failed")
return sysfs
def loop_get_backing(dev: str) -> str:
sysfs = loop_get_sysfs(dev)
path = os.path.join(sysfs, "loop", "backing_file")
with open(path, "r") as f:
backing = f.read()
return os.path.realpath(backing.strip())
def loop_get_offset(dev: str) -> int:
sysfs = loop_get_sysfs(dev)
path = os.path.join(sysfs, "loop", "offset")
with open(path, "r") as f:
backing = f.read()
return int(backing.strip())
class LoopDevice:
device: str
def __init__(
self,
path: str = None,
fio: io.FileIO = None,
fd: int = -1,
dev: str = None,
no: int = -1,
offset: int = 0,
size: int = 0,
block_size: int = 512,
read_only: bool = False,
part_scan: bool = False,
auto_clear: bool = False,
direct_io: bool = False,
):
self.device = loop_setup(
path=path,
fio=fio,
fd=fd,
dev=dev,
no=no,
offset=offset,
size=size,
block_size=block_size,
read_only=read_only,
part_scan=part_scan,
auto_clear=auto_clear,
direct_io=direct_io,
)
def __del__(self):
loop_detach(self.device)