""" Module providing methods for reading Dustforce binary formats including
level files.
"""
import io
from typing import Any, Dict, List, Tuple
import zlib
from . import replay
from .bitio import BitIOReader
from .entity import Entity
from .level import Level, LevelType
from .exceptions import LevelParseException
from .prop import Prop
from .replay import Replay
from .tile import Tile, TileShape
from .variable import (
Variable,
VariableArray,
VariableBool,
VariableFloat,
VariableInt,
VariableString,
VariableStruct,
VariableType,
VariableUInt,
VariableVec2,
)
[docs]class DFReader(BitIOReader):
"""Helper class to read Dustforce binary files"""
def read_expect(self, data: bytes) -> None:
    """Consume ``len(data)`` bytes and verify that they equal `data`.

    Arguments:
        data (bytes): The exact byte sequence expected next in the stream.

    Raises:
        LevelParseException: If the read bytes do not match `data`.
    """
    actual = self.read_bytes(len(data))
    if actual == data:
        return
    raise LevelParseException("unexpected data")
def read_float(self, ibits: int, fbits: int) -> float:
    """Read a fixed-point number in the Dustforce format.

    The encoding is one sign bit, then ``ibits - 1`` bits of integer
    magnitude, then ``fbits`` bits of fraction. The fraction is scaled by
    ``2 ** (fbits - 1)`` and is always added; only the integer part is
    negated when the sign bit is set.

    Arguments:
        ibits (int): Number of integer bits (including the sign bit)
        fbits (int): Number of fractional bits

    Returns:
        The decoded value as a float.
    """
    sign_bit = self.read(1)
    whole = self.read(ibits - 1)
    frac = self.read(fbits)

    signed_whole = (1 - 2 * sign_bit) * whole
    frac_scale = 1 << (fbits - 1)
    return signed_whole + 1.0 * frac / frac_scale
def read_6bit_str(self) -> str:
    """Read a '6-bit' string.

    The string is stored as a 6-bit length (0 to 63 inclusive) followed by
    that many 6-bit character codes. Codes map, in order, onto the digits,
    the uppercase letters, '_', the lowercase letters and finally '{'.

    Returns:
        The decoded string.
    """
    # Index in this table == 6-bit character code (64 entries in total).
    alphabet = (
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "_"
        "abcdefghijklmnopqrstuvwxyz"
        "{"
    )
    length = self.read(6)
    return "".join(alphabet[self.read(6)] for _ in range(length))
def read_variable(self, vtype: VariableType) -> Variable:
    """Read a variable of a given type.

    Strings are stored with a 16-bit length, so a single stored string
    holds at most ``2 ** 16 - 1`` bytes. Inside arrays and structs a
    stored string of exactly that maximum length is treated as being
    continued by the next stored string; the pieces are concatenated into
    one :class:`VariableString`.

    Arguments:
        vtype (VariableType): The type of variable to read

    Returns:
        The decoded variable.

    Raises:
        LevelParseException: If `vtype` is NULL or not a known type.
    """
    chunk_limit = (2 ** 16) - 1

    if vtype == VariableType.NULL:
        raise LevelParseException("unexpected null variable")

    if vtype == VariableType.BOOL:
        return VariableBool(self.read(1) == 1)

    if vtype == VariableType.UINT:
        return VariableUInt(self.read(32))

    if vtype == VariableType.INT:
        return VariableInt(self.read(32, True))

    if vtype == VariableType.FLOAT:
        return VariableFloat(self.read_float(32, 32))

    if vtype == VariableType.STRING:
        byte_count = self.read(16)
        return VariableString(self.read_bytes(byte_count))

    if vtype == VariableType.VEC2:
        first = self.read_float(32, 32)
        second = self.read_float(32, 32)
        return VariableVec2((first, second))

    if vtype == VariableType.ARRAY:
        elem_type = VariableType(self.read(4))
        remaining = self.read(16)
        elems: List[Variable] = []
        if elem_type != VariableType.STRING:
            for _ in range(remaining):
                elems.append(self.read_variable(elem_type))
        else:
            # Each stored element counts against `remaining`, but several
            # consecutive full-width elements collapse into one string.
            while remaining > 0:
                pieces = []
                while remaining > 0:
                    remaining -= 1
                    piece = self.read_variable(elem_type).value
                    pieces.append(piece)
                    if len(piece) < chunk_limit:
                        break
                elems.append(VariableString(b"".join(pieces)))
        return VariableArray(Variable._TYPES[elem_type], elems)

    if vtype == VariableType.STRUCT:
        fields: Dict[str, Variable] = {}
        while True:
            # A NULL type tag terminates the struct.
            field_type = VariableType(self.read(4))
            if field_type == VariableType.NULL:
                break
            field_name = self.read_6bit_str()
            if field_type != VariableType.STRING:
                fields[field_name] = self.read_variable(field_type)
                continue

            pieces = []
            while True:
                part = self.read_variable(field_type)
                pieces.append(part.value)
                if len(part.value) < chunk_limit:
                    break
                # A full-width piece is followed by another type tag and
                # name, which are read and discarded (presumably a repeat
                # of this field's header for the continuation piece).
                self.read(4)
                self.read_6bit_str()
            fields[field_name] = VariableString(b"".join(pieces))
        return VariableStruct(fields)

    raise LevelParseException("unknown var type")
