mirror of
https://github.com/python/cpython.git
synced 2024-11-27 11:55:13 +08:00
526 lines
18 KiB
Python
526 lines
18 KiB
Python
"""Stuff to parse Sun and NeXT audio files.
|
|
|
|
An audio file consists of a header followed by the data. The structure
|
|
of the header is as follows.
|
|
|
|
        +---------------+
        | magic word    |
        +---------------+
        | header size   |
        +---------------+
        | data size     |
        +---------------+
        | encoding      |
        +---------------+
        | sample rate   |
        +---------------+
        | # of channels |
        +---------------+
        | info          |
        |               |
        +---------------+
|
|
|
|
The magic word consists of the 4 characters '.snd'. Apart from the
|
|
info field, all header fields are 4 bytes in size. They are all
|
|
32-bit unsigned integers encoded in big-endian byte order.
|
|
|
|
The header size really gives the start of the data.
|
|
The data size is the physical size of the data. From the other
|
|
parameters the number of frames can be calculated.
|
|
The encoding gives the way in which audio samples are encoded.
|
|
Possible values are listed below.
|
|
The info field currently consists of an ASCII string giving a
|
|
human-readable description of the audio file. The info field is
|
|
padded with NUL bytes to the header size.
|
|
|
|
Usage.
|
|
|
|
Reading audio files:
|
|
f = sunau.open(file, 'r')
|
|
where file is either the name of a file or an open file pointer.
|
|
The open file pointer must have methods read(), seek(), and close().
|
|
When the setpos() and rewind() methods are not used, the seek()
|
|
method is not necessary.
|
|
|
|
This returns an instance of a class with the following public methods:
|
|
getnchannels() -- returns number of audio channels (1 for
|
|
mono, 2 for stereo)
|
|
getsampwidth() -- returns sample width in bytes
|
|
getframerate() -- returns sampling frequency
|
|
getnframes() -- returns number of audio frames
|
|
getcomptype() -- returns compression type ('NONE', 'ULAW' or 'ALAW')
|
|
getcompname() -- returns human-readable version of
|
|
compression type ('not compressed' matches 'NONE')
|
|
getparams() -- returns a namedtuple consisting of all of the
|
|
above in the above order
|
|
getmarkers() -- returns None (for compatibility with the
|
|
aifc module)
|
|
getmark(id) -- raises an error since the mark does not
|
|
exist (for compatibility with the aifc module)
|
|
readframes(n) -- returns at most n frames of audio
|
|
rewind() -- rewind to the beginning of the audio stream
|
|
setpos(pos) -- seek to the specified position
|
|
tell() -- return the current position
|
|
close() -- close the instance (make it unusable)
|
|
The position returned by tell() and the position given to setpos()
|
|
are compatible and have nothing to do with the actual position in the
|
|
file.
|
|
The close() method is called automatically when the class instance
|
|
is destroyed.
|
|
|
|
Writing audio files:
|
|
f = sunau.open(file, 'w')
|
|
where file is either the name of a file or an open file pointer.
|
|
The open file pointer must have methods write(), tell(), seek(), and
|
|
close().
|
|
|
|
This returns an instance of a class with the following public methods:
|
|
setnchannels(n) -- set the number of channels
|
|
setsampwidth(n) -- set the sample width
|
|
setframerate(n) -- set the frame rate
|
|
setnframes(n) -- set the number of frames
|
|
setcomptype(type, name)
|
|
-- set the compression type and the
|
|
human-readable compression type
|
|
setparams(tuple)-- set all parameters at once
|
|
tell() -- return current position in output file
|
|
writeframesraw(data)
|
|
-- write audio frames without patching up the
|
|
file header
|
|
writeframes(data)
|
|
-- write audio frames and patch up the file header
|
|
close() -- patch up the file header and close the
|
|
output file
|
|
You should set the parameters before the first writeframesraw or
|
|
writeframes. The total number of frames does not need to be set,
|
|
but when it is set to the correct value, the header does not have to
|
|
be patched up.
|
|
It is best to first set all parameters, possibly including the
|
|
compression type, and then write audio frames using writeframesraw.
|
|
When all frames have been written, either call writeframes(b'') or
|
|
close() to patch up the sizes in the header.
|
|
The close() method is called automatically when the class instance
|
|
is destroyed.
|
|
"""
|
|
|
|
from collections import namedtuple
|
|
|
|
# Record type returned by getparams(); fields appear in the documented order.
_sunau_params = namedtuple(
    '_sunau_params',
    ['nchannels', 'sampwidth', 'framerate', 'nframes', 'comptype', 'compname'],
)

