THE KEY EMERGES — transform_families.py
Parameterized transform families for each signal arm.
ARCHITECTURAL REQUIREMENTS (from spec):
- Transforms must be: pointwise, deterministic, non-aggregating,
evidence-preserving
- No smoothing, no kernel averaging, no neighbor-dependent blending
- Each family defined by bounded mutation operators, NOT by knowledge
of the desired final key
- Transform families for arm 1 and arm 2 may differ; this strengthens
the claim that compatibility is discovered across independently
evolved structures
ARM 1 FAMILY — targets radial/rotational invariance recovery
Transforms that can reveal the near-circular structure of C1:
- rotation (rigid)
- isotropic/anisotropic scaling
- translation
- radial normalization (maps variable-radius curve toward unit circle)
- angular reparameterization (arc-length normalization)
ARM 2 FAMILY — targets periodic invariance recovery
Transforms that can reveal the periodic structure of C2:
- rotation
- translation
- phase shift (horizontal offset in unwrapped representation)
- frequency normalization (x-axis rescaling)
- amplitude normalization (y-axis rescaling)
- arc-length reparameterization (removes sampling distortion)
WHY DIFFERENT FAMILIES:
If both arms used identical transforms, a viewer could reasonably
infer that the system knows both signals are the same kind of thing.
Different families make the eventual composition a discovery across
genuinely differently-characterized structures.
CODE COMMENT REQUIREMENT:
At each family definition, comments explicitly state that transform
families are defined by lawful parameterized operators and bounded
mutation, not by knowledge of the desired final key.
from **future** import annotations
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Callable
from core_types import ArmID, TransformCandidate
#
# Transform application arm 1 (radial recovery family)
#
def apply_arm1_transform(points: np.ndarray,
                         params: np.ndarray) -> np.ndarray:
    """Apply the arm 1 (radial recovery) transform to an (N, 2) point cloud.

    The transform family for arm 1 is designed to reveal radial/rotational
    invariance. It is defined by lawful parameterized operators and bounded
    mutation, not by knowledge that C1 is an ellipse or that the world
    requires a circle.

    Parameter vector (6 entries, applied in this order):
        params[0]: rotation angle (radians)
        params[1]: scale_x (anisotropic scaling, x-axis)
        params[2]: scale_y (anisotropic scaling, y-axis)
        params[3]: translate_x
        params[4]: translate_y
        params[5]: radial_norm_strength, clipped to [0, 1]
                   (0 = off, 1 = full normalization)

    Radial normalization moves each point along its own ray from the origin
    so that its radius r becomes r + strength * (1 - r), i.e. a blend between
    the current radius and 1.0. Every step is pointwise and deterministic;
    no point's output depends on any other point.

    Args:
        points: Array-like of shape (N, 2). The input is not modified.
        params: Sequence of at least six numbers, laid out as above.

    Returns:
        A new float array of shape (N, 2) holding the transformed points.
    """
    rotation = params[0]
    sx, sy = params[1], params[2]
    tx, ty = params[3], params[4]
    rn_strength = np.clip(params[5], 0.0, 1.0)

    pts = np.asarray(points, dtype=float)

    # Step 1: rigid rotation about the origin (row vectors, hence R.T).
    c, s = np.cos(rotation), np.sin(rotation)
    R = np.array([[c, -s], [s, c]])
    out = pts @ R.T

    # Step 2: anisotropic scale.
    out = out * np.array([sx, sy])

    # Step 3: translate.
    out = out + np.array([tx, ty])

    # Step 4: radial normalization (pointwise, no neighbor interaction).
    if rn_strength > 1e-6:
        radii = np.hypot(out[:, 0], out[:, 1])
        # Points at (or numerically at) the origin have no direction to move
        # along; substituting radius 1.0 gives them a scale factor of exactly
        # 1, so they stay where they are instead of dividing by zero.
        safe_radii = np.where(radii < 1e-8, 1.0, radii)
        # Per-point factor that blends the current radius toward 1.0.
        scale_factors = 1.0 + rn_strength * (1.0 / safe_radii - 1.0)
        out = out * scale_factors[:, np.newaxis]

    return out