def read_variable_map(self) -> Dict[str, Variable]:
    """Read a struct variable and return its name-to-variable mapping.

    Convenience method equivalent to
    `read_variable(VariableType.STRUCT).value`.
    """
    struct_var = self.read_variable(VariableType.STRUCT)
    return struct_var.value
def read_segment(self, level: Level, xoffset: int, yoffset: int) -> None:
    """Read segment data into the passed level. In most cases you should
    just use :meth:`read_level` instead of this method.

    The segment starts with a header (byte size, version, position within
    the region and several counters that are read but not used) followed
    by a 32-bit flag word. Flag bit 1 announces tile data, bit 2 dust
    data, bit 8 props and bit 4 entities; the sections are read in that
    order. Whatever was consumed, the reader is finally positioned at the
    end of the segment as given by its byte size.

    Arguments:
        level (Level): The level object to read data into
        xoffset (int): The segment x-offset in tiles
        yoffset (int): The segment y-offset in tiles
    """
    segment_start = self.bit_tell()
    byte_size = self.read(32)
    version = self.read(16)
    xoffset += self.read(8) * 16
    yoffset += self.read(8) * 16
    self.read(8)  # segment width (unused)

    if version > 4:
        self.read(32)  # level uid (unused)
        self.read(16)  # dust filth (unused)
        self.read(16)  # enemy filth (unused)
    if version > 5:
        self.read(16)  # tile surface (unused)
        self.read(16)  # dustblock filth (unused)

    flags = self.read(32)

    # --- Tiles ---------------------------------------------------------
    if flags & 1:
        for _ in range(self.read(8)):
            layer = self.read(8)
            for _ in range(self.read(10)):
                tile_x = self.read(5)
                tile_y = self.read(5)
                shape = self.read(5)
                tile_flags = self.read(3)
                raw = self.read_bytes(12)
                key = (layer, xoffset + tile_x, yoffset + tile_y)
                level.tiles[key] = Tile(
                    TileShape(shape & 0x1F),
                    tile_flags=tile_flags,
                    _tile_data=raw,
                )

    # --- Dust ----------------------------------------------------------
    if flags & 2:
        for _ in range(self.read(10)):
            tile_x = self.read(5)
            tile_y = self.read(5)
            raw = self.read_bytes(12)
            # Dust is only applied to an already-loaded tile on layer 19.
            tile = level.tiles.get((19, xoffset + tile_x, yoffset + tile_y))
            if tile is not None:
                tile._unpack_dust_data(raw)

    # --- Props ---------------------------------------------------------
    if flags & 8:
        for _ in range(self.read(16)):
            id_num = self.read(32, True)
            if id_num < 0:
                continue

            layer = self.read(8)
            layer_sub = self.read(8)

            scale = 1.0
            if version > 6 or level.level_type == LevelType.DUSTMOD:
                # Sign/magnitude coordinates with a scale component packed
                # into the low bits of each coordinate.
                x_sign = self.read(1)
                x_mag = self.read(27)
                x_scale = (self.read(4) & 0x7) ^ 0x4
                y_sign = self.read(1)
                y_mag = self.read(27)
                y_scale = (self.read(4) & 0x7) ^ 0x4

                xpos = (-1.0 if x_sign != 0 else 1.0) * x_mag
                ypos = (-1.0 if y_sign != 0 else 1.0) * y_mag

                scale_lg = x_scale * 7 + y_scale
                scale = 50.0 ** ((scale_lg - 32.0) / 24.0)
            else:
                xpos = self.read_float(28, 4)
                ypos = self.read_float(28, 4)

            rotation = self.read(16)
            flip_x = self.read(1) != 0
            flip_y = self.read(1) != 0
            prop_set = self.read(8)
            prop_group = self.read(12)
            prop_index = self.read(12)
            palette = self.read(8)

            prop = Prop(
                layer_sub,
                rotation,
                flip_x,
                flip_y,
                scale,
                prop_set,
                prop_group,
                prop_index,
                palette,
            )

            # Default DF behavior is to overwrite repeated props
            level.props.pop(id_num, None)
            level.add_prop(layer, xpos, ypos, prop, id_num=id_num)

    # --- Entities ------------------------------------------------------
    if flags & 4:
        pending = []
        entity_count = self.read(16)
        has_extended_names = False
        for _ in range(entity_count):
            id_num = self.read(32, True)
            if id_num < 0:
                continue

            etype = self.read_6bit_str()
            if etype == "entity" and version > 7:
                has_extended_names = True

            xpos = self.read_float(32, 8)
            ypos = self.read_float(32, 8)
            rotation = self.read(16)
            layer = self.read(8)
            face_x = self.read(1)
            face_y = self.read(1)
            visible = self.read(1)
            variables = self.read_variable_map()

            eargs = (
                variables,
                rotation,
                layer,
                face_x == 0,
                face_y == 0,
                visible == 1,
            )
            pending.append((etype, xpos, ypos, eargs, id_num))

        # Read in extended names if present and add entities to level.
        # The last 32 bits of the segment hold a value used to skip
        # backwards (past itself) to where the extended names begin.
        if has_extended_names:
            self.bit_seek(segment_start + (byte_size - 4) * 8)
            extra_names_index = self.read(32)
            self.skip(-extra_names_index - 32)

        for etype, xpos, ypos, eargs, id_num in pending:
            if has_extended_names and etype == "entity":
                etype = self.read_6bit_str()
            entity = Entity._from_raw(etype, *eargs)
            level.entities.pop(id_num, None)
            level.add_entity(xpos, ypos, entity, id_num)

    self.bit_seek(segment_start + byte_size * 8)
