arch-image-builder/builder/disk/layout/layout.py
BigfootACA e6c6ab6bf7 Initial commit
Signed-off-by: BigfootACA <bigfoot@classfun.cn>
2024-05-17 23:04:34 +08:00

141 lines
3.8 KiB
Python

from math import ceil
from io import RawIOBase
from logging import getLogger
from builder.lib.utils import size_to_bytes
from builder.lib.serializable import SerializableDict
from builder.lib.area import Area
from builder.disk.layout.dio import DiskIO
from builder.disk.layout.area import DiskArea
from builder.disk.layout.part import DiskPart
log = getLogger(__name__)
class DiskLayout(DiskIO, DiskArea, SerializableDict):
partitions: list[DiskPart]
loaded: bool
@property
def id(self) -> str: pass
@id.setter
def id(self, val: str): pass
def create(self): pass
def reload(self): pass
def unload(self): pass
def save(self): pass
def set_from(self, config: dict): pass
def size_to_bytes(self, value: str | int, alt_units: dict = None) -> int:
units = {
"s": self.sector,
"sector": self.sector,
"sectors": self.sector
}
if alt_units:
units.update(alt_units)
return size_to_bytes(value, units)
def size_to_sectors(self, value: str | int, alt_units: dict = None) -> int:
ret = self.size_to_bytes(value, alt_units)
return ceil(ret / self.sector)
def _parse_area(self, config: dict) -> Area:
start, end, size = -1, -1, -1
if "start" in config: start = self.size_to_sectors(config["start"])
if "offset" in config: start = self.size_to_sectors(config["offset"])
if "end" in config: end = self.size_to_sectors(config["end"])
if "size" in config: size = self.size_to_sectors(config["size"])
if "length" in config: size = self.size_to_sectors(config["length"])
if "start_lba" in config: start = config["start_lba"]
if "offset_lba" in config: start = config["offset_lba"]
if "end_lba" in config: end = config["end_lba"]
if "size_lba" in config: size = config["size_lba"]
if "length_lba" in config: size = config["length_lba"]
return Area(start, end, size)
def parse_area(self, config: dict) -> Area:
area = self._parse_area(config)
area.fixup()
return area
def parse_free_area(self, config: dict) -> Area:
return self.find_free_area(area=self._parse_area(config))
def resort_partitions(self):
self.partitions.sort(key=lambda p: p.start_lba)
idx = 0
for part in self.partitions:
part.idx = idx
idx += 1
def add_partition_from(self, config: dict) -> DiskPart:
area = self.parse_free_area(config)
if area is None: raise ValueError("no free area found")
ptype = config["ptype"] if "ptype" in config else "linux"
return self.add_partition(ptype, area=area)
def del_partition(self, part: DiskPart):
if part not in self.partitions:
if part.layout == self:
raise ValueError("removed partition")
raise KeyError("partition not found")
self.partitions.remove(part)
def add_partition(
self,
ptype: str = None,
start: int = -1,
end: int = -1,
size: int = -1,
area: Area = None,
) -> DiskPart: pass
def __init__(
self,
fp: RawIOBase = None,
path: str = None,
sector: int = 512
):
DiskIO.__init__(self)
self.partitions = []
self.loaded = False
self._sector = sector
if sector < self._min_sector:
raise ValueError("bad sector size")
if fp: self._fp = fp
elif path:
self._fp = open(path, "wb+")
self._opened = True
else: raise ValueError("no I/O interface")
def __del__(self):
if self._opened: self._fp.close()
def __len__(self) -> int:
return len(self.partitions)
def __setitem__(self, key: int, value: DiskPart):
self.resort_partitions()
self.partitions[key] = value
def __getitem__(self, key: int) -> DiskPart:
self.resort_partitions()
return self.partitions[key]
def __delitem__(self, key: int):
self.resort_partitions()
self.del_partition(self.partitions[key])
self.resort_partitions()
def __iadd__(self, value: DiskPart) -> DiskPart:
self.resort_partitions()
value.idx = len(self.partitions)
self.partitions += value
self.resort_partitions()
return value