# from <multimedia/audio_filehdr.h>
# The magic word is the four ASCII characters '.snd' read as a big-endian u32.
AUDIO_FILE_MAGIC = 0x2E736E64

# Values of the header's "encoding" field.
AUDIO_FILE_ENCODING_MULAW_8 = 1
AUDIO_FILE_ENCODING_LINEAR_8 = 2
AUDIO_FILE_ENCODING_LINEAR_16 = 3
AUDIO_FILE_ENCODING_LINEAR_24 = 4
AUDIO_FILE_ENCODING_LINEAR_32 = 5
AUDIO_FILE_ENCODING_FLOAT = 6
AUDIO_FILE_ENCODING_DOUBLE = 7
AUDIO_FILE_ENCODING_ADPCM_G721 = 23
AUDIO_FILE_ENCODING_ADPCM_G722 = 24
AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
AUDIO_FILE_ENCODING_ALAW_8 = 27

# from <multimedia/audio_hdr.h>
# Sentinel stored in the "data size" field when the size is not known,
# i.e. ((unsigned)(~0)) for a 32-bit field.
AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF

# Encodings the reader accepts; anything else is rejected when the header
# is parsed.
_simple_encodings = [
    AUDIO_FILE_ENCODING_MULAW_8,
    AUDIO_FILE_ENCODING_LINEAR_8,
    AUDIO_FILE_ENCODING_LINEAR_16,
    AUDIO_FILE_ENCODING_LINEAR_24,
    AUDIO_FILE_ENCODING_LINEAR_32,
    AUDIO_FILE_ENCODING_ALAW_8,
]
|
|
|
|
class Error(Exception):
    """Exception raised by this module for bad headers or invalid API use."""
|
|
|
|
def _read_u32(file):
    """Read a 32-bit big-endian unsigned integer from *file*.

    The value is assembled from four successive one-byte reads, most
    significant byte first.

    Raises:
        EOFError: if a read returns nothing before four bytes were consumed.
    """
    value = 0
    remaining = 4
    while remaining:
        octet = file.read(1)
        if not octet:
            raise EOFError
        # Shift the bytes read so far up and append the new low-order byte.
        value = (value << 8) + ord(octet)
        remaining -= 1
    return value
|
|
|
|
def _write_u32(file, x):
    """Write *x* to *file* as four bytes in big-endian order.

    Only the low-order 32 bits of *x* are emitted; higher bits are dropped.
    The four bytes are handed to file.write() in a single call.
    """
    octets = []
    for _ in range(4):
        # Peel off the least significant byte on each pass.
        x, low = divmod(x, 256)
        octets.append(int(low))
    # Bytes were collected low-to-high; the file wants high-to-low.
    octets.reverse()
    file.write(bytes(octets))