def read_region(self, level: Level) -> None:
    """Read region data into the passed level. In most cases you should
    just use :meth:`read_level` instead of this method.

    A region is a 17-byte header followed by a zlib-compressed payload
    that contains the region's segments and, optionally, one extra
    backdrop segment which is read into ``level.backdrop``.

    Arguments:
        level (Level): The level object to read data into

    Raises:
        ValueError: If the region has a backdrop segment but
            ``level.backdrop`` is None.
    """
    region_len = self.read(32)
    self.read(32)  # uncompressed length (unused)
    region_x = self.read(16, True)
    region_y = self.read(16, True)
    self.read(16)  # region version (unused)
    segment_count = self.read(16)
    has_backdrop = self.read(8) != 0

    # The seven header fields read above occupy 17 bytes; the rest of the
    # region is the compressed payload.
    payload = zlib.decompress(self.read_bytes(region_len - 17))
    sub_reader = DFReader(io.BytesIO(payload))

    for _ in range(segment_count):
        sub_reader.align()
        sub_reader.read_segment(level, region_x * 256, region_y * 256)

    if not has_backdrop:
        return
    if level.backdrop is None:
        raise ValueError("no backdrop present in level")
    sub_reader.align()
    sub_reader.read_segment(level.backdrop, region_x * 16, region_y * 16)