def apply_arm2_transform(points: np.ndarray,
                         params: np.ndarray) -> np.ndarray:
    """Apply the arm 2 (periodic recovery) transform to an (N, 2) point cloud.

    The transform family for arm 2 is designed to reveal periodic invariance.
    It is defined by lawful parameterized operators and bounded mutation,
    not by knowledge that C2 has omega=3, phi=pi/6, or that the world
    requires a specific groove pattern.

    Parameter vector (6 entries):
        params[0]: rotation angle (radians)
        params[1]: x_scale (frequency normalization, x-axis rescaling)
        params[2]: y_scale (amplitude normalization, y-axis rescaling)
        params[3]: x_shift (phase shift, horizontal offset)
        params[4]: y_shift (baseline shift)
        params[5]: arc_length_strength, clipped to [0, 1]
                   (0 = off, 1 = full remap)

    The reparameterization step is implemented as a rank-based remap of x:
    each point's x is blended toward the position its rank (in sorted-x
    order) occupies on a uniform grid spanning [min(x), max(x)]. Each point
    receives exactly one new x; y values are untouched by this step and no
    values are averaged together.

    Args:
        points: Array-like of shape (N, 2). The input is not modified.
        params: Sequence of at least six numbers, laid out as above.

    Returns:
        A new float array of shape (N, 2) holding the transformed points.
        An empty (0, 2) input yields an empty (0, 2) output.
    """
    rotation = params[0]
    x_scale = params[1]
    y_scale = params[2]
    x_shift = params[3]
    y_shift = params[4]
    al_strength = np.clip(params[5], 0.0, 1.0)

    # Step 1: rotate. The matrix product allocates a fresh array, so the
    # in-place column writes below never touch the caller's data.
    c, s = np.cos(rotation), np.sin(rotation)
    R = np.array([[c, -s], [s, c]])
    out = np.asarray(points, dtype=float) @ R.T

    # Step 2: x-axis rescaling and shift (frequency normalization / phase).
    out[:, 0] = out[:, 0] * x_scale + x_shift

    # Step 3: y-axis rescaling and shift (amplitude normalization / baseline).
    out[:, 1] = out[:, 1] * y_scale + y_shift

    # Step 4: rank-based x remap. Skipped for an empty cloud, where min/max
    # of the x column would otherwise raise.
    n_points = len(out)
    if al_strength > 1e-6 and n_points > 0:
        order = np.argsort(out[:, 0])
        # Inverse permutation: ranks[i] is the position of point i in
        # sorted-x order.
        ranks = np.empty(n_points, dtype=np.intp)
        ranks[order] = np.arange(n_points)
        uniform_x = np.linspace(out[:, 0].min(), out[:, 0].max(), n_points)
        # Blend between the original x and the uniformly spaced x.
        out[:, 0] = ((1.0 - al_strength) * out[:, 0]
                     + al_strength * uniform_x[ranks])

    return out
#
# Candidate factories
#
# Search box for the arm 1 (radial recovery) family: one (low, high) row per
# entry of the parameter vector, listed in parameter order.
ARM1_PARAM_BOUNDS = np.array((
    (-np.pi, np.pi),  # params[0]: rotation
    (0.5, 2.0),       # params[1]: scale_x
    (0.5, 2.0),       # params[2]: scale_y
    (-0.5, 0.5),      # params[3]: translate_x
    (-0.5, 0.5),      # params[4]: translate_y
    (0.0, 1.0),       # params[5]: radial_norm_strength
))
# Search box for the arm 2 (periodic recovery) family: one (low, high) row per
# entry of the parameter vector, listed in parameter order.
#
# Note on the y_scale row:
#   Raw amplitude 0.25, world requires 0.12.
#   Correct normalization ~0.48 must be discovered.
#   Bounds are broad (0.1 to 5.0): generic, not answer-tuned.
#   Raw signal amplitude may exceed world-admissible amplitude.
#   Surviving transforms must recover periodic structure
#   and normalize it into admissible range.
#   This is not target matching; it is lawful normalization
#   under world constraint.
ARM2_PARAM_BOUNDS = np.array((
    (-np.pi, np.pi),  # params[0]: rotation
    (0.3, 3.0),       # params[1]: x_scale (frequency normalization)
    (0.1, 5.0),       # params[2]: y_scale (amplitude normalization)
    (-1.0, 1.0),      # params[3]: x_shift (phase shift)
    (-0.2, 0.2),      # params[4]: y_shift (baseline)
    (0.0, 1.0),       # params[5]: arc_length_strength
))
def make_transform_candidate(arm_id: ArmID,
                             candidate_id: str,
                             params: np.ndarray,
                             generation: int = 0) -> TransformCandidate:
    """Wrap a parameter vector into a TransformCandidate.

    No domain-specific assumptions are introduced here: candidates are
    defined entirely by their parameter vectors and the bounds of their
    family.

    Args:
        arm_id: Arm the candidate belongs to. ``ArmID.ARM_1`` selects the
            arm 1 family name and bounds; any other value selects arm 2's.
        candidate_id: Identifier stored on the candidate.
        params: Parameter vector; a copy is stored, so later changes to the
            caller's array do not affect the candidate.
        generation: Value stored as ``generation_born``.

    Returns:
        A new TransformCandidate. ``param_bounds`` refers to the shared
        module-level bounds array for the family (it is not copied).
    """
    is_arm1 = arm_id == ArmID.ARM_1
    bounds = ARM1_PARAM_BOUNDS if is_arm1 else ARM2_PARAM_BOUNDS
    return TransformCandidate(
        candidate_id=candidate_id,
        arm_id=arm_id,
        family_name="arm1_radial_family" if is_arm1 else "arm2_periodic_family",
        params=params.copy(),
        param_bounds=bounds,
        generation_born=generation,
    )
