extends GutTest ## The N2 client-prediction invariant, exercised without any networking: a client ## that replays its un-acknowledged inputs onto the latest authoritative snapshot ## must land its hero exactly where the server's own simulation will — prediction ## and reconciliation never diverge from authority. ## ## The reconcile loop here mirrors `main.gd`'s `_predicted_state`: decode the ## snapshot, drop the inputs at or below the server's ack, replay the rest through ## the shared `SimCore.apply_movement`. Keeping it pure lets the round trip be ## checked headlessly, exactly like the protocol and simulation cores. const HERO_SPEED := 320.0 func test_replayed_prediction_matches_the_authoritative_position() -> void: var inputs := [ _command(Vector2.RIGHT), _command(Vector2.RIGHT), _command(Vector2.UP), _command(Vector2(1.0, 1.0)), _command(Vector2.LEFT), ] # The server has applied the first three inputs (acked up to seq 3); the client # still holds all five as pending until it reconciles. var acked := 3 var server := SimCore.new() server.spawn_creeps = false var hero_id := server.add_hero(1, Vector2(500.0, 500.0), HERO_SPEED) for i in acked: server.step({hero_id: inputs[i]}) # The client reconciles against the snapshot taken at this ack: prune the # applied inputs, then replay the remainder onto its predicted hero. var pending := _all_pending(inputs) var snapshot := NetProtocol.decode_snapshot(NetProtocol.encode_snapshot(server.state, acked)) var predicted := snapshot.get_entity(hero_id) while not pending.is_empty() and pending[0]["seq"] <= acked: pending.pop_front() for entry in pending: SimCore.apply_movement(predicted, entry["input"]) # Meanwhile the server applies the remaining inputs for real. for i in range(acked, inputs.size()): server.step({hero_id: inputs[i]}) assert_eq( predicted.position, server.state.get_entity(hero_id).position, "the replayed prediction lands on the authoritative position", ) func test_reconciliation_prunes_acknowledged_inputs() -> void: # Five pending inputs (seq 1..5); an ack of 3 must drop seqs 1..3 and keep 4 # and 5 — only the inputs the server has not yet applied are replayed. var pending: Array[Dictionary] = [] for seq in range(1, 6): pending.append({"seq": seq, "input": _command(Vector2.RIGHT)}) var ack := 3 while not pending.is_empty() and pending[0]["seq"] <= ack: pending.pop_front() assert_eq(pending.size(), 2, "the two un-acked inputs remain") assert_eq(pending[0]["seq"], 4, "pruning stops at the first un-acked input") assert_eq(pending[1]["seq"], 5) func test_a_fully_acked_buffer_predicts_nothing() -> void: # When the server has applied every input, replay is empty and the prediction # is exactly the snapshot — the client and server agree with no extrapolation. var pending: Array[Dictionary] = [] for seq in range(1, 4): pending.append({"seq": seq, "input": _command(Vector2.RIGHT)}) var ack := 3 while not pending.is_empty() and pending[0]["seq"] <= ack: pending.pop_front() assert_eq(pending.size(), 0, "an ack covering every input leaves nothing to replay") # --- apply_movement: the shared movement sub-step the prediction replays -------- func test_apply_movement_advances_one_tick() -> void: var entity := SimEntity.new(1, 0, Vector2.ZERO, 300.0) SimCore.apply_movement(entity, _command(Vector2.RIGHT)) assert_almost_eq(entity.position.x, 300.0 * SimCore.TICK_DELTA, 0.0001) assert_almost_eq(entity.position.y, 0.0, 0.0001) func test_apply_movement_clamps_diagonals() -> void: var entity := SimEntity.new(1, 0, Vector2.ZERO, 300.0) SimCore.apply_movement(entity, _command(Vector2.ONE)) # length sqrt(2) -> clamps to 1 assert_almost_eq(entity.position.length(), 300.0 * SimCore.TICK_DELTA, 0.0001) func test_apply_movement_holds_still_on_null_command() -> void: var entity := SimEntity.new(1, 0, Vector2(10.0, -5.0), 300.0) SimCore.apply_movement(entity, null) assert_eq(entity.position, Vector2(10.0, -5.0), "a null command moves nothing") # --- collision: a moving unit is blocked, and prediction matches the server through it ---------- func test_a_moving_hero_stops_at_an_obstacle_edge() -> void: var center := MapData.nexus_for_team(0) var sim := SimCore.new() sim.spawn_creeps = false var hero := sim.add_hero(0, center + Vector2(600.0, 0.0), 320.0) for _i in 300: sim.step({hero: _command(Vector2.LEFT)}) # drive straight at the obstacle var pos := sim.state.get_entity(hero).position assert_false( MapData.point_blocked(pos, SimCore.UNIT_RADIUS), "a hero driven into an obstacle never ends up inside it", ) assert_gt(pos.x, center.x, "it is stopped on its approach side, not pushed through") func test_prediction_matches_the_server_through_an_obstacle() -> void: # The decoded snapshot the client predicts on carries no is_hero flag, but the collision gate is # the same "mobile, non-creep" predicate, so the replay collides exactly as the server does. var start := MapData.nexus_for_team(0) + Vector2(600.0, 0.0) var inputs: Array = [] for _i in 30: inputs.append(_command(Vector2.LEFT)) var acked := 15 var server := SimCore.new() server.spawn_creeps = false var hero_id := server.add_hero(0, start, 320.0) for i in acked: server.step({hero_id: inputs[i]}) var snapshot := NetProtocol.decode_snapshot(NetProtocol.encode_snapshot(server.state, acked)) var predicted := snapshot.get_entity(hero_id) for i in range(acked, inputs.size()): SimCore.apply_movement(predicted, inputs[i]) for i in range(acked, inputs.size()): server.step({hero_id: inputs[i]}) assert_eq( predicted.position, server.state.get_entity(hero_id).position, "prediction with collision lands exactly on the authoritative position", ) func _command(dir: Vector2) -> InputCommand: var command := InputCommand.new() command.move_dir = dir return command ## Every input as a pending entry, seq stamped 1-based in send order. func _all_pending(inputs: Array) -> Array[Dictionary]: var pending: Array[Dictionary] = [] for i in inputs.size(): pending.append({"seq": i + 1, "input": inputs[i]}) return pending