def _read_metadata(self) -> Dict[str, Any]:
"""Read a metadata block (DF_MTD) into a JSON payload."""
self.read_expect(b"DF_MTD")
version = self.read(16)
region_offset = self.read(32) # pylint: disable=unused-variable
entityUid = self.read(32)
propUid = self.read(32)
deprecatedSaveUid = self.read(32)
regionUidDeprecated = self.read(32)
return {
"version": version,
"entityUid": entityUid,
"propUid": propUid,
"deprecatedSaveUid": deprecatedSaveUid,
"regionUidDeprecated": regionUidDeprecated,
}
def read_var_file(self, header: bytes) -> Dict[str, Variable]:
    """Reads a variable mapping with a given header. There are several
    file types that Dustforce uses that are expressed this way, notably
    "stats1" (header=b"DF_STA") and "config" (header=b"DF_CFG").

    After the header, a 16-bit version and a 32-bit size field are read
    and discarded before the variable mapping itself.

    Arguments:
        header (bytes): The expected file header at the start of the stream.
            Just pass b"" if you've already read and checked the header.

    Returns:
        The variable mapping stored in the file.

    Raises:
        LevelParseException: If the stream does not start with `header`.
    """
    self.read_expect(header)
    self.read(16)  # version (unused)
    self.read(32)  # stat size (unused)
    return self.read_variable_map()
def read_level_ex(self) -> Tuple[Level, List[int]]:
    """Extended version of :meth:`read_level`.

    Read level file metadata into a :class:`Level` object while
    extracting additional metadata so that the rest of the data can be
    ingested in an opaque way. `read_level_ex` ends with the reader
    byte-aligned. The entirety of the region data can be read subsequently
    with reader.read_bytes(region_offsets[-1]) or region by region using
    :meth:`read_region`.

    This can be used with :meth:`dustmaker.dfwriter.DFWriter.write_level_ex`
    to modify level metadata without reading in region data.

    The reader must be byte-aligned when this method is called.

    Example: ::

        # Re-write level metadata without reading in region data.
        level, region_offsets = reader.read_level_ex()
        region_data = reader.read_bytes(region_offsets[-1])
        ...
        writer.write_level_ex(level, region_offsets, region_data)

    Example: ::

        # Manually read region data
        level, region_offsets = reader.read_level_ex()
        for _ in region_offsets[:-1]:
            reader.read_region(level)

    Returns:
        (level, region_offsets) tuple

        - level
            :class:`dustmaker.level.Level` object with metadata (e.g.
            level.variables and level.sshot) filled in.
        - region_offsets
            list of byte offsets of each region from the current stream
            position (which is aligned). The last element of this array is
            the end of the region data and does not correspond to a region
            itself.

    Raises:
        LevelParseException: If the header is not b"DF_LVL" or the level
            version is 42 or lower.
    """
    assert self.aligned()
    header_start = self.bit_tell()

    self.read_expect(b"DF_LVL")
    version = self.read(16)
    if version <= 42:
        raise LevelParseException("unsupported level version")

    filesize = self.read(32)
    num_regions = self.read(32)
    meta = self._read_metadata()

    # Only versions above 43 carry a length-prefixed screenshot blob.
    sshot_data = self.read_bytes(self.read(32)) if version > 43 else b""

    level = Level()
    level._next_id = meta["entityUid"]
    level.variables = self.read_variable_map()
    level.sshot = sshot_data

    region_offsets = []
    for _ in range(num_regions):
        region_offsets.append(self.read(32))
    self.align()

    # Whatever the file size leaves after the bytes consumed so far is the
    # total length of the region data.
    consumed_bytes = (self.bit_tell() - header_start) >> 3
    region_offsets.append(filesize - consumed_bytes)
    return level, region_offsets
def read_level(self, *, metadata_only: bool = False) -> Level:
    """Read a level data stream and return the
    :class:`dustmaker.level.Level` object.

    Arguments:
        metadata_only (bool, optional): If set to True only the variables
            and sshot data will be set in the returned :class:`Level`.

    Returns:
        The parsed level.

    Raises:
        LevelParseException: Parser ran into unexpected data.
    """
    level, region_offsets = self.read_level_ex()
    if not metadata_only:
        # The final offset marks the end of the region data rather than a
        # region, so there is one region fewer than there are offsets.
        for _ in region_offsets[:-1]:
            self.read_region(level)
    return level
def read_replay(self) -> Replay:
    """Read in a replay from the input stream.

    The stream starts with b"DF_RPL" and an ASCII version digit. A
    version 2 header only wraps a username around a second, nested
    header. After the replay header comes a zlib-compressed block holding
    the per-player intent streams and the recorded entity frames.

    Entity positions and speeds are read as signed 32-bit integers and
    scaled down by 10 and 100 respectively.

    Returns:
        The parsed replay.

    Raises:
        LevelParseException: If the header is wrong, the replay version is
            not one of 1, 3 or 4, or the compressed block is truncated.
    """
    self.read_expect(b"DF_RPL")
    rep = Replay(version=self.read(8) - ord("0"))

    # Strip out username header if present.
    if rep.version == 2:
        username_len = self.read(16)
        rep.username = self.read_bytes(username_len)
        self.read_expect(b"DF_RPL")
        rep.version = self.read(8) - ord("0")

    if rep.version not in (1, 3, 4):
        raise LevelParseException("unknown replay format")

    # The player count field is always present but only honoured from
    # version 3 onwards.
    num_players = self.read(16)
    if rep.version < 3:
        num_players = 1

    self.read(32)  # uncompressed size (unused)
    rep.frames = self.read(32)
    rep.players = [
        replay.PlayerData(character=replay.Character(self.read(8)))
        for _ in range(num_players)
    ]
    level_len = self.read(8)
    rep.level = self.read_bytes(level_len)

    # Decompress the next gzip block. Unfortunately the replay format
    # doesn't tell us how many bytes we need to decompress so we just have
    # to figure it out which is a bit inefficient.
    input_data = bytearray()
    decomp = zlib.decompressobj()
    while not decomp.eof:
        chunk = self.read_bytes(1)
        if not chunk:
            # Without this guard an exhausted stream that yields empty
            # reads would keep this loop spinning forever.
            raise LevelParseException("truncated replay data")
        input_data += decomp.decompress(chunk)

    with DFReader(io.BytesIO(input_data)) as sub_reader:
        sub_reader.read(32)  # inputs length (unused)

        for player in rep.players:
            for intent, intent_meta in enumerate(replay._INTENT_META):
                # Later intents do not exist in older replay versions.
                if rep.version < intent_meta.version:
                    break

                intent_len = sub_reader.read(32)
                next_pos = sub_reader.bit_tell() + intent_len * 8
                intent_values = player.intents.setdefault(
                    replay.IntentStream(intent), []
                )

                # Run-length decoding: each 8-bit count repeats the
                # current state and is followed by the next state; a
                # count of 0xFF ends the stream.
                state = intent_meta.default
                while True:
                    count = sub_reader.read(8)
                    if count == 0xFF:
                        break
                    if intent_values:
                        count += 1
                    intent_values.extend([state] * count)
                    state = intent_meta.to_repr(
                        sub_reader.read(intent_meta.bits)
                    )

                sub_reader.bit_seek(next_pos)

        entity_count = sub_reader.read(32)
        for _ in range(entity_count):
            entity_uid = sub_reader.read(32)
            sub_reader.read(32)  # replay uid (unused)
            frame_count = sub_reader.read(32)

            entity = rep.entities.setdefault(entity_uid, replay.EntityData())
            for _ in range(frame_count):
                entity_frame = sub_reader.read(32)
                # Positions and speeds can be negative, so they must be
                # read as signed values; an unsigned read would turn them
                # into huge positive numbers.
                entity_x = sub_reader.read(32, True)
                entity_y = sub_reader.read(32, True)
                entity_x_speed = sub_reader.read(32, True)
                entity_y_speed = sub_reader.read(32, True)
                entity.frames.append(
                    replay.EntityFrame(
                        frame=entity_frame,
                        x_pos=entity_x / 10.0,
                        y_pos=entity_y / 10.0,
                        x_speed=entity_x_speed / 100.0,
                        y_speed=entity_y_speed / 100.0,
                    )
                )

    return rep
def read_level(data: bytes) -> Level:
    """Convenience function to read in a level from bytes directly.

    Wraps `data` in an in-memory stream and parses it with
    :meth:`DFReader.read_level`.

    Arguments:
        data (bytes): The data source for the level

    Returns:
        The parsed Level object.
    """
    stream = io.BytesIO(data)
    with DFReader(stream, noclose=True) as reader:
        return reader.read_level()