mirror of
https://github.com/BigfootACA/arch-image-builder.git
synced 2024-11-11 07:57:53 +08:00
230 lines
5.8 KiB
Python
230 lines
5.8 KiB
Python
import io
|
|
import os
|
|
import stat
|
|
import fcntl
|
|
import ctypes
|
|
from builder.lib import utils
|
|
|
|
|
|
LO_NAME_SIZE = 64
|
|
LO_KEY_SIZE = 32
|
|
LO_FLAGS_READ_ONLY = 1
|
|
LO_FLAGS_AUTOCLEAR = 4
|
|
LO_FLAGS_PARTSCAN = 8
|
|
LO_FLAGS_DIRECT_IO = 16
|
|
LO_CRYPT_NONE = 0
|
|
LO_CRYPT_XOR = 1
|
|
LO_CRYPT_DES = 2
|
|
LO_CRYPT_FISH2 = 3
|
|
LO_CRYPT_BLOW = 4
|
|
LO_CRYPT_CAST128 = 5
|
|
LO_CRYPT_IDEA = 6
|
|
LO_CRYPT_DUMMY = 9
|
|
LO_CRYPT_SKIPJACK = 10
|
|
LO_CRYPT_CRYPTOAPI = 18
|
|
MAX_LO_CRYPT = 20
|
|
LOOP_SET_FD = 0x4C00
|
|
LOOP_CLR_FD = 0x4C01
|
|
LOOP_SET_STATUS = 0x4C02
|
|
LOOP_GET_STATUS = 0x4C03
|
|
LOOP_SET_STATUS64 = 0x4C04
|
|
LOOP_GET_STATUS64 = 0x4C05
|
|
LOOP_CHANGE_FD = 0x4C06
|
|
LOOP_SET_CAPACITY = 0x4C07
|
|
LOOP_SET_DIRECT_IO = 0x4C08
|
|
LOOP_SET_BLOCK_SIZE = 0x4C09
|
|
LOOP_CONFIGURE = 0x4C0A
|
|
LOOP_CTL_ADD = 0x4C80
|
|
LOOP_CTL_REMOVE = 0x4C81
|
|
LOOP_CTL_GET_FREE = 0x4C82
|
|
LOOP_SET_STATUS_SETTABLE_FLAGS = LO_FLAGS_AUTOCLEAR | LO_FLAGS_PARTSCAN
|
|
LOOP_SET_STATUS_CLEARABLE_FLAGS = LO_FLAGS_AUTOCLEAR
|
|
LOOP_CONFIGURE_SETTABLE_FLAGS = LO_FLAGS_READ_ONLY | LO_FLAGS_AUTOCLEAR | LO_FLAGS_PARTSCAN | LO_FLAGS_DIRECT_IO
|
|
|
|
|
|
class LoopInfo64(ctypes.Structure):
|
|
_fields_ = [
|
|
("lo_device", ctypes.c_uint64),
|
|
("lo_inode", ctypes.c_uint64),
|
|
("lo_rdevice", ctypes.c_uint64),
|
|
("lo_offset", ctypes.c_uint64),
|
|
("lo_sizelimit", ctypes.c_uint64),
|
|
("lo_number", ctypes.c_uint32),
|
|
("lo_encrypt_type", ctypes.c_uint32),
|
|
("lo_encrypt_key_size", ctypes.c_uint32),
|
|
("lo_flags", ctypes.c_uint32),
|
|
("lo_file_name", ctypes.c_char * LO_NAME_SIZE),
|
|
("lo_crypt_name", ctypes.c_char * LO_NAME_SIZE),
|
|
("lo_encrypt_key", ctypes.c_byte * LO_KEY_SIZE),
|
|
("lo_init", ctypes.c_uint64 * 2),
|
|
]
|
|
|
|
|
|
class LoopConfig(ctypes.Structure):
|
|
_fields_ = [
|
|
("fd", ctypes.c_uint32),
|
|
("block_size", ctypes.c_uint32),
|
|
("info", LoopInfo64),
|
|
("__reserved", ctypes.c_uint64 * 8),
|
|
]
|
|
|
|
|
|
def loop_get_free_no() -> int:
|
|
ctrl = os.open("/dev/loop-control", os.O_RDWR)
|
|
try:
|
|
no = fcntl.ioctl(ctrl, LOOP_CTL_GET_FREE)
|
|
if no < 0: raise OSError("LOOP_CTL_GET_FREE failed")
|
|
finally: os.close(ctrl)
|
|
return no
|
|
|
|
|
|
def loop_get_free() -> str:
|
|
no = loop_get_free_no()
|
|
return f"/dev/loop{no}"
|
|
|
|
|
|
def loop_create_dev(no: int, dev: str = None) -> str:
|
|
if dev is None:
|
|
dev = f"/dev/loop{no}"
|
|
if not os.path.exists(dev):
|
|
if no < 0: raise ValueError("no loop number set")
|
|
a_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IFBLK
|
|
a_dev = os.makedev(7, no)
|
|
os.mknod(dev, a_mode, a_dev)
|
|
return dev
|
|
|
|
|
|
def loop_detach(dev: str):
|
|
loop = os.open(dev, os.O_RDWR)
|
|
try:
|
|
ret = fcntl.ioctl(loop, LOOP_CLR_FD)
|
|
if ret != 0: raise OSError(f"detach loop device {dev} failed")
|
|
finally: os.close(loop)
|
|
|
|
|
|
def loop_setup(
|
|
path: str = None,
|
|
fio: io.FileIO = None,
|
|
fd: int = -1,
|
|
dev: str = None,
|
|
no: int = -1,
|
|
offset: int = 0,
|
|
size: int = 0,
|
|
block_size: int = 512,
|
|
read_only: bool = False,
|
|
part_scan: bool = False,
|
|
auto_clear: bool = False,
|
|
direct_io: bool = False,
|
|
) -> str:
|
|
if path is None and fio is None and fd < 0:
|
|
raise ValueError("no source file set")
|
|
if no < 0:
|
|
if dev is None:
|
|
dev = loop_get_free()
|
|
else:
|
|
fn = os.path.basename(dev)
|
|
if fn.startswith("loop"): no = int(fn[4:])
|
|
loop_create_dev(no=no, dev=dev)
|
|
opened, loop = -1, -1
|
|
if fio:
|
|
if fd < 0: fd = fio.fileno()
|
|
if path is None: path = fio.name
|
|
elif fd >= 0:
|
|
if path is None: path = utils.fd_get_path(fd)
|
|
if path is None: raise OSError("bad fd for loop")
|
|
elif path:
|
|
path = os.path.realpath(path)
|
|
opened = os.open(path, os.O_RDWR)
|
|
if opened < 0: raise OSError(f"open {path} failed")
|
|
fd = opened
|
|
else: raise ValueError("no source file set")
|
|
flags = 0
|
|
if part_scan: flags |= LO_FLAGS_PARTSCAN
|
|
if direct_io: flags |= LO_FLAGS_DIRECT_IO
|
|
if read_only: flags |= LO_FLAGS_READ_ONLY
|
|
if auto_clear: flags |= LO_FLAGS_AUTOCLEAR
|
|
try:
|
|
file_name = path[0:63].encode()
|
|
li = LoopInfo64(
|
|
lo_flags=flags,
|
|
lo_offset=offset,
|
|
lo_sizelimit=size,
|
|
lo_file_name=file_name,
|
|
)
|
|
lc = LoopConfig(fd=fd, block_size=block_size, info=li)
|
|
loop = os.open(dev, os.O_RDWR)
|
|
if loop < 0: raise OSError(f"open loop device {dev} failed")
|
|
ret = fcntl.ioctl(loop, LOOP_CONFIGURE, lc)
|
|
if ret != 0: raise OSError(f"configure loop device {dev} with {path} failed")
|
|
finally:
|
|
if loop >= 0: os.close(loop)
|
|
if opened >= 0: os.close(opened)
|
|
return dev
|
|
|
|
|
|
def loop_get_sysfs(dev: str) -> str:
|
|
st = os.stat(dev)
|
|
if not stat.S_ISBLK(st.st_mode):
|
|
raise ValueError(f"device {dev} is not block")
|
|
major = os.major(st.st_rdev)
|
|
minor = os.minor(st.st_rdev)
|
|
if major != 7:
|
|
raise ValueError(f"device {dev} is not loop")
|
|
sysfs = f"/sys/dev/block/{major}:{minor}"
|
|
if not os.path.exists(sysfs):
|
|
raise RuntimeError("get sysfs failed")
|
|
return sysfs
|
|
|
|
|
|
def loop_get_backing(dev: str) -> str:
|
|
sysfs = loop_get_sysfs(dev)
|
|
path = os.path.join(sysfs, "loop", "backing_file")
|
|
with open(path, "r") as f:
|
|
backing = f.read()
|
|
return os.path.realpath(backing.strip())
|
|
|
|
|
|
def loop_get_offset(dev: str) -> int:
|
|
sysfs = loop_get_sysfs(dev)
|
|
path = os.path.join(sysfs, "loop", "offset")
|
|
with open(path, "r") as f:
|
|
backing = f.read()
|
|
return int(backing.strip())
|
|
|
|
|
|
class LoopDevice:
|
|
device: str
|
|
|
|
def __init__(
|
|
self,
|
|
path: str = None,
|
|
fio: io.FileIO = None,
|
|
fd: int = -1,
|
|
dev: str = None,
|
|
no: int = -1,
|
|
offset: int = 0,
|
|
size: int = 0,
|
|
block_size: int = 512,
|
|
read_only: bool = False,
|
|
part_scan: bool = False,
|
|
auto_clear: bool = False,
|
|
direct_io: bool = False,
|
|
):
|
|
self.device = loop_setup(
|
|
path=path,
|
|
fio=fio,
|
|
fd=fd,
|
|
dev=dev,
|
|
no=no,
|
|
offset=offset,
|
|
size=size,
|
|
block_size=block_size,
|
|
read_only=read_only,
|
|
part_scan=part_scan,
|
|
auto_clear=auto_clear,
|
|
direct_io=direct_io,
|
|
)
|
|
|
|
def __del__(self):
|
|
loop_detach(self.device)
|