"""
THE KEY EMERGES: invariance_metrics.py

Fitness functions that measure whether a transformed point cloud
exhibits stable structure across multiple exposures.

THIS IS THE HEART OF SURVIVAL.

A transform candidate earns survival not by looking right,
but by producing consistent structure across independent exposures
of the same latent signal under noise.

The system is not asking: "does this look like a circle or a wave?"
It is asking: "does this relationship hold when I look again?"

DUAL PRESSURE ARCHITECTURE:
Every candidate is evaluated under two simultaneous pressures:

    Pressure 1: Internal invariance across exposures
        Does the structure produced by this transform remain
        consistent when applied to K independent noisy exposures?

    Pressure 2: World admissibility (hard veto)
        Could this transformed structure possibly satisfy the
        world constraint? Binary gate. Not a gradient.
        Inadmissible candidates are eliminated before scoring.

INVARIANCE METRICS BY ARM:

    Arm 1: Radial consistency metrics
        After transform, do the resulting points cluster near
        a consistent radius? Measure:
        - cross-exposure variance of mean radius
        - cross-exposure variance of radius std
        - cross-exposure consistency of radial distribution shape

    Arm 2: Periodic consistency metrics
        After transform, do the resulting points exhibit a
        consistent periodic structure? Measure:
        - cross-exposure consistency of dominant frequency
        - cross-exposure consistency of phase
        - cross-exposure variance of amplitude estimate

ARCHITECTURAL REQUIREMENTS:
- All metrics are computed on transformed point clouds only
- No access to latent curve ground truth during evaluation
- No smoothing, averaging over neighbors, or aggregation
  beyond what is explicitly defined here
- Admissibility check precedes invariance scoring
- Comments distinguish inadmissibility from non-survival
"""

 

from __future__ import annotations

from typing import List, Optional, Dict, Any

import numpy as np

from core_types import ArmID, EvaluationResult, ExposureStream, TransformCandidate
from transform_families import apply_candidate
from world_constraint import WorldParameters, DEFAULT_WORLD

 

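# ---------------------------------------------------------------------------
# Arm 1 - Radial invariance metrics
# ---------------------------------------------------------------------------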

 

def radial_stats(points: np.ndarray) -> Dict[str, float]:
    """
    Compute radial statistics of a 2D point cloud.
    Pointwise: no neighbor interaction.

    Returns mean radius, std of radii, and a shape consistency
    measure (ratio of std to mean; lower is more circular).
    """
    x, y = points[:, 0], points[:, 1]
    radii = np.sqrt(x**2 + y**2)
    mean_r = float(np.mean(radii))
    std_r = float(np.std(radii))
    cv = std_r / max(mean_r, 1e-8)   # coefficient of variation
    return {"mean_r": mean_r, "std_r": std_r, "cv": cv}
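# Illustration (not part of the module): a minimal, self-contained sketch of
# why the coefficient of variation works as a circularity measure. The helper
# name `radial_cv` and the ellipse semi-axes (1.2, 0.8) are assumptions chosen
# for this example; they are not taken from the signal model.

```python
import numpy as np


def radial_cv(points: np.ndarray) -> float:
    """Coefficient of variation of the radii (std / mean)."""
    radii = np.hypot(points[:, 0], points[:, 1])
    return float(np.std(radii) / max(np.mean(radii), 1e-8))


theta = np.linspace(0.0, 2.0 * np.pi, 400, endpoint=False)

# Unit circle: every point sits at radius 1, so the spread is ~0.
circle = np.column_stack([np.cos(theta), np.sin(theta)])

# Hypothetical ellipse (illustrative semi-axes only): radii vary with angle.
ellipse = np.column_stack([1.2 * np.cos(theta), 0.8 * np.sin(theta)])

cv_circle = radial_cv(circle)
cv_ellipse = radial_cv(ellipse)
print(f"circle CV={cv_circle:.6f}, ellipse CV={cv_ellipse:.6f}")
```

# The circle yields a CV close to zero while the ellipse yields a clearly
# larger value; that gap is the within-exposure signal the radial score uses.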

 

def arm1_invariance_score(transformed_exposures: List[np.ndarray]) -> Dict[str, Any]:
    """
    Measure radial invariance across K transformed exposures.

    A transform earns a high score if:
    1. Each exposure, after transform, shows points clustered near
       a consistent radius (low within-exposure CV)
    2. The mean radius is consistent across exposures
       (low cross-exposure variance of mean_r)
    3. The radius spread is consistent across exposures
       (low cross-exposure variance of std_r)

    Score in [0, 1]. Higher = more invariant = better survival candidate.

    CALIBRATION NOTE:
    The ellipse is already near-circular (radii ~1.0) so cross-exposure
    mean_r variance is naturally low. The discriminating signal is
    within-exposure circularity (CV): how tightly the transform
    pulls points onto a consistent radius within each exposure.
    Score functions are calibrated so that:
      - identity transform (no radial normalization): score ~0.3-0.5
      - random transform: score ~0.1-0.6 (wide spread = visible failures)
      - good radial normalization: score ~0.8-1.0
      - perfect circular recovery: score ~1.0

    No ground truth used. No labels. No comparison to a stored shape.
    The system only asks: is this relationship stable?
    """
    K = len(transformed_exposures)
    if K == 0:
        return {"score": 0.0, "diagnostics": {}}

    stats = [radial_stats(pts) for pts in transformed_exposures]

    mean_rs = np.array([s["mean_r"] for s in stats])
    std_rs = np.array([s["std_r"] for s in stats])
    cvs = np.array([s["cv"] for s in stats])

    # Cross-exposure consistency of mean radius
    cross_var_mean = float(np.var(mean_rs))

    # Cross-exposure consistency of radius spread
    cross_var_std = float(np.var(std_rs))

    # Mean within-exposure circularity (low CV = more circular)
    mean_cv = float(np.mean(cvs))

    # Mean within-exposure std_r (absolute spread; key discriminator)
    mean_std_r = float(np.mean(std_rs))

    # Component 1: cross-exposure mean radius stability
    score_cross_mean = np.exp(-80.0 * cross_var_mean)

    # Component 2: within-exposure circularity (the main discriminator)
    # CV of ~0.11 (identity on ellipse) -> exp(-1.32) ~ 0.27
    # CV near 0 (good circular recovery) -> ~1.0
    score_circularity = np.exp(-12.0 * mean_cv)

    # Component 3: absolute radius spread (penalize large std_r)
    # std_r ~0.11 (ellipse identity) -> exp(-1.65) ~ 0.19
    # std_r ~0.02 (good recovery)    -> exp(-0.30) ~ 0.74
    score_spread = np.exp(-15.0 * mean_std_r)

    # Component 4: cross-exposure std stability
    score_cross_std = np.exp(-50.0 * cross_var_std)

    # Weighted combination (weights sum to 1, so the score stays in [0, 1])
    score = float(0.2 * score_cross_mean
                  + 0.35 * score_circularity
                  + 0.35 * score_spread
                  + 0.1 * score_cross_std)

    return {
        "score": score,
        "diagnostics": {
            "cross_var_mean_r": cross_var_mean,
            "cross_var_std_r": cross_var_std,
            "mean_cv": mean_cv,
            "mean_std_r": mean_std_r,
            "score_cross_mean": float(score_cross_mean),
            "score_circularity": float(score_circularity),
            "score_spread": float(score_spread),
            "score_cross_std": float(score_cross_std),
            "mean_r_per_exposure": mean_rs.tolist(),
        }
    }
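# Illustration (not part of the module): a small sketch of how the
# exponential decay maps a non-negative penalty to a component score in
# (0, 1], evaluated at the CV and std_r values mentioned in the comments of
# arm1_invariance_score. The helper name `decay_score` is an assumption made
# for this example; the rates (12.0, 15.0) are the ones used in the function.

```python
import math


def decay_score(value: float, rate: float) -> float:
    """Map a non-negative penalty to (0, 1] via exp(-rate * value)."""
    return math.exp(-rate * value)


# Circularity component at CV ~0.11 (identity transform on the ellipse)
circularity_identity = decay_score(0.11, 12.0)

# Spread component at std_r ~0.11 (identity) and ~0.02 (good recovery)
spread_identity = decay_score(0.11, 15.0)
spread_good = decay_score(0.02, 15.0)

print(f"circularity @ CV=0.11   : {circularity_identity:.3f}")
print(f"spread      @ std_r=0.11: {spread_identity:.3f}")
print(f"spread      @ std_r=0.02: {spread_good:.3f}")
```

# Because each component is bounded by 1 and the weights sum to 1, the
# weighted total cannot exceed 1; a zero penalty gives exactly 1.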

 

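# ---------------------------------------------------------------------------
# Arm 2 - Periodic invariance metrics
# ---------------------------------------------------------------------------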

 

def arc_length_remap(points: np.ndarray) -> np.ndarray:
    """
    Remap a 2D point cloud to uniform arc-length parameterization.

    Sorts points by x-coordinate, computes cumulative chord length,
    then resamples y at uniformly spaced arc-length positions via
    linear interpolation. The returned x is the arc length
    normalized to [0, 1].

    This undoes the x-axis sampling distortion in C2 (WorldJustifiedRidge),
    making the underlying periodicity detectable via sinusoidal fitting.

    Requires: x-coordinate must be monotone (distortion_strength < 0.069).
    This is guaranteed by the WorldJustifiedRidge parameter contract.

    ARCHITECTURAL NOTE:
    This is the lawful transform that recovers the hidden structure.
    The arc-length remap is one member of the arm 2 transform family
    (arc_length_strength parameter). A surviving transform will have
    high arc_length_strength because that is what makes the structure
    stable across exposures.
    """
    N = len(points)
    idx = np.argsort(points[:, 0])
    pts = points[idx]

    # Cumulative chord length along the x-sorted polyline
    diffs = np.diff(pts, axis=0)
    chord = np.sqrt((diffs ** 2).sum(axis=1))
    cumlen = np.concatenate([[0.0], np.cumsum(chord)])
    total = cumlen[-1]

    # Degenerate cloud (zero total length): nothing to remap
    if total < 1e-8:
        return points

    # Uniform sampling in arc length, then normalize x to [0, 1]
    t_uniform = np.linspace(0.0, total, N)
    y_remap = np.interp(t_uniform, cumlen, pts[:, 1])
    x_remap = t_uniform / total

    return np.stack([x_remap, y_remap], axis=1)
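# Illustration (not part of the module): a self-contained sketch of the
# arc-length remap on a toy input. The function name `remap_by_arc_length`
# and the test curve (a straight line y = 2x sampled at squared, hence
# non-uniform, x positions) are assumptions made for this example only.

```python
import numpy as np


def remap_by_arc_length(points: np.ndarray) -> np.ndarray:
    """Resample y at uniform arc-length steps; x becomes normalized length."""
    pts = points[np.argsort(points[:, 0])]
    chord = np.linalg.norm(np.diff(pts, axis=0), axis=1)
    cumlen = np.concatenate([[0.0], np.cumsum(chord)])
    total = cumlen[-1]
    t_uniform = np.linspace(0.0, total, len(pts))
    y_remap = np.interp(t_uniform, cumlen, pts[:, 1])
    return np.column_stack([t_uniform / total, y_remap])


# Non-uniform sampling: points are dense near x=0 and sparse near x=1.
x = np.linspace(0.0, 1.0, 50) ** 2
line = np.column_stack([x, 2.0 * x])

remapped = remap_by_arc_length(line)
steps = np.diff(remapped[:, 0])
print(f"x spacing before: min={np.diff(x).min():.5f}, max={np.diff(x).max():.5f}")
print(f"x spacing after : min={steps.min():.5f}, max={steps.max():.5f}")
```

# After the remap the x spacing is constant, and on this straight line the
# relation y = 2x is preserved, since linear interpolation is exact here.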

 

def fit_sinusoid(points: np.ndarray,
                 omega_target: float = 3.0) -> Dict[str, float]:
    """
    Fit y = a*cos(2*pi*omega*x) + b*sin(2*pi*omega*x) by least squares.

    Assumes points are remapped to x in [0, 1] (uniform parameterization).
    Returns amplitude, phase, and residual (mean squared error of the fit).

    This is more robust than FFT for detecting a specific frequency
    in a finite, non-power-of-2 sample whose x-spacing was non-uniform
    before the remap.

    Pointwise least squares: no neighbor averaging.
    """
    x = points[:, 0]
    y = points[:, 1] - np.mean(points[:, 1])   # remove DC

    cos_b = np.cos(2.0 * np.pi * omega_target * x)
    sin_b = np.sin(2.0 * np.pi * omega_target * x)
    A_mat = np.column_stack([cos_b, sin_b])

    try:
        coeffs, residuals, _, _ = np.linalg.lstsq(A_mat, y, rcond=None)
        a, b = coeffs
        amplitude = float(np.sqrt(a ** 2 + b ** 2))
        phase = float(np.arctan2(b, a))
        if len(residuals) > 0:
            # lstsq returns the sum of squared residuals; convert to a mean
            fit_residual = float(residuals[0] / len(y))
        else:
            # lstsq returns no residuals for rank-deficient or small systems
            fit_residual = float(np.mean((y - A_mat @ coeffs) ** 2))
    except np.linalg.LinAlgError:
        # The solver did not converge: report "no periodic structure found"
        amplitude, phase, fit_residual = 0.0, 0.0, 1.0

    return {
        "amplitude": amplitude,
        "phase": phase,
        "fit_residual": fit_residual
    }
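# Illustration (not part of the module): a minimal sketch showing that a
# least-squares fit on a cos/sin basis recovers amplitude and phase of a
# noise-free sinusoid. The signal parameters (amplitude 0.5, phase 0.7,
# frequency 3 cycles per unit x) and the name `fit_amp_phase` are
# assumptions chosen for the example, not values from the world constraint.

```python
import numpy as np


def fit_amp_phase(x: np.ndarray, y: np.ndarray, omega: float):
    """Least-squares fit of y ~ a*cos(2*pi*omega*x) + b*sin(2*pi*omega*x)."""
    basis = np.column_stack([np.cos(2.0 * np.pi * omega * x),
                             np.sin(2.0 * np.pi * omega * x)])
    (a, b), *_ = np.linalg.lstsq(basis, y - np.mean(y), rcond=None)
    # a = A*cos(phi), b = A*sin(phi)  =>  A = hypot(a, b), phi = atan2(b, a)
    return float(np.hypot(a, b)), float(np.arctan2(b, a))


true_amp, true_phase, omega = 0.5, 0.7, 3.0
x = np.linspace(0.0, 1.0, 200, endpoint=False)   # whole number of periods
y = true_amp * np.cos(2.0 * np.pi * omega * x - true_phase)

amp, phase = fit_amp_phase(x, y, omega)
print(f"amplitude={amp:.6f}, phase={phase:.6f}")
```

# The identity A*cos(t - phi) = A*cos(phi)*cos(t) + A*sin(phi)*sin(t) is what
# makes the problem linear in (a, b), so no nonlinear optimizer is needed.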

 

def arm2_invariance_score(transformed_exposures: List[np.ndarray],
                          omega_target: float = 3.0) -> Dict[str, Any]:
    """
    Measure periodic invariance across K transformed exposures.

    Strategy:
    1. Apply arc-length remap to each transformed exposure
       (recovers uniform parameterization; undoes x-distortion)
    2. Fit sinusoid at omega_target by least squares
    3. Measure cross-exposure consistency of amplitude and phase

    A transform earns a high score if:
    - Amplitude at omega_target is consistent across exposures
    - Phase at omega_target is consistent across exposures (circular variance)
    - Fit residual is low (the sinusoid accounts for most variance)

    Score in [0, 1]. Higher = more periodically invariant.

    WHAT THIS DETECTS:
    A transform that successfully recovers the uniform parameterization
    AND normalizes the amplitude toward A_lock will show:
    - consistent amplitude ~A_lock across exposures
    - consistent phase ~phi_lock across exposures
    - low fit residual

    A random transform will show scattered amplitude and phase estimates.
    The distinction is real and visible in the score.

    No ground truth used. No labels.
    The system only asks: does this periodic relationship hold?
    """
    K = len(transformed_exposures)
    if K == 0:
        return {"score": 0.0, "diagnostics": {}}

    amplitudes = []
    phases = []
    residuals = []

    for pts in transformed_exposures:
        # Arc-length remap: recover uniform parameterization
        pts_remap = arc_length_remap(pts)
        result = fit_sinusoid(pts_remap, omega_target)
        amplitudes.append(result["amplitude"])
        phases.append(result["phase"])
        residuals.append(result["fit_residual"])

    amplitudes = np.array(amplitudes)
    phases = np.array(phases)
    residuals = np.array(residuals)

    # Cross-exposure amplitude consistency
    cross_var_amp = float(np.var(amplitudes))
    mean_amp = float(np.mean(amplitudes))

    # Cross-exposure phase consistency (circular)
    phase_vectors = np.exp(1j * phases)
    circular_var = float(1.0 - np.abs(np.mean(phase_vectors)))

    # Mean fit residual (how well does the sinusoid model fit?)
    mean_residual = float(np.mean(residuals))

    # Component scores
    # Amplitude consistency across exposures
    score_amp = np.exp(-30.0 * cross_var_amp)

    # Amplitude proximity to world A_lock
    # This is essential: without it the GA drives y_scale to minimum
    # (near-zero amplitude makes phase estimates accidentally cluster).
    # The world admissibility pressure must flow into the fitness function,
    # not just the hard veto; otherwise the GA optimizes against it.
    # Note: this always reads A_lock from the module-level DEFAULT_WORLD.
    amp_proximity = abs(mean_amp - DEFAULT_WORLD.A_lock)
    score_amp_proximity = np.exp(-15.0 * amp_proximity)

    # Phase consistency (circular variance near 0 = locked phase)
    score_phase = np.exp(-8.0 * circular_var)

    # Fit quality (low residual = sinusoidal structure present)
    score_fit = np.exp(-20.0 * mean_residual)

    # Weighted combination (weights sum to 1, so the score stays in [0, 1])
    score = float(0.20 * score_amp
                  + 0.25 * score_amp_proximity
                  + 0.40 * score_phase
                  + 0.15 * score_fit)

    return {
        "score": score,
        "diagnostics": {
            "cross_var_amp": cross_var_amp,
            "mean_amp": mean_amp,
            "circular_phase_var": circular_var,
            "mean_fit_residual": mean_residual,
            "score_amp": float(score_amp),
            "score_amp_proximity": float(score_amp_proximity),
            "score_phase": float(score_phase),
            "score_fit": float(score_fit),
            "amplitudes": amplitudes.tolist(),
            "phases": phases.tolist(),
        }
    }
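# Illustration (not part of the module): a short sketch of why phase
# consistency is measured with circular variance rather than the ordinary
# variance. The helper name `circular_variance` and both sample phase lists
# are assumptions made up for this example.

```python
import numpy as np


def circular_variance(phases) -> float:
    """1 - |mean of unit phasors|: 0 = perfectly locked, 1 = fully spread."""
    return float(1.0 - np.abs(np.mean(np.exp(1j * np.asarray(phases)))))


# Phases clustered around +/-pi: numerically far apart, angularly adjacent.
locked = [3.10, -3.10, 3.14, -3.13]

# Phases spread evenly around the circle.
scattered = [0.0, np.pi / 2.0, np.pi, -np.pi / 2.0]

cv_locked = circular_variance(locked)
cv_scattered = circular_variance(scattered)
naive_var_locked = float(np.var(locked))

print(f"locked   : circular={cv_locked:.4f}, naive var={naive_var_locked:.2f}")
print(f"scattered: circular={cv_scattered:.4f}")
```

# The ordinary variance treats 3.10 and -3.10 as distant values, while the
# phasor mean correctly sees them as nearly the same angle.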

 

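# ---------------------------------------------------------------------------
# World admissibility pre-screen
# ---------------------------------------------------------------------------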

 

def check_arm1_admissibility(transformed_exposures: List[np.ndarray],
                             world: WorldParameters = DEFAULT_WORLD) -> bool:
    """
    Hard veto for arm 1 candidates.

    A candidate is inadmissible for arm 1 if its mean radius across
    exposures is too far from R_lock. This is the necessary condition
    for Constraint A to be satisfiable at composition time.

    Binary gate. Not a gradient. Inadmissible candidates are
    eliminated immediately; they do not receive an invariance score.

    COMMENT: This is admissibility exclusion, not non-survival.
    A candidate excluded here never reaches invariance scoring.
    A candidate that passes here but scores poorly on invariance
    is a different outcome: admissible but non-surviving.
    These must be tracked and reported separately.
    """
    if not transformed_exposures:
        return False

    # Per-exposure mean radius, then the mean over exposures
    mean_rs = [radial_stats(pts)["mean_r"] for pts in transformed_exposures]
    overall_mean_r = float(np.mean(mean_rs))

    # Admissibility window: mean radius must be within a loose tolerance
    # of R_lock. Loose here (25% of R_lock, intended as 3x epsilon_A)
    # because this is pre-screening over raw transforms, not final
    # composed geometry.
    tolerance = world.R_lock * 0.25   # 25% window at search time
    return abs(overall_mean_r - world.R_lock) <= tolerance

 

def check_arm2_admissibility(transformed_exposures: List[np.ndarray],
                             world: WorldParameters = DEFAULT_WORLD) -> bool:
    """
    Hard veto for arm 2 candidates.

    A candidate is inadmissible if its mean sinusoidal fit amplitude at
    omega_lock, taken over all exposures, is near zero, meaning the
    transform has destroyed rather than revealed the periodic structure.

    Same comment as arm 1: inadmissibility exclusion is tracked
    separately from invariance non-survival.
    """
    if not transformed_exposures:
        return False

    amplitudes = []
    for pts in transformed_exposures:
        pts_remap = arc_length_remap(pts)
        result = fit_sinusoid(pts_remap, world.omega_lock)
        amplitudes.append(result["amplitude"])

    mean_amp = float(np.mean(amplitudes))

    # Admissible if the mean fitted amplitude exceeds 15% of A_lock.
    # Only this lower bound is enforced here; closeness to A_lock is
    # rewarded later, in the invariance score.
    # Near-zero amplitude is inadmissible: it means the transform destroyed
    # the signal. This prevents the GA from exploiting near-zero amplitude
    # as a phase-lock trick.
    min_admissible_amp = world.A_lock * 0.15   # at least 15% of target
    return mean_amp > min_admissible_amp

 

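# ---------------------------------------------------------------------------
# Unified evaluator
# ---------------------------------------------------------------------------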

 

def evaluate_candidate(candidate: TransformCandidate,
                       stream: ExposureStream,
                       world: WorldParameters = DEFAULT_WORLD) -> EvaluationResult:
    """
    Full evaluation of one transform candidate against one exposure stream.

    Evaluation proceeds in two steps:

    Step A: admissibility check (hard veto)
        If candidate cannot possibly satisfy world constraints,
        eliminate immediately. No invariance score assigned.
        This is INADMISSIBILITY, tracked separately from non-survival.

    Step B: invariance scoring (for admissible candidates only)
        Measure structural consistency across all K exposures.
        Score in [0, 1].

    Returns EvaluationResult with is_admissible flag and score.

    COMMENT: The world constraint participates in survival from the
    beginning, not as a gradient, but as a hard admissibility filter.
    The system knows the domain of valid questions, not the answer.
    """
    # Apply transform to all exposures
    transformed = [
        apply_candidate(candidate, exp.points)
        for exp in stream.exposures
    ]

    # Step A: admissibility pre-screen
    if candidate.arm_id == ArmID.ARM_1:
        admissible = check_arm1_admissibility(transformed, world)
    else:
        admissible = check_arm2_admissibility(transformed, world)

    if not admissible:
        # INADMISSIBLE: hard veto
        # This is not "almost survived"; it is structurally incompatible
        # with the world before invariance is even measured.
        return EvaluationResult(
            candidate_id=candidate.candidate_id,
            is_admissible=False,
            invariance_score=0.0,
            failure_reason="inadmissible: world pre-screen failed",
            diagnostics={"step": "admissibility_veto"}
        )

    # Step B: invariance scoring
    if candidate.arm_id == ArmID.ARM_1:
        result = arm1_invariance_score(transformed)
    else:
        result = arm2_invariance_score(transformed, world.omega_lock)

    score = result["score"]

    return EvaluationResult(
        candidate_id=candidate.candidate_id,
        is_admissible=True,
        invariance_score=score,
        failure_reason=None if score > 0.0 else "zero invariance score",
        diagnostics=result["diagnostics"]
    )
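# Illustration (not part of the module): a minimal sketch of keeping the
# three outcomes apart when tallying results. `Outcome` is a hypothetical
# stand-in for EvaluationResult that carries only the fields used here, and
# the 0.5 survival threshold is an assumption borrowed from the sanity check
# at the bottom of this file.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class Outcome:
    """Hypothetical stand-in for EvaluationResult (illustration only)."""
    candidate_id: str
    is_admissible: bool
    invariance_score: float


def classify(outcome: Outcome, threshold: float = 0.5) -> str:
    """Separate the hard veto from a low score on an admissible candidate."""
    if not outcome.is_admissible:
        return "inadmissible"          # vetoed before scoring
    if outcome.invariance_score > threshold:
        return "surviving"
    return "non_surviving"             # admissible, but not stable enough


outcomes = [
    Outcome("a", is_admissible=False, invariance_score=0.0),
    Outcome("b", is_admissible=True, invariance_score=0.2),
    Outcome("c", is_admissible=True, invariance_score=0.9),
]
tally = Counter(classify(o) for o in outcomes)
print(dict(tally))
```

# Checking is_admissible first matters: a vetoed candidate also carries a
# score of 0.0, so classifying on the score alone would merge two outcomes.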

 

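# ---------------------------------------------------------------------------
# Sanity check
# ---------------------------------------------------------------------------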

 

if __name__ == "__main__":
    from signal_model import build_signal_arms
    from transform_families import random_candidate, make_transform_candidate

    stream_1, stream_2, c1, c2 = build_signal_arms(seed=42)
    rng = np.random.default_rng(7)
    world = DEFAULT_WORLD

    print("=== INVARIANCE METRICS SANITY CHECK ===")

    # --- Arm 1 ---
    print("\n--- Arm 1 (radial invariance) ---")

    # Random candidates
    scores_1 = []
    inadmissible_1 = 0
    for i in range(20):
        cand = random_candidate(ArmID.ARM_1, f"a1_{i}", rng=rng)
        result = evaluate_candidate(cand, stream_1, world)
        if not result.is_admissible:
            inadmissible_1 += 1
        else:
            scores_1.append(result.invariance_score)

    print(f"  20 random candidates: {inadmissible_1} inadmissible, "
          f"{len(scores_1)} scored")
    if scores_1:
        print(f"  Score range: [{min(scores_1):.4f}, {max(scores_1):.4f}], "
              f"mean={np.mean(scores_1):.4f}")

    # Construct a near-perfect arm 1 candidate manually
    # (radial normalization=1.0, neutral rotation, unit scale)
    perfect_params_1 = np.array([0.0, 1.0, 1.0, 0.0, 0.0, 1.0])
    perfect_1 = make_transform_candidate(ArmID.ARM_1, "perfect_a1",
                                         perfect_params_1)
    result_perfect_1 = evaluate_candidate(perfect_1, stream_1, world)
    print("\n  Near-perfect arm 1 candidate:")
    print(f"    admissible={result_perfect_1.is_admissible}, "
          f"score={result_perfect_1.invariance_score:.4f}")
    if result_perfect_1.diagnostics:
        d = result_perfect_1.diagnostics
        print(f"    cross_var_mean_r={d.get('cross_var_mean_r', 0):.6f}, "
              f"mean_cv={d.get('mean_cv', 0):.4f}")

    # --- Arm 2 ---
    print("\n--- Arm 2 (periodic invariance) ---")

    scores_2 = []
    inadmissible_2 = 0
    for i in range(20):
        cand = random_candidate(ArmID.ARM_2, f"a2_{i}", rng=rng)
        result = evaluate_candidate(cand, stream_2, world)
        if not result.is_admissible:
            inadmissible_2 += 1
        else:
            scores_2.append(result.invariance_score)

    print(f"  20 random candidates: {inadmissible_2} inadmissible, "
          f"{len(scores_2)} scored")
    if scores_2:
        print(f"  Score range: [{min(scores_2):.4f}, {max(scores_2):.4f}], "
              f"mean={np.mean(scores_2):.4f}")

    # Near-perfect arm 2: x_scale recovers uniform sampling,
    # y_scale normalizes amplitude toward A_lock.
    # These are parameter choices that should score well.
    perfect_params_2 = np.array([0.0, 1.0, 1.0, 0.0, 0.0, 1.0])
    perfect_2 = make_transform_candidate(ArmID.ARM_2, "perfect_a2",
                                         perfect_params_2)
    result_perfect_2 = evaluate_candidate(perfect_2, stream_2, world)
    print("\n  Near-perfect arm 2 candidate:")
    print(f"    admissible={result_perfect_2.is_admissible}, "
          f"score={result_perfect_2.invariance_score:.4f}")
    if result_perfect_2.diagnostics:
        d = result_perfect_2.diagnostics
        # arm2 diagnostics expose mean_amp and circular_phase_var
        # (there is no freq_deviation key)
        print(f"    mean_amp={d.get('mean_amp', 0):.4f}, "
              f"circular_phase_var={d.get('circular_phase_var', 0):.4f}")

    print("\n=== ADMISSIBILITY SEPARATION CHECK ===")
    print("(inadmissible != non-surviving; these are different outcomes)")
    print(f"  Arm 1: {inadmissible_1}/20 hard-vetoed before scoring")
    print(f"  Arm 2: {inadmissible_2}/20 hard-vetoed before scoring")
    surviving_1 = sum(1 for s in scores_1 if s > 0.5)
    surviving_2 = sum(1 for s in scores_2 if s > 0.5)
    print(f"  Arm 1: {surviving_1}/{len(scores_1)} admissible candidates "
          f"scoring above 0.5")
    print(f"  Arm 2: {surviving_2}/{len(scores_2)} admissible candidates "
          f"scoring above 0.5")

    print("\nPASS - invariance_metrics.py is operational.")