|
|
|
|
class Au_read:
    """Reader for Sun/NeXT audio files.

    The header is parsed when the instance is created; audio data is then
    fetched with readframes().  Positions reported by tell() and accepted
    by setpos() are counted in frames from the start of the audio data,
    not in bytes from the start of the file.

    Instances can be used as context managers; leaving the ``with`` block
    calls close().
    """

    def __init__(self, f):
        """Open *f* for reading and parse its header.

        *f* is either a file name (str), which is opened in binary mode and
        closed again by close(), or an already-open file object, which is
        left open by close().

        Raises:
            Error: if the header is not a supported Sun/NeXT audio header.
            EOFError: if the file ends inside the fixed part of the header.
        """
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'rb')
            self._opened = True
        else:
            self._opened = False
        try:
            self.initfp(f)
        except BaseException:
            # Header parsing failed.  Drop our reference and, if we opened
            # the file ourselves, close it so the descriptor is not leaked.
            self._file = None
            if self._opened:
                f.close()
            raise

    def __del__(self):
        # __init__ may have failed before _file was ever assigned.
        if getattr(self, '_file', None):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Parse the audio header from *file* and prime the reader state.

        Raises:
            Error: on a bad magic number, an implausible header size, an
                unsupported encoding, or a channel count of zero.
            EOFError: if the file ends inside the fixed header fields.
        """
        self._file = file
        self._soundpos = 0
        if _read_u32(file) != AUDIO_FILE_MAGIC:
            raise Error('bad magic number')
        self._hdr_size = _read_u32(file)
        # The fixed part of the header is six 32-bit fields, i.e. 24 bytes.
        if self._hdr_size < 24:
            raise Error('header size too small')
        if self._hdr_size > 100:
            raise Error('header size ridiculously large')
        # May be AUDIO_UNKNOWN_SIZE when the writer did not know the length.
        self._data_size = _read_u32(file)
        self._encoding = _read_u32(file)
        if self._encoding not in _simple_encodings:
            raise Error('encoding not (yet) supported')
        if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
                              AUDIO_FILE_ENCODING_ALAW_8):
            # One byte per sample in the file, reported as 2-byte samples.
            self._sampwidth = 2
            self._framesize = 1
        else:
            linear_widths = {
                AUDIO_FILE_ENCODING_LINEAR_8: 1,
                AUDIO_FILE_ENCODING_LINEAR_16: 2,
                AUDIO_FILE_ENCODING_LINEAR_24: 3,
                AUDIO_FILE_ENCODING_LINEAR_32: 4,
            }
            width = linear_widths.get(self._encoding)
            if width is None:
                raise Error('unknown encoding')
            self._framesize = self._sampwidth = width
        self._framerate = _read_u32(file)
        self._nchannels = _read_u32(file)
        if not self._nchannels:
            # A zero channel count would make the frame size zero and turn
            # every later frame computation into a division by zero.
            raise Error('bad # of channels')
        self._framesize = self._framesize * self._nchannels
        if self._hdr_size > 24:
            # The info field is NUL-padded; keep only the text before the
            # first NUL byte.
            raw_info = file.read(self._hdr_size - 24)
            self._info, _, _ = raw_info.partition(b'\0')
        else:
            self._info = b''
        try:
            self._data_pos = file.tell()
        except (AttributeError, OSError):
            # No usable tell(): rewind() and setpos() will refuse to work.
            self._data_pos = None

    def getfp(self):
        """Return the underlying file object (None once closed)."""
        return self._file

    def getnchannels(self):
        """Return the number of audio channels read from the header."""
        return self._nchannels

    def getsampwidth(self):
        """Return the sample width in bytes."""
        return self._sampwidth

    def getframerate(self):
        """Return the sampling frequency read from the header."""
        return self._framerate

    def getnframes(self):
        """Return the number of frames, or AUDIO_UNKNOWN_SIZE if unknown."""
        if self._data_size == AUDIO_UNKNOWN_SIZE:
            return AUDIO_UNKNOWN_SIZE
        if self._encoding in _simple_encodings:
            return self._data_size // self._framesize
        return 0                # XXX--must do some arithmetic here

    def getcomptype(self):
        """Return the compression type: 'ULAW', 'ALAW' or 'NONE'."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'ULAW'
        if self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'ALAW'
        return 'NONE'

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'CCITT G.711 u-law'
        if self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'CCITT G.711 A-law'
        return 'not compressed'

    def getparams(self):
        """Return all parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def getmarkers(self):
        """Return None; present for compatibility with the aifc module."""
        return None

    def getmark(self, id):
        """Always raise Error; present for compatibility with aifc."""
        raise Error('no marks')

    def readframes(self, nframes):
        """Read and return at most *nframes* frames of audio as bytes.

        Passing AUDIO_UNKNOWN_SIZE reads everything that is left.  u-law
        data is expanded to linear samples of getsampwidth() bytes.
        """
        if self._encoding in _simple_encodings:
            if nframes == AUDIO_UNKNOWN_SIZE:
                data = self._file.read()
            else:
                data = self._file.read(nframes * self._framesize)
            self._soundpos += len(data) // self._framesize
            if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
                import audioop
                data = audioop.ulaw2lin(data, self._sampwidth)
            # NOTE(review): A-law data is returned undecoded even though
            # getsampwidth() reports 2 for it -- confirm whether callers
            # rely on this before adding an alaw2lin conversion here.
            return data
        return None             # XXX--not implemented yet

    def rewind(self):
        """Seek back to the first frame of audio data.

        Raises:
            OSError: if the data position could not be determined when the
                header was read.
        """
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos)
        self._soundpos = 0

    def tell(self):
        """Return the current position in frames."""
        return self._soundpos

    def setpos(self, pos):
        """Seek to frame *pos*.

        Raises:
            Error: if *pos* is negative or beyond getnframes().
            OSError: if the data position could not be determined when the
                header was read.
        """
        if pos < 0 or pos > self.getnframes():
            raise Error('position not in range')
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos + pos * self._framesize)
        self._soundpos = pos

    def close(self):
        """Release the file; close it only if this instance opened it."""
        file = self._file
        if file:
            self._file = None
            if self._opened:
                file.close()
|
|
|
|
class Au_write:
    """Writer for Sun/NeXT audio files.

    Set the parameters (channels, sample width, frame rate and optionally
    the frame count and compression type) before writing any frames.  The
    header is emitted lazily by the first write or by close(); afterwards
    the parameters can no longer be changed.

    Instances can be used as context managers; leaving the ``with`` block
    calls close().
    """

    # Class-level default so the flag exists even if a subclass replaces
    # initfp() without setting it.
    _headerwritten = False

    def __init__(self, f):
        """Open *f* for writing.

        *f* is either a file name (str), which is opened in binary mode and
        closed again by close(), or an already-open file object, which is
        left open by close().
        """
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'wb')
            self._opened = True
        else:
            self._opened = False
        self.initfp(f)

    def __del__(self):
        # __init__ may have failed before _file was ever assigned.
        if getattr(self, '_file', None):
            self.close()
        self._file = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Attach *file* and reset every parameter to its initial value."""
        self._file = file
        self._framerate = 0
        self._nchannels = 0
        self._sampwidth = 0
        self._framesize = 0
        self._nframes = AUDIO_UNKNOWN_SIZE
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._info = b''
        self._comptype = 'ULAW' # default is U-law
        # Tracks header emission separately from the frame count, so that
        # writing zero frames does not cause the header to be written twice.
        self._headerwritten = False

    def setnchannels(self, nchannels):
        """Set the number of channels (1, 2 or 4)."""
        self._check_params_mutable()
        if nchannels not in (1, 2, 4):
            raise Error('only 1, 2, or 4 channels supported')
        self._nchannels = nchannels

    def getnchannels(self):
        """Return the number of channels; raise Error if it was never set."""
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        """Set the sample width in bytes (1, 2, 3 or 4)."""
        self._check_params_mutable()
        if sampwidth not in (1, 2, 3, 4):
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        """Return the sample width; raise Error if it was never set."""
        # Test the sample width itself, not the frame rate.
        if not self._sampwidth:
            raise Error('sample width not specified')
        return self._sampwidth

    def setframerate(self, framerate):
        """Set the frame rate."""
        self._check_params_mutable()
        self._framerate = framerate

    def getframerate(self):
        """Return the frame rate; raise Error if it was never set."""
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        """Declare the number of frames that will be written."""
        self._check_params_mutable()
        if nframes < 0:
            raise Error('# of frames cannot be negative')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, type, name):
        """Set the compression type ('NONE' or 'ULAW'); *name* is ignored."""
        # The encoding is baked into the header, so it is locked together
        # with the other parameters once the header has gone out.
        self._check_params_mutable()
        if type in ('NONE', 'ULAW'):
            self._comptype = type
        else:
            raise Error('unknown compression type')

    def getcomptype(self):
        """Return the compression type."""
        return self._comptype

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._comptype == 'ULAW':
            return 'CCITT G.711 u-law'
        if self._comptype == 'ALAW':
            return 'CCITT G.711 A-law'
        return 'not compressed'

    def setparams(self, params):
        """Set all parameters at once from a 6-item sequence.

        The items are (nchannels, sampwidth, framerate, nframes, comptype,
        compname), in that order.
        """
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def tell(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio frames without patching up the file header.

        *data* may be bytes, a bytearray, or any object supporting the
        buffer protocol.  With compression type 'ULAW' the data is
        converted from linear samples to u-law before being written.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = memoryview(data).cast('B')
        self._ensure_header_written()
        if self._comptype == 'ULAW':
            import audioop
            data = audioop.lin2ulaw(data, self._sampwidth)
        nframes = len(data) // self._framesize
        self._file.write(data)
        self._nframeswritten = self._nframeswritten + nframes
        self._datawritten = self._datawritten + len(data)

    def writeframes(self, data):
        """Write audio frames, then patch the header if its sizes are stale."""
        self.writeframesraw(data)
        if self._nframeswritten != self._nframes or \
                  self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        """Finish the file: write/patch the header, flush, and release it.

        The underlying file is closed only if this instance opened it.  The
        file reference is dropped even if finalising the header fails.
        """
        if self._file:
            try:
                self._ensure_header_written()
                if self._nframeswritten != self._nframes or \
                        self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
            finally:
                file = self._file
                self._file = None
                if self._opened:
                    file.close()

    #
    # private methods
    #

    def _check_params_mutable(self):
        """Raise Error once the header or any frame has been written."""
        if self._headerwritten or self._nframeswritten:
            raise Error('cannot change parameters after starting to write')

    def _ensure_header_written(self):
        """Write the header exactly once, validating the parameters first."""
        if self._headerwritten:
            return
        if not self._nchannels:
            raise Error('# of channels not specified')
        if not self._sampwidth:
            raise Error('sample width not specified')
        if not self._framerate:
            raise Error('frame rate not specified')
        self._write_header()
        self._headerwritten = True

    def _write_header(self):
        """Emit the file header and remember where the data size lives."""
        if self._comptype == 'NONE':
            linear_encodings = {
                1: AUDIO_FILE_ENCODING_LINEAR_8,
                2: AUDIO_FILE_ENCODING_LINEAR_16,
                3: AUDIO_FILE_ENCODING_LINEAR_24,
                4: AUDIO_FILE_ENCODING_LINEAR_32,
            }
            encoding = linear_encodings.get(self._sampwidth)
            if encoding is None:
                raise Error('internal error')
            # Uncompressed: one sample occupies sampwidth bytes.
            self._framesize = self._sampwidth
        elif self._comptype == 'ULAW':
            encoding = AUDIO_FILE_ENCODING_MULAW_8
            # u-law stores one byte per sample.
            self._framesize = 1
        else:
            raise Error('internal error')
        self._framesize = self._framesize * self._nchannels
        _write_u32(self._file, AUDIO_FILE_MAGIC)
        # 24 bytes of fixed fields plus the info text plus at least one NUL
        # byte, rounded up to a multiple of 8.
        header_size = 25 + len(self._info)
        header_size = (header_size + 7) & ~7
        _write_u32(self._file, header_size)
        if self._nframes == AUDIO_UNKNOWN_SIZE:
            length = AUDIO_UNKNOWN_SIZE
        else:
            length = self._nframes * self._framesize
        try:
            # Position of the data size field, revisited by _patchheader().
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        _write_u32(self._file, length)
        self._datalength = length
        _write_u32(self._file, encoding)
        _write_u32(self._file, self._framerate)
        _write_u32(self._file, self._nchannels)
        self._file.write(self._info)
        self._file.write(b'\0'*(header_size - len(self._info) - 24))

    def _patchheader(self):
        """Rewrite the data size field with the number of bytes written.

        Raises:
            OSError: if the position of the size field could not be
                recorded when the header was written.
        """
        if self._form_length_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._form_length_pos)
        _write_u32(self._file, self._datawritten)
        self._datalength = self._datawritten
        # Return to the end of the file so later writes append.
        self._file.seek(0, 2)
|
|
|
|
def open(f, mode=None):
    """Return an Au_read or Au_write object for *f*.

    *f* is a file name or an open file object.  When *mode* is omitted it
    is taken from the ``mode`` attribute of *f* if there is one, and
    defaults to 'rb' otherwise.

    Raises:
        Error: if the mode is not one of 'r', 'rb', 'w' or 'wb'.
    """
    if mode is None:
        mode = getattr(f, 'mode', 'rb')
    if mode in ('r', 'rb'):
        return Au_read(f)
    if mode in ('w', 'wb'):
        return Au_write(f)
    raise Error("mode must be 'r', 'rb', 'w', or 'wb'")


# Alternative name for open(); both names refer to the same function.
openfp = open
|