def random_candidate(arm_id: ArmID,
                     candidate_id: str,
                     generation: int = 0,
                     rng: Optional[np.random.Generator] = None,
                     within_admissible: bool = True) -> TransformCandidate:
    """Generate one random transform candidate within parameter bounds.

    Each parameter is drawn independently and uniformly from its own
    (low, high) interval in the family's bounds table. This restricts the
    search to lawful questions, not known answers; the GA still searches
    freely within these bounds. No domain-specific initialization bias is
    introduced here: bounds were defined by the transform family, not by
    the target output.

    Args:
        arm_id: ``ArmID.ARM_1`` samples from the arm 1 bounds; any other
            value samples from the arm 2 bounds.
        candidate_id: Identifier stored on the new candidate.
        generation: Generation recorded on the new candidate.
        rng: Random generator to draw from. When omitted, a fresh unseeded
            ``np.random.default_rng()`` is used, so results are not
            reproducible.
        within_admissible: Accepted for interface compatibility. Sampling
            here always stays inside the bounds (the ``True`` case); the
            flag is not consulted.

    Returns:
        A new TransformCandidate built by ``make_transform_candidate``.
    """
    if rng is None:
        rng = np.random.default_rng()

    bounds = ARM1_PARAM_BOUNDS if arm_id == ArmID.ARM_1 else ARM2_PARAM_BOUNDS
    # One scalar draw per parameter, in parameter order, so a seeded
    # generator is consumed in a fixed sequence.
    params = np.array([rng.uniform(lo, hi) for lo, hi in bounds])
    return make_transform_candidate(arm_id, candidate_id, params, generation)
def mutate_candidate(candidate: TransformCandidate,
                     mutation_rate: float = 0.3,
                     mutation_scale: float = 0.15,
                     rng: Optional[np.random.Generator] = None) -> TransformCandidate:
    """Produce a mutated copy of a transform candidate.

    Each parameter is mutated independently with probability
    ``mutation_rate``. A mutation adds zero-mean Gaussian noise whose
    standard deviation is ``mutation_scale`` times the width of that
    parameter's (low, high) interval, and the mutated value is then clipped
    back into the interval.

    No outcome-directed mutation bias: mutation operators are defined by the
    parameter family, not by knowledge of the target structure.

    Args:
        candidate: Parent candidate. Its parameter array is copied and left
            unchanged.
        mutation_rate: Per-parameter probability of being mutated.
        mutation_scale: Noise standard deviation as a fraction of the
            parameter's range.
        rng: Random generator to draw from. When omitted, a fresh unseeded
            ``np.random.default_rng()`` is used.

    Returns:
        A new TransformCandidate for the same arm, with id
        ``candidate.candidate_id + "_m"`` and the parent's
        ``generation_born``.
    """
    if rng is None:
        rng = np.random.default_rng()

    new_params = candidate.params.copy()
    bounds = candidate.param_bounds

    # Index loop on purpose: values are written back in place, and the
    # random draws happen in a fixed per-parameter order.
    for i in range(len(new_params)):
        if rng.random() < mutation_rate:
            lo, hi = bounds[i]
            scale = (hi - lo) * mutation_scale
            new_params[i] += rng.normal(0.0, scale)
            # NOTE(review): only mutated entries are clipped here; confirm
            # that parents are always within bounds already.
            new_params[i] = np.clip(new_params[i], lo, hi)

    new_id = candidate.candidate_id + "_m"
    return make_transform_candidate(
        candidate.arm_id, new_id, new_params,
        candidate.generation_born,
    )
def crossover_candidates(parent_a: TransformCandidate,
                         parent_b: TransformCandidate,
                         child_id: str,
                         rng: Optional[np.random.Generator] = None) -> TransformCandidate:
    """Uniform crossover between two candidates of the same arm and family.

    Each parameter of the child is taken, independently and with equal
    probability, from ``parent_a`` or from ``parent_b``. No outcome-directed
    recombination bias.

    Args:
        parent_a: First parent.
        parent_b: Second parent; must share ``arm_id`` and ``family_name``
            with ``parent_a``.
        child_id: Identifier stored on the child.
        rng: Random generator to draw from. When omitted, a fresh unseeded
            ``np.random.default_rng()`` is used.

    Returns:
        A new TransformCandidate on the parents' arm whose
        ``generation_born`` is the larger of the two parents' values.

    Raises:
        AssertionError: If the parents differ in arm or family. These checks
            are plain ``assert`` statements and are therefore skipped when
            Python runs with ``-O``.
    """
    assert parent_a.arm_id == parent_b.arm_id, "Cannot cross arms"
    assert parent_a.family_name == parent_b.family_name, "Cannot cross families"

    if rng is None:
        rng = np.random.default_rng()

    # True -> take the gene from parent_a, False -> from parent_b.
    mask = rng.random(len(parent_a.params)) < 0.5
    new_params = np.where(mask, parent_a.params, parent_b.params)

    return make_transform_candidate(
        parent_a.arm_id, child_id, new_params,
        max(parent_a.generation_born, parent_b.generation_born),
    )
def apply_candidate(candidate: TransformCandidate,
                    points: np.ndarray) -> np.ndarray:
    """Apply a TransformCandidate to a point cloud.

    Dispatches on ``candidate.arm_id`` to the matching arm's transform
    function and passes it ``candidate.params``.

    Args:
        candidate: Candidate whose parameters are applied.
        points: Point cloud handed through to the arm's transform function.

    Returns:
        Whatever the selected transform function returns. ``ArmID.ARM_1``
        uses ``apply_arm1_transform``; every other arm id falls through to
        ``apply_arm2_transform``.
    """
    if candidate.arm_id == ArmID.ARM_1:
        return apply_arm1_transform(points, candidate.params)
    return apply_arm2_transform(points, candidate.params)
#
# Sanity check
#
if __name__ == "__main__":
    # Manual smoke test: builds the two signal arms, pushes a handful of
    # random candidates through each family, and exercises mutation and
    # crossover once. Output is printed for eyeballing; nothing is asserted.
    from signal_model import build_signal_arms

    stream_1, stream_2, c1, c2 = build_signal_arms(seed=42)
    rng = np.random.default_rng(0)  # fixed seed so the printout is repeatable

    print("=== TRANSFORM FAMILIES SANITY CHECK ===")

    # Arm 1
    print("\nArm 1 radial recovery family:")
    candidates_1 = [
        random_candidate(ArmID.ARM_1, f"a1_{i}", rng=rng)
        for i in range(5)
    ]
    raw_pts = stream_1.exposures[0].points
    for c in candidates_1:
        transformed = apply_candidate(c, raw_pts)
        radii = np.sqrt(transformed[:, 0]**2 + transformed[:, 1]**2)
        print(f" {c.candidate_id}: mean_radius={radii.mean():.4f}, "
              f"std_radius={radii.std():.4f}, "
              f"params={c.params.round(3)}")

    # Arm 2
    print("\nArm 2 periodic recovery family:")
    candidates_2 = [
        random_candidate(ArmID.ARM_2, f"a2_{i}", rng=rng)
        for i in range(5)
    ]
    raw_pts_2 = stream_2.exposures[0].points
    for c in candidates_2:
        transformed = apply_candidate(c, raw_pts_2)
        print(f" {c.candidate_id}: x_range=[{transformed[:,0].min():.3f},"
              f"{transformed[:,0].max():.3f}], "
              f"y_std={transformed[:,1].std():.4f}, "
              f"params={c.params.round(3)}")

    # Mutation test
    print("\nMutation test:")
    parent = candidates_1[0]
    child = mutate_candidate(parent, rng=rng)
    print(f" parent: {parent.params.round(3)}")
    print(f" child: {child.params.round(3)}")
    print(f" diff: {(child.params - parent.params).round(3)}")

    # Crossover test
    print("\nCrossover test:")
    offspring = crossover_candidates(candidates_1[0], candidates_1[1],
                                     "xover_test", rng=rng)
    print(f" parent_a: {candidates_1[0].params.round(3)}")
    print(f" parent_b: {candidates_1[1].params.round(3)}")
    print(f" offspring:{offspring.params.round(3)}")

    print("\nPASS transform_families.py is operational.")