import json
import numpy as np
import networkx as nx
import cvxpy as cp
import warnings
# Suppress CVXPY warnings regarding inaccurate solutions to keep console output clean
warnings.filterwarnings("ignore", message="Solution may be inaccurate.*")
from pandas.core.frame import DataFrame
from fourpace.psys import Grid
from fourpace.model import SynchronousMachine, AsynchronousMachine, Load, Shunt, Transformer
[docs]
class NumpyEncoder(json.JSONEncoder):
"""
Custom JSON Encoder to seamlessly serialize NumPy data types and Python complex numbers
into standard JSON format for exporting simulation results.
"""
[docs]
def default(self, obj):
if type(obj).__module__ == np.__name__:
if isinstance(obj, np.ndarray):
return obj.tolist()
else:
return obj.item()
if isinstance(obj, complex):
return {"real": obj.real, "imag": obj.imag}
return super(NumpyEncoder, self).default(obj)
[docs]
def MPOPF(grid: Grid, profile_df: DataFrame = None, relax: str = 'SOCP', solver: str = 'SCS'):
"""
Multi-Period Optimal Power Flow (MPOPF).
Optimizes the dispatch schedule (Active and Reactive power) for generators, smart inverters,
and energy storage systems (BESS) over a given time horizon. Minimizes total generation cost
and BESS degradation while strictly adhering to AC power flow physics via convex relaxations.
Parameters:
-----------
grid : Grid
The power system grid object containing topology and components.
profile_df : DataFrame, optional
Time-series data for loads and renewable availability. If None, uses the grid's attached profile.
relax : str
The convex relaxation method to use for AC power flow ('SOCP' or 'SDP'). Default is 'SOCP'.
solver : str
The CVXPY backend solver to use (e.g., 'SCS', 'MOSEK', 'CLARABEL'). Default is 'SCS'.
"""
active_profile: DataFrame = profile_df if profile_df is not None else grid.load_profile
if active_profile is None:
raise ValueError("❌ No load profile provided and no profile attached to Grid!")
T = len(active_profile)
num_bus = len(grid.buses)
edges = list(grid.edges(data=True))
num_branch = len(edges)
nodes_list = list(grid.nodes)
# ====================================================================
# 1. Classify Devices & Map to Buses
# ====================================================================
machines, inverters, batteries = [], [], []
machine_bus, inverter_bus, battery_bus = [], [], []
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_name = type(comp).__name__
if c_name == 'SynchronousMachine':
machines.append(comp)
machine_bus.append(i)
elif c_name == 'Inverter':
inverters.append(comp)
inverter_bus.append(i)
elif c_name == 'Battery':
batteries.append(comp)
battery_bus.append(i)
num_gen, num_inv, num_bat = len(machines), len(inverters), len(batteries)
# ====================================================================
# 2. Extract Load Profile & Solar Availability Parameters
# ====================================================================
P_load_pu = np.zeros((num_bus, T))
Q_load_pu = np.zeros((num_bus, T))
Q_shunt_pu = np.zeros((num_bus, T))
P_solar_avail_pu = np.zeros((num_inv, T)) if num_inv > 0 else None
for t in range(T):
grid.apply_profile(active_profile.iloc[t].to_dict())
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_name = type(comp).__name__
if c_name == 'Load':
P_load_pu[i, t] += abs(comp.P) / grid.Sbase
Q_load_pu[i, t] += abs(comp.Q) / grid.Sbase
elif c_name == 'AsynchronousMachine':
comp.update_pq_from_slip(1.0, grid.Sbase)
P_load_pu[i, t] += abs(comp.P) / grid.Sbase
Q_load_pu[i, t] += abs(comp.Q) / grid.Sbase
elif c_name == 'Shunt':
Q_shunt_pu[i, t] += comp.Q_nom / grid.Sbase
elif c_name == 'Inverter':
inv_idx = inverters.index(comp)
P_solar_avail_pu[inv_idx, t] = comp.P / grid.Sbase
grid.build_ybus()
G_bus = grid.Ybus.real
B_bus = grid.Ybus.imag
# ====================================================================
# 3. SHARED VARIABLES (Cross-temporal links for the optimization horizon)
# ====================================================================
Pg = cp.Variable((num_gen, T)) if num_gen > 0 else None
Qg = cp.Variable((num_gen, T)) if num_gen > 0 else None
P_inv = cp.Variable((num_inv, T)) if num_inv > 0 else None
Q_inv = cp.Variable((num_inv, T)) if num_inv > 0 else None
P_ch = cp.Variable((num_bat, T)) if num_bat > 0 else None
P_dis = cp.Variable((num_bat, T)) if num_bat > 0 else None
SoC = cp.Variable((num_bat, T)) if num_bat > 0 else None
constraints = []
slack_idx = next(idx for idx, b in enumerate(grid.buses) if b.type == 'Slack')
# ====================================================================
# 4. SWITCHABLE RELAXATION CORE (AC Power Flow Constraints)
# ====================================================================
if relax.upper() == 'SOCP':
# --- Second-Order Cone Programming (Branch Flow Model) ---
v = cp.Variable((num_bus, T)) # Squared voltage magnitude
P_ij = cp.Variable((num_branch, T)) # Active power flow
Q_ij = cp.Variable((num_branch, T)) # Reactive power flow
l_ij = cp.Variable((num_branch, T)) # Squared current magnitude
for t in range(T):
for i in range(num_bus):
constraints.append(v[i, t] == 1.0 if i == slack_idx else v[i, t] >= 0.85**2)
if i != slack_idx:
constraints.append(v[i, t] <= 1.15**2)
P_inj = {i: 0.0 for i in range(num_bus)}
Q_inj = {i: 0.0 for i in range(num_bus)}
for k, (u, target_v, data) in enumerate(edges):
i, j = nodes_list.index(u), nodes_list.index(target_v)
branch = data.get('obj')
R, X, Z2 = branch.R, branch.X, branch.R**2 + branch.X**2
tau = branch.tap_ratio if type(branch).__name__ == 'Transformer' else 1.0
v_i_eff = v[i, t] / (tau**2)
# Voltage Drop Equation
constraints.append(v[j, t] == v_i_eff - 2 * (R * P_ij[k, t] + X * Q_ij[k, t]) + Z2 * l_ij[k, t])
# Conic Relaxation for Apparent Power Limit
constraints.append(cp.SOC(v_i_eff + l_ij[k, t], cp.vstack([2 * P_ij[k, t], 2 * Q_ij[k, t], v_i_eff - l_ij[k, t]])))
# Thermal Line Limits
if getattr(branch, 'S_max', None):
constraints.append(cp.norm(cp.vstack([P_ij[k, t], Q_ij[k, t]])) <= branch.S_max / grid.Sbase)
# Branch Power Balance Accumulation
P_inj[i] += P_ij[k, t]
Q_inj[i] += Q_ij[k, t]
P_inj[j] -= (P_ij[k, t] - R * l_ij[k, t])
Q_inj[j] -= (Q_ij[k, t] - X * l_ij[k, t])
# Nodal Power Balance Constraints
for i in range(num_bus):
p_g = sum([Pg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
q_g = sum([Qg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
p_i = sum([P_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
q_i = sum([Q_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
p_d = sum([P_dis[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
p_c = sum([P_ch[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
constraints.append(p_g + p_i + p_d - p_c - P_load_pu[i, t] == P_inj[i])
constraints.append(q_g + q_i - Q_load_pu[i, t] + (Q_shunt_pu[i, t] * v[i, t]) == Q_inj[i])
elif relax.upper() == 'SDP':
# --- Semidefinite Programming (Bus Injection Model) ---
W = [cp.Variable((num_bus, num_bus), hermitian=True) for _ in range(T)]
for t in range(T):
constraints.append(W[t] >> 0) # W must be Positive Semidefinite
WR, WI = cp.real(W[t]), cp.imag(W[t])
for i in range(num_bus):
constraints.append(WR[i, i] == 1.0 if i == slack_idx else WR[i, i] >= 0.85**2)
if i != slack_idx:
constraints.append(WR[i, i] <= 1.15**2)
# Nodal Injection Equations derived from Y-bus
P_calc = G_bus[i, :] @ WR[i, :] + B_bus[i, :] @ WI[i, :]
Q_calc = G_bus[i, :] @ WI[i, :] - B_bus[i, :] @ WR[i, :]
p_g = sum([Pg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
q_g = sum([Qg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
p_i = sum([P_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
q_i = sum([Q_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
p_d = sum([P_dis[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
p_c = sum([P_ch[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
constraints.append(P_calc == p_g + p_i + p_d - p_c - P_load_pu[i, t])
constraints.append(Q_calc == q_g + q_i - Q_load_pu[i, t] + (Q_shunt_pu[i, t] * WR[i, i]))
# ====================================================================
# 5. COMMON DEVICE CONSTRAINTS (Time-Coupling & Limits)
# ====================================================================
for t in range(T):
# 5.1 Generator Limits (Active and Reactive Power Bounds)
if num_gen > 0:
for k, m in enumerate(machines):
constraints.extend([
Pg[k, t] >= (m.Pmin if m.Pmin != float('-inf') else 0.0) / grid.Sbase,
Pg[k, t] <= (m.Pmax if m.Pmax != float('inf') else 9999.0) / grid.Sbase,
Qg[k, t] >= (m.Qmin if m.Qmin != float('-inf') else -9999.0) / grid.Sbase,
Qg[k, t] <= (m.Qmax if m.Qmax != float('inf') else 9999.0) / grid.Sbase
])
# 5.2 Smart Inverter Limits (Active Curtailment & Apparent Power Cone)
if num_inv > 0:
for k, inv in enumerate(inverters):
constraints.extend([
P_inv[k, t] >= 0,
P_inv[k, t] <= P_solar_avail_pu[k, t], # Allow solar curtailment if needed
cp.norm(cp.vstack([P_inv[k, t], Q_inv[k, t]])) <= inv.S_max / grid.Sbase
])
# 5.3 Battery Limits & Time-Coupling Dynamics (State of Charge)
if num_bat > 0:
for k, bat in enumerate(batteries):
p_max_pu = bat.P_max / grid.Sbase
e_max_pu = bat.E_max / grid.Sbase
constraints.extend([
P_ch[k, t] >= 0, P_ch[k, t] <= p_max_pu,
P_dis[k, t] >= 0, P_dis[k, t] <= p_max_pu,
SoC[k, t] >= 0.1, SoC[k, t] <= 1.0 # Maintain at least 10% reserve capacity (DoD limit)
])
# SoC Inter-temporal constraint (Energy balance across time bridging t and t-1)
delta_soc = (P_ch[k, t] * bat.eta - P_dis[k, t] / bat.eta) / e_max_pu
if t == 0:
constraints.append(SoC[k, t] == bat.init_soc + delta_soc)
else:
constraints.append(SoC[k, t] == SoC[k, t-1] + delta_soc)
# ====================================================================
# 6. OBJECTIVE FUNCTION & SOLVE
# ====================================================================
cost = 0
for t in range(T):
# 6.1 Generator Operating Cost (Quadratic formulation)
if num_gen > 0:
for k, m in enumerate(machines):
Pg_mw = Pg[k, t] * grid.Sbase
cost += m.a + (m.b * Pg_mw) + (m.c * cp.square(Pg_mw))
# 6.2 Renewable Incentives & ESS Degradation
if num_inv > 0:
# Reward AI slightly for utilizing solar energy (Negative cost acts as incentive)
for k, inv in enumerate(inverters):
cost -= 0.01 * (P_inv[k, t] * grid.Sbase)
if num_bat > 0:
# Apply minor degradation cost to prevent unnecessary micro-charge/discharge cycles
for k, bat in enumerate(batteries):
cost += 0.1 * (P_ch[k, t] + P_dis[k, t]) * grid.Sbase
prob = cp.Problem(cp.Minimize(cost), constraints)
print(f"\n⏳ Solving 24-Hr Master Plan with MPOPF ({relax.upper()} Relaxation)...")
try:
prob.solve(solver=getattr(cp, solver.upper()), verbose=False)
except Exception as e:
print(f"❌ CVXPY Error: {e}")
# ====================================================================
# 7. PARSE AND STORE RESULTS
# ====================================================================
if prob.status in ["optimal", "optimal_inaccurate"]:
print(f"✅ MPOPF ({relax.upper()}) Converged! Master Plan 24h Total Cost: ${prob.value:.2f}")
if num_gen > 0:
for k, m in enumerate(machines):
m.P_series = np.array(Pg.value[k, :]) * grid.Sbase
m.Q_series = np.array(Qg.value[k, :]) * grid.Sbase
if num_inv > 0:
for k, inv in enumerate(inverters):
inv.P_series = np.array(P_inv.value[k, :]) * grid.Sbase
inv.Q_series = np.array(Q_inv.value[k, :]) * grid.Sbase
if num_bat > 0:
for k, bat in enumerate(batteries):
bat.P_ch_series = np.array(P_ch.value[k, :]) * grid.Sbase
bat.P_dis_series = np.array(P_dis.value[k, :]) * grid.Sbase
bat.SoC_series = np.array(SoC.value[k, :])
else:
raise Exception(f"❌ MPOPF Infeasible! AI could not find a safe 24h plan (Status: {prob.status})")
[docs]
def NR(grid: Grid, tol: float = 1e-6, max_iter: int = 100):
"""
Non-Linear AC Power Flow Solver using the Newton-Raphson (NR) Method.
Computes precise steady-state bus voltages (magnitude and phase angle), branch power flows,
and system losses. Includes dynamic PV-to-PQ bus switching to enforce generator reactive
power limits (Q-limits), and On-Load Tap Changer (OLTC) auto-adjustments.
Parameters:
-----------
grid : Grid
The power system grid object.
tol : float
Convergence tolerance for maximum power mismatch (p.u.). Default is 1e-6.
max_iter : int
Maximum number of iterations allowed before aborting. Default is 100.
"""
print("\n🚀 Starting Newton-Raphson Power Flow...")
Ybus = grid.build_ybus()
num_bus = len(grid.buses)
iteration = 0
converged = False
while iteration < max_iter:
# =======================================================
# 1. Update Voltage Dependence Dynamics
# =======================================================
for bus in grid.buses:
for comp in bus.components:
# Update induction motor slip based on current voltage estimation
if isinstance(comp, AsynchronousMachine):
grid.update_motor_slip(comp, bus.V)
comp.update_pq_from_slip(bus.V, grid.Sbase)
# Update capacitor bank injection (scales with V^2)
elif isinstance(comp, Shunt):
comp.update_voltage_dependence(bus.V)
# =======================================================
# 2. Classify Dynamic Bus Types (Slack, PV, PQ)
# =======================================================
slack, pv, pq = [], [], []
for i, bus in enumerate(grid.buses):
if bus.type == 'Slack':
slack.append(i)
elif bus.type == 'PV':
pv.append(i)
else:
pq.append(i)
non_slack = pv + pq
# =======================================================
# 3. Prepare State Variables (V, theta) and Specified P, Q
# =======================================================
V = np.array([b.V for b in grid.buses], dtype=float)
theta = np.array([b.theta for b in grid.buses], dtype=float)
P_spec = np.zeros(num_bus)
Q_spec = np.zeros(num_bus)
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_type = type(comp).__name__
if c_type == 'SynchronousMachine':
P_spec[i] += comp.P / grid.Sbase
Q_spec[i] += comp.Q / grid.Sbase
elif c_type == 'Load':
P_spec[i] -= abs(comp.P) / grid.Sbase
Q_spec[i] -= abs(comp.Q) / grid.Sbase
elif c_type in ['Inverter', 'Battery']:
P_spec[i] += comp.P / grid.Sbase
Q_spec[i] += comp.Q / grid.Sbase
elif c_type == 'AsynchronousMachine':
P_spec[i] -= abs(comp.P) / grid.Sbase
Q_spec[i] -= abs(comp.Q) / grid.Sbase
elif c_type == 'Shunt':
Q_spec[i] += comp.Q / grid.Sbase
# =======================================================
# 4. Calculate Computed P_calc, Q_calc from Y-bus
# =======================================================
Vc = V * np.exp(1j * theta)
I = Ybus @ Vc
S_calc = Vc * np.conj(I)
P_calc = S_calc.real
Q_calc = S_calc.imag
# =======================================================
# 5. Generator Q-Limit Checks (PV to PQ Bus Switching)
# =======================================================
limit_hit_this_iter = False
for i in pv:
bus = grid.buses[i]
Q_load_pu = sum([abs(c.Q) for c in bus.components if isinstance(c, (Load, AsynchronousMachine))]) / grid.Sbase
Q_gen_req_pu = Q_calc[i] + Q_load_pu
machines = [c for c in bus.components if isinstance(c, SynchronousMachine)]
if machines:
Qmax_total_pu = sum(getattr(m, 'Qmax', 9999.0) for m in machines) / grid.Sbase
Qmin_total_pu = sum(getattr(m, 'Qmin', -9999.0) for m in machines) / grid.Sbase
if Q_gen_req_pu > Qmax_total_pu:
print(f"⚠️ Iteration {iteration}: Bus {bus.name} exceeds Qmax ({Q_gen_req_pu*grid.Sbase:.2f} > {Qmax_total_pu*grid.Sbase:.2f}) -> Converted to PQ Bus!")
bus.type = 'PQ'
for m in machines:
m.Q = getattr(m, 'Qmax', 9999.0)
limit_hit_this_iter = True
elif Q_gen_req_pu < Qmin_total_pu:
print(f"⚠️ Iteration {iteration}: Bus {bus.name} exceeds Qmin ({Q_gen_req_pu*grid.Sbase:.2f} < {Qmin_total_pu*grid.Sbase:.2f}) -> Converted to PQ Bus!")
bus.type = 'PQ'
for m in machines:
m.Q = getattr(m, 'Qmin', -9999.0)
limit_hit_this_iter = True
# If limits were hit, re-evaluate mismatches in the next iteration before calculating Jacobian
if limit_hit_this_iter:
iteration += 1
continue
# =======================================================
# 6. Compute Mismatches and Check Convergence
# =======================================================
dP = P_spec - P_calc
dQ = Q_spec - Q_calc
dP_ns = dP[non_slack]
dQ_pq = dQ[pq]
mismatch = np.concatenate((dP_ns, dQ_pq))
max_error = np.max(np.abs(mismatch)) if len(mismatch) > 0 else 0
if max_error < tol:
converged = True
break # Escaped NR Loop successfully!
# =======================================================
# 7. Construct and Solve Jacobian Matrix [J]
# =======================================================
diagVc = np.diag(Vc)
diagI_conj = np.diag(np.conj(I))
diagVc_conj = np.diag(np.conj(Vc))
diagV_phase = np.diag(Vc / V)
diagV_phase_conj = np.diag(np.conj(Vc) / V)
dS_dTheta = 1j * diagVc @ diagI_conj - 1j * diagVc @ np.conj(Ybus) @ diagVc_conj
dS_dV = diagV_phase @ diagI_conj + diagVc @ np.conj(Ybus) @ diagV_phase_conj
H = dS_dTheta.real
N = dS_dV.real
M = dS_dTheta.imag
L = dS_dV.imag
J11 = H[np.ix_(non_slack, non_slack)]
J12 = N[np.ix_(non_slack, pq)]
J21 = M[np.ix_(pq, non_slack)]
J22 = L[np.ix_(pq, pq)]
J_top = np.hstack((J11, J12)) if J12.size else J11
J_bottom = np.hstack((J21, J22)) if J12.size else J21
J = np.vstack((J_top, J_bottom))
try:
dx = np.linalg.solve(J, mismatch)
except np.linalg.LinAlgError:
print("❌ Singular Jacobian Matrix! The grid might be physically collapsed (Voltage Collapse).")
break
dTheta = dx[:len(non_slack)]
dV = dx[len(non_slack):]
for idx, i in enumerate(non_slack):
grid.buses[i].theta += dTheta[idx]
for idx, i in enumerate(pq):
grid.buses[i].V += dV[idx]
# =======================================================
# 8. On-Load Tap Changer (OLTC) Auto-Tap Updates
# =======================================================
tap_changed = False
for u, v, data in grid.edges(data=True):
branch = data.get('obj')
if isinstance(branch, Transformer) and getattr(branch, 'auto_tap', False) and getattr(branch, 'controlled_bus', None):
target_bus = grid.bus(branch.controlled_bus)
deadband = 0.015 # Voltage tolerance band before stepping tap
if target_bus.V < branch.target_V - deadband and branch.tap_ratio > branch.tap_min:
branch.tap_ratio -= branch.tap_step
branch.tap_ratio = max(branch.tap_ratio, branch.tap_min)
tap_changed = True
print(f"🔄 Iteration {iteration}: {branch.name} Step DOWN Tap -> {branch.tap_ratio:.4f}")
elif target_bus.V > branch.target_V + deadband and branch.tap_ratio < branch.tap_max:
branch.tap_ratio += branch.tap_step
branch.tap_ratio = min(branch.tap_ratio, branch.tap_max)
tap_changed = True
print(f"🔄 Iteration {iteration}: {branch.name} Step UP Tap -> {branch.tap_ratio:.4f}")
# If tap changes, physical Y-bus matrix is altered and must be rebuilt
if tap_changed:
Ybus = grid.build_ybus()
iteration += 1
# =======================================================
# 9. Post-Processing (Calculate Slack & PV actual generation)
# =======================================================
V_final = np.array([b.V for b in grid.buses])
theta_final = np.array([b.theta for b in grid.buses])
Vc_final = V_final * np.exp(1j * theta_final)
S_final = Vc_final * np.conj(Ybus @ Vc_final)
for i, bus in enumerate(grid.buses):
P_load_total = sum([abs(c.P) / grid.Sbase for c in bus.components if type(c).__name__ in ['Load', 'AsynchronousMachine']])
Q_load_total = sum([abs(c.Q) / grid.Sbase for c in bus.components if type(c).__name__ in ['Load', 'AsynchronousMachine']])
P_inv_total = sum([c.P / grid.Sbase for c in bus.components if type(c).__name__ in ['Inverter', 'Battery']])
Q_inv_total = sum([c.Q / grid.Sbase for c in bus.components if type(c).__name__ in ['Inverter', 'Battery']])
Q_shunt_total = sum([c.Q / grid.Sbase for c in bus.components if type(c).__name__ == 'Shunt'])
if bus.type == 'Slack':
P_slack_gen = S_final.real[i] + P_load_total - P_inv_total
Q_slack_gen = S_final.imag[i] + Q_load_total - Q_inv_total - Q_shunt_total
for comp in bus.components:
if type(comp).__name__ == 'SynchronousMachine':
comp.P = float(P_slack_gen * grid.Sbase)
comp.Q = float(Q_slack_gen * grid.Sbase)
elif bus.type == 'PV':
Q_pv_gen = S_final.imag[i] + Q_load_total - Q_inv_total - Q_shunt_total
for comp in bus.components:
if type(comp).__name__ == 'SynchronousMachine':
comp.Q = float(Q_pv_gen * grid.Sbase)
if converged:
print(f"✅ Newton-Raphson Converged seamlessly in {iteration+1} iterations!")
else:
print(f"⚠️ Newton-Raphson Failed after {max_iter} iterations (Max Error: {max_error:.6f})")
raise Exception(f"Voltage Collapse! (NR Max Error: {max_error:.2f})")
[docs]
def plan(grid: Grid, profile_df: DataFrame = None, path: str = "settings.json", relax: str = 'SOCP', solver: str = 'SCS', tol: float = 1e-6, max_iter: int = 100):
"""
Executes a complete Time-Series Simulation and Master Plan.
This function wraps the MPOPF (to find the optimal economic dispatch schedules) and the
Newton-Raphson solver (to validate the exact AC physics at each time step). The final results,
including voltages, angles, and line flows, are exported to a JSON file.
Parameters:
-----------
grid : Grid
The power system grid object.
profile_df : DataFrame, optional
Time-series data for the planning horizon.
path : str
File path to export the resulting settings and states. Default is "settings.json".
relax : str
Relaxation method for the MPOPF ('SOCP' or 'SDP').
solver : str
Optimization backend solver.
tol : float
Tolerance for the NR solver.
max_iter : int
Maximum iterations for the NR solver.
Returns:
--------
str
The JSON string containing the comprehensive simulation history.
"""
active_profile = profile_df if profile_df is not None else grid.load_profile
T = len(active_profile)
print(f"\n🔮 Starting Modern Grid Simulation (MPOPF) for {T} steps...")
# 1. Generate optimal generation schedules using OPF
MPOPF(grid, profile_df=active_profile, relax=relax, solver=solver)
history = []
# 2. Replay the schedules through the precise Non-Linear NR engine to get exact voltages
for t in range(T):
print(f"\n{'='*15} 🕒 Step {t} {'='*15}")
grid.apply_profile(active_profile.iloc[t].to_dict())
# Apply OPF schedules to hardware components
for bus in grid.buses:
for comp in bus.components:
c_name = type(comp).__name__
if c_name == 'SynchronousMachine':
comp.P = float(comp.P_series[t])
comp.Q = float(comp.Q_series[t])
elif c_name == 'Inverter':
comp.P = float(comp.P_series[t])
comp.Q = float(comp.Q_series[t])
elif c_name == 'Battery':
comp.P = float(comp.P_dis_series[t] - comp.P_ch_series[t]) # Net active power injection
comp.SoC = float(comp.SoC_series[t])
# Validate exact AC Physics for the step
NR(grid, max_iter=max_iter, tol=tol)
total_cost = sum([bus.total_cost() for bus in grid.buses])
# Compile Step Dashboard
step_result = {
'step': t,
'system': {'total_cost_per_hr': round(total_cost, 2)},
'buses': {}, 'components': {}, 'branches': {}
}
for bus in grid.buses:
step_result['buses'][bus.name] = {
'V_pu': round(bus.V, 4), 'theta_rad': round(bus.theta, 4),
'P_total_MW': round(bus.P * grid.Sbase, 2), 'Q_total_MVAr': round(bus.Q * grid.Sbase, 2)
}
for comp in bus.components:
comp_data = {
'bus': bus.name, 'type': type(comp).__name__,
'P_MW': round(comp.P * grid.Sbase, 2) if hasattr(comp, 'P') else 0.0,
'Q_MVAr': round(comp.Q * grid.Sbase, 2) if hasattr(comp, 'Q') else 0.0
}
if type(comp).__name__ == 'Battery':
comp_data['SoC'] = round(comp.SoC, 4)
step_result['components'][comp.name] = comp_data
for u, v, data in grid.edges(data=True):
branch = data.get('obj')
if branch:
from_bus, to_bus = grid.bus(u), grid.bus(v)
V_i = from_bus.V * np.exp(1j * from_bus.theta)
V_j = to_bus.V * np.exp(1j * to_bus.theta)
# Flow calculation depending on Tap Ratio (Transformer) or strict impedance (Line)
if type(branch).__name__ == 'Transformer':
t_tap = branch.tap_ratio * np.exp(1j * branch.phase_shift)
I_ij = (V_i / t_tap - V_j) * branch.Y
else:
I_ij = (V_i - V_j) * branch.Y
S_flow_MVA = abs(V_i * np.conj(I_ij)) * grid.Sbase
branch_data = {
'from_bus': u, 'to_bus': v, 'type': type(branch).__name__,
'flow_MVA': round(S_flow_MVA, 2)
}
if getattr(branch, 'S_max', None):
loading_pct = (S_flow_MVA / branch.S_max) * 100
branch_data['loading_percent'] = round(loading_pct, 2)
branch_data['is_overload'] = loading_pct > 100
step_result['branches'][branch.name] = branch_data
history.append(step_result)
grid.loading_status()
# Export cleanly formatted JSON Data
result = json.dumps(history, indent=2, cls=NumpyEncoder)
with open(path, "w") as f:
f.write(result)
print("\n✅ MPOPF Time-Series Simulation Completed!")
return result
[docs]
def CEP(grid: Grid, profile_df: DataFrame = None, relax: str = 'SOCP', solver: str = 'SCS'):
"""
Capacity Expansion Planning (CEP) Engine.
Co-optimizes long-term investment sizing (Capital Expenditure - CapEx) against short-term
operational schedules (Operational Expenditure - OpEx). Determines the absolute most cost-effective
capacity (MW / MWh) of candidate Renewable Energy and BESS assets required to satisfy load profiles
while respecting all AC grid physics constraints.
Parameters:
-----------
grid : Grid
The power system grid object containing designated 'candidate' components.
profile_df : DataFrame, optional
Time-series load profile.
relax : str
Relaxation formulation ('SOCP' or 'SDP').
solver : str
Optimization solver.
"""
print("\n🏗️ Starting Capacity Expansion Planning (CEP)...")
active_profile: DataFrame = profile_df if profile_df is not None else grid.load_profile
if active_profile is None:
raise ValueError("❌ No load profile provided and no profile attached to Grid!")
T = len(active_profile)
num_bus = len(grid.buses)
edges = list(grid.edges(data=True))
num_branch = len(edges)
nodes_list = list(grid.nodes)
machines, inverters, batteries = [], [], []
machine_bus, inverter_bus, battery_bus = [], [], []
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_name = type(comp).__name__
if c_name == 'SynchronousMachine':
machines.append(comp); machine_bus.append(i)
elif c_name == 'Inverter':
inverters.append(comp); inverter_bus.append(i)
elif c_name == 'Battery':
batteries.append(comp); battery_bus.append(i)
num_gen, num_inv, num_bat = len(machines), len(inverters), len(batteries)
P_load_pu = np.zeros((num_bus, T))
Q_load_pu = np.zeros((num_bus, T))
Q_shunt_pu = np.zeros((num_bus, T))
P_solar_avail_pu = np.zeros((num_inv, T)) if num_inv > 0 else None
for t in range(T):
grid.apply_profile(active_profile.iloc[t].to_dict())
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_name = type(comp).__name__
if c_name == 'Load':
P_load_pu[i, t] += abs(comp.P) / grid.Sbase
Q_load_pu[i, t] += abs(comp.Q) / grid.Sbase
elif c_name == 'AsynchronousMachine':
comp.update_pq_from_slip(1.0, grid.Sbase)
P_load_pu[i, t] += abs(comp.P) / grid.Sbase
Q_load_pu[i, t] += abs(comp.Q) / grid.Sbase
elif c_name == 'Shunt':
Q_shunt_pu[i, t] += comp.Q_nom / grid.Sbase
elif c_name == 'Inverter':
inv_idx = inverters.index(comp)
# For candidate inverters, extract the raw multiplier directly from the profile
if getattr(comp, 'is_candidate', False):
multiplier = active_profile.iloc[t].to_dict().get(comp.name, 0.0)
P_solar_avail_pu[inv_idx, t] = multiplier
else:
P_solar_avail_pu[inv_idx, t] = comp.P / grid.Sbase
grid.build_ybus()
G_bus = grid.Ybus.real
B_bus = grid.Ybus.imag
Pg = cp.Variable((num_gen, T)) if num_gen > 0 else None
Qg = cp.Variable((num_gen, T)) if num_gen > 0 else None
P_inv = cp.Variable((num_inv, T)) if num_inv > 0 else None
Q_inv = cp.Variable((num_inv, T)) if num_inv > 0 else None
P_ch = cp.Variable((num_bat, T)) if num_bat > 0 else None
P_dis = cp.Variable((num_bat, T)) if num_bat > 0 else None
# Use E_stored (MWh equivalent) instead of SoC to avoid nonlinear division in optimization constraints
E_stored = cp.Variable((num_bat, T)) if num_bat > 0 else None
constraints = []
slack_idx = next(idx for idx, b in enumerate(grid.buses) if b.type == 'Slack')
# Apply Standard Power Flow Relaxations (Similar to MPOPF)
if relax.upper() == 'SOCP':
v = cp.Variable((num_bus, T))
P_ij = cp.Variable((num_branch, T))
Q_ij = cp.Variable((num_branch, T))
l_ij = cp.Variable((num_branch, T))
for t in range(T):
for i in range(num_bus):
constraints.append(v[i, t] == 1.0 if i == slack_idx else v[i, t] >= 0.85**2)
if i != slack_idx: constraints.append(v[i, t] <= 1.15**2)
P_inj = {i: 0.0 for i in range(num_bus)}
Q_inj = {i: 0.0 for i in range(num_bus)}
for k, (u, target_v, data) in enumerate(edges):
i, j = nodes_list.index(u), nodes_list.index(target_v)
branch = data.get('obj')
R, X, Z2 = branch.R, branch.X, branch.R**2 + branch.X**2
tau = branch.tap_ratio if type(branch).__name__ == 'Transformer' else 1.0
v_i_eff = v[i, t] / (tau**2)
constraints.append(v[j, t] == v_i_eff - 2 * (R * P_ij[k, t] + X * Q_ij[k, t]) + Z2 * l_ij[k, t])
constraints.append(cp.SOC(v_i_eff + l_ij[k, t], cp.vstack([2 * P_ij[k, t], 2 * Q_ij[k, t], v_i_eff - l_ij[k, t]])))
if getattr(branch, 'S_max', None):
constraints.append(cp.norm(cp.vstack([P_ij[k, t], Q_ij[k, t]])) <= branch.S_max / grid.Sbase)
P_inj[i] += P_ij[k, t]; Q_inj[i] += Q_ij[k, t]
P_inj[j] -= (P_ij[k, t] - R * l_ij[k, t]); Q_inj[j] -= (Q_ij[k, t] - X * l_ij[k, t])
for i in range(num_bus):
p_g = sum([Pg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
q_g = sum([Qg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
p_i = sum([P_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
q_i = sum([Q_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
p_d = sum([P_dis[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
p_c = sum([P_ch[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
constraints.append(p_g + p_i + p_d - p_c - P_load_pu[i, t] == P_inj[i])
constraints.append(q_g + q_i - Q_load_pu[i, t] + (Q_shunt_pu[i, t] * v[i, t]) == Q_inj[i])
elif relax.upper() == 'SDP':
W = [cp.Variable((num_bus, num_bus), hermitian=True) for _ in range(T)]
for t in range(T):
constraints.append(W[t] >> 0)
WR, WI = cp.real(W[t]), cp.imag(W[t])
for i in range(num_bus):
constraints.append(WR[i, i] == 1.0 if i == slack_idx else WR[i, i] >= 0.85**2)
if i != slack_idx: constraints.append(WR[i, i] <= 1.15**2)
P_calc = G_bus[i, :] @ WR[i, :] + B_bus[i, :] @ WI[i, :]
Q_calc = G_bus[i, :] @ WI[i, :] - B_bus[i, :] @ WR[i, :]
p_g = sum([Pg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
q_g = sum([Qg[k, t] for k, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
p_i = sum([P_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
q_i = sum([Q_inv[k, t] for k, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
p_d = sum([P_dis[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
p_c = sum([P_ch[k, t] for k, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
constraints.append(P_calc == p_g + p_i + p_d - p_c - P_load_pu[i, t])
constraints.append(Q_calc == q_g + q_i - Q_load_pu[i, t] + (Q_shunt_pu[i, t] * WR[i, i]))
# ====================================================================
# 5. CEP SPECIFIC: INVESTMENT VARIABLES & CONSTRAINTS (CAPEX formulation)
# ====================================================================
total_capex = 0.0
bat_p_max_capacity = []
bat_e_max_capacity = []
inv_s_max_capacity = []
# --- Consider Solar Investments ---
if num_inv > 0:
for k, inv in enumerate(inverters):
if getattr(inv, 'is_candidate', False):
built_s_mw = cp.Variable(nonneg=True) # AI decides how many MW to build
constraints.append(built_s_mw <= inv.max_build_mw)
inv_s_max_capacity.append(built_s_mw / grid.Sbase)
total_capex += (built_s_mw * inv.capex_per_mw) * getattr(inv, 'daily_capex_factor', 1.0)
else:
inv_s_max_capacity.append(inv.S_max / grid.Sbase)
# --- Consider Battery Investments ---
if num_bat > 0:
for k, bat in enumerate(batteries):
if getattr(bat, 'is_candidate', False):
built_p_mw = cp.Variable(nonneg=True) # Power Capacity Investment
built_e_mwh = cp.Variable(nonneg=True) # Energy Storage Investment
constraints.extend([
built_p_mw <= bat.max_build_mw,
built_e_mwh <= bat.max_build_mwh
])
bat_p_max_capacity.append(built_p_mw / grid.Sbase)
bat_e_max_capacity.append(built_e_mwh / grid.Sbase)
total_capex += (built_p_mw * bat.capex_per_mw + built_e_mwh * bat.capex_per_mwh) * getattr(bat, 'daily_capex_factor', 1.0)
else:
bat_p_max_capacity.append(bat.P_max / grid.Sbase)
bat_e_max_capacity.append(bat.E_max / grid.Sbase)
# ====================================================================
# 6. TIME-COUPLING WITH DYNAMIC CAPACITY
# ====================================================================
for t in range(T):
if num_gen > 0:
for k, m in enumerate(machines):
constraints.extend([
Pg[k, t] >= (m.Pmin if m.Pmin != float('-inf') else 0.0) / grid.Sbase,
Pg[k, t] <= (m.Pmax if m.Pmax != float('inf') else 9999.0) / grid.Sbase,
Qg[k, t] >= (m.Qmin if m.Qmin != float('-inf') else -9999.0) / grid.Sbase,
Qg[k, t] <= (m.Qmax if m.Qmax != float('inf') else 9999.0) / grid.Sbase
])
if num_inv > 0:
for k, inv in enumerate(inverters):
# Available generation bounds scale linearly with built capacity
if getattr(inv, 'is_candidate', False):
p_avail = P_solar_avail_pu[k, t] * inv_s_max_capacity[k]
else:
p_avail = P_solar_avail_pu[k, t]
constraints.extend([
P_inv[k, t] >= 0,
P_inv[k, t] <= p_avail,
cp.norm(cp.vstack([P_inv[k, t], Q_inv[k, t]])) <= inv_s_max_capacity[k]
])
if num_bat > 0:
for k, bat in enumerate(batteries):
p_max_pu = bat_p_max_capacity[k]
e_max_pu = bat_e_max_capacity[k]
constraints.extend([
P_ch[k, t] >= 0, P_ch[k, t] <= p_max_pu,
P_dis[k, t] >= 0, P_dis[k, t] <= p_max_pu,
E_stored[k, t] >= 0.1 * e_max_pu,
E_stored[k, t] <= 1.0 * e_max_pu
])
delta_e = (P_ch[k, t] * bat.eta - P_dis[k, t] / bat.eta)
if t == 0:
init_e = bat.init_soc * e_max_pu
constraints.append(E_stored[k, t] == init_e + delta_e)
else:
constraints.append(E_stored[k, t] == E_stored[k, t-1] + delta_e)
# ====================================================================
# 7. OBJECTIVE: MINIMIZE (CAPEX + OPEX)
# ====================================================================
total_opex = 0
for t in range(T):
if num_gen > 0:
for k, m in enumerate(machines):
Pg_mw = Pg[k, t] * grid.Sbase
total_opex += m.a + (m.b * Pg_mw) + (m.c * cp.square(Pg_mw))
total_cost = total_capex + total_opex
prob = cp.Problem(cp.Minimize(total_cost), constraints)
print(f"⏳ Optimizing Sizing and Operation ({relax.upper()})...")
try:
prob.solve(solver=getattr(cp, solver.upper()), verbose=False)
except Exception as e:
print(f"❌ CVXPY Error: {e}")
# ====================================================================
# 8. POST-PROCESS AND ASSIGN BUILT CAPACITIES
# ====================================================================
if prob.status in ["optimal", "optimal_inaccurate"]:
print(f"✅ CEP Converged! Total Cost (CapEx + OpEx): ${prob.value:.2f}")
if num_inv > 0:
for k, inv in enumerate(inverters):
if getattr(inv, 'is_candidate', False):
inv.built_S_max = float(inv_s_max_capacity[k].value * grid.Sbase)
inv.S_max = inv.built_S_max
print(f" ☀️ Investment Decision -> {inv.name}: S_max = {inv.built_S_max:.2f} MW")
if num_bat > 0:
for k, bat in enumerate(batteries):
if getattr(bat, 'is_candidate', False):
bat.built_P_max = float(bat_p_max_capacity[k].value * grid.Sbase)
bat.built_E_max = float(bat_e_max_capacity[k].value * grid.Sbase)
bat.P_max = bat.built_P_max
bat.E_max = bat.built_E_max
print(f" 🔋 Investment Decision -> {bat.name}: P_max = {bat.built_P_max:.2f} MW, E_max = {bat.built_E_max:.2f} MWh")
else:
raise Exception(f"❌ CEP Infeasible! Status: {prob.status}")
[docs]
def N1_Screening(grid, peak_hour: int = None, relax: str = 'SOCP', solver: str = 'SCS'):
"""
N-1 Contingency Screening utilizing Optimal Power Flow.
Systematically disconnects one transmission branch at a time to test grid resilience
during peak loading. Identifies weak physical links that cause system collapse or unsolvable
overloads.
Parameters:
-----------
grid : Grid
The power system grid object.
peak_hour : int, optional
Specific hour index to test. If None, auto-detects the hour with maximum active power load.
relax : str
Relaxation method ('SOCP' or 'SDP').
solver : str
Solver backend (e.g., 'SCS').
"""
if peak_hour is None:
peak_hour = grid.get_peak_load_hour()
print(f"\n🌪️ Starting N-1 Contingency Screening (Snapshot: Hour {peak_hour})...")
single_hour_df = grid.load_profile.iloc[[peak_hour]].reset_index(drop=True)
# ========================================================
# 1. Verify standard system health (N-0) before any outages
# ========================================================
print("🏥 Testing Base Case (N-0) health...")
try:
MPOPF(grid, profile_df=single_hour_df, relax=relax, solver=solver)
print("✅ Base Case is healthy! Proceeding to cut lines...\n")
except Exception as e:
print(f"\n💀 FATAL: The system is already collapsing BEFORE cutting any lines!")
print(f"🚨 Actual Error: {e}")
print("💡 Hint: Verify Battery E_max is non-zero, or check for undervoltage limits below 0.85 p.u.")
return
# ========================================================
# 2. Systematically disconnect lines to test N-1 scenarios
# ========================================================
original_edges = list(grid.edges(data=True))
num_branches = len(original_edges)
print(f"🔍 Testing {num_branches} branch contingencies...")
report = {}
for u, v, data in original_edges:
branch = data['obj']
branch_name = branch.name
grid.remove_edge(u, v)
try:
MPOPF(grid, single_hour_df, relax=relax, solver=solver)
report[branch_name] = "✅ Survived"
except Exception as e:
report[branch_name] = f"❌ CRITICAL: {str(e)}"
finally:
grid.add_edge(u, v, **data) # Always restore the branch before next iteration
print("\n========================================")
print("📊 N-1 Contingency Report:")
print("========================================")
for name, status in report.items():
print(f"Outage of {name:<8} -> {status}")
[docs]
def SCOPF(grid, peak_hour: int = None, relax: str = 'SOCP', solver: str = 'SCS'):
"""
Security-Constrained Optimal Power Flow (SCOPF) with ESS Rescue.
Formulates a massive "Multiverse" optimization problem that simultaneously calculates
the base case (N-0) and all single-branch outages (N-1). Ensures that synchronous
generators have a single, fixed setpoint across all universes (preventive), while allowing
fast-acting Inverters and Batteries to react dynamically (corrective) to survive outages.
Parameters:
-----------
grid : Grid
The power system grid object.
peak_hour : int, optional
Hour index representing peak loading. Auto-detected if None.
relax : str
Relaxation formulation ('SOCP' or 'SDP').
solver : str
Solver backend (e.g., 'CLARABEL', 'SCS').
Returns:
--------
dict
The 'Rescue Plan' mapping each contingency to optimal Battery/Inverter setpoints.
"""
if peak_hour is None:
peak_hour = grid.get_peak_load_hour()
print(f"\n🛡️ Starting Preventive SCOPF (Multiverse: Hour {peak_hour}) with ESS Rescue...")
# 1. Inject 1-hour peak profile into the grid snapshot
single_hour_df = grid.load_profile.iloc[[peak_hour]].reset_index(drop=True)
original_profile = grid.load_profile
grid.attach_profile(single_hour_df)
grid.apply_profile(0)
edges = list(grid.edges(data=True))
nodes_list = list(grid.nodes)
num_bus = len(grid.buses)
num_branch = len(edges)
K = num_branch + 1 # Total universes: Base Case (k=0) + N-1 Outages (k=1 to K-1)
# 2. Group all active power devices
machines, inverters, batteries = [], [], []
machine_bus, inverter_bus, battery_bus = [], [], []
for i, bus in enumerate(grid.buses):
for comp in bus.components:
c_type = type(comp).__name__
if c_type == 'SynchronousMachine': machines.append(comp); machine_bus.append(i)
elif c_type == 'Inverter': inverters.append(comp); inverter_bus.append(i)
elif c_type == 'Battery': batteries.append(comp); battery_bus.append(i)
num_gen, num_inv, num_bat = len(machines), len(inverters), len(batteries)
slack_idx = next(idx for idx, b in enumerate(grid.buses) if b.type == 'Slack')
# ====================================================================
# 3. MULTIVERSE VARIABLES (Simulating K networks in parallel)
# ====================================================================
Pg = cp.Variable((num_gen, K)) if num_gen > 0 else None
Qg = cp.Variable((num_gen, K)) if num_gen > 0 else None
P_inv = cp.Variable((num_inv, K)) if num_inv > 0 else None
Q_inv = cp.Variable((num_inv, K)) if num_inv > 0 else None
P_ch = cp.Variable((num_bat, K)) if num_bat > 0 else None
P_dis = cp.Variable((num_bat, K)) if num_bat > 0 else None
v = cp.Variable((num_bus, K))
P_ij = cp.Variable((num_branch, K))
Q_ij = cp.Variable((num_branch, K))
l_ij = cp.Variable((num_branch, K))
constraints = []
for k in range(K):
# 3.1 Voltage Limits per universe
for i in range(num_bus):
constraints.append(v[i, k] == 1.0 if i == slack_idx else v[i, k] >= 0.85**2)
if i != slack_idx: constraints.append(v[i, k] <= 1.15**2)
P_inj = {i: 0.0 for i in range(num_bus)}; Q_inj = {i: 0.0 for i in range(num_bus)}
# 3.2 Branch Flow Formulation (SOCP)
for b_idx, (u, target_v, data) in enumerate(edges):
# If we are in universe k > 0, we slice the branch corresponding to (k-1)
if k > 0 and b_idx == (k - 1):
constraints.extend([P_ij[b_idx, k] == 0, Q_ij[b_idx, k] == 0, l_ij[b_idx, k] == 0])
continue
i, j = nodes_list.index(u), nodes_list.index(target_v)
branch = data.get('obj')
R, X, Z2 = branch.R, branch.X, branch.R**2 + branch.X**2
tau = branch.tap_ratio if type(branch).__name__ == 'Transformer' else 1.0
v_i_eff = v[i, k] / (tau**2)
constraints.append(v[j, k] == v_i_eff - 2*(R*P_ij[b_idx, k] + X*Q_ij[b_idx, k]) + Z2*l_ij[b_idx, k])
constraints.append(cp.SOC(v_i_eff + l_ij[b_idx, k], cp.vstack([2*P_ij[b_idx, k], 2*Q_ij[b_idx, k], v_i_eff - l_ij[b_idx, k]])))
if getattr(branch, 'S_max', None):
constraints.append(cp.norm(cp.vstack([P_ij[b_idx, k], Q_ij[b_idx, k]])) <= branch.S_max / grid.Sbase)
P_inj[i] += P_ij[b_idx, k]; Q_inj[i] += Q_ij[b_idx, k]
P_inj[j] -= (P_ij[b_idx, k] - R*l_ij[b_idx, k]); Q_inj[j] -= (Q_ij[b_idx, k] - X*l_ij[b_idx, k])
# 3.3 Nodal Power Balance per universe
for i in range(num_bus):
bus = grid.buses[i]
P_L = sum([abs(c.P)/grid.Sbase for c in bus.components if isinstance(c, Load)])
Q_L = sum([abs(c.Q)/grid.Sbase for c in bus.components if isinstance(c, Load)])
Q_sh = sum([c.Q_nom/grid.Sbase for c in bus.components if isinstance(c, Shunt)])
p_g = sum([Pg[m, k] for m, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
q_g = sum([Qg[m, k] for m, b_idx in enumerate(machine_bus) if b_idx == i]) if num_gen > 0 else 0.0
p_i = sum([P_inv[m, k] for m, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
q_i = sum([Q_inv[m, k] for m, b_idx in enumerate(inverter_bus) if b_idx == i]) if num_inv > 0 else 0.0
p_d = sum([P_dis[m, k] for m, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
p_c = sum([P_ch[m, k] for m, b_idx in enumerate(battery_bus) if b_idx == i]) if num_bat > 0 else 0.0
constraints.append(p_g + p_i + p_d - p_c - P_L == P_inj[i])
constraints.append(q_g + q_i - Q_L + (Q_sh * v[i, k]) == Q_inj[i])
# 3.4 Synchronous Generators (Locked to Base Case)
# Generators cannot react instantly to line drops; their P setpoint must remain identical across all scenarios.
if num_gen > 0:
for m, gen in enumerate(machines):
constraints.extend([
Pg[m, k] >= (gen.Pmin / grid.Sbase), Pg[m, k] <= (gen.Pmax / grid.Sbase),
Qg[m, k] >= (gen.Qmin / grid.Sbase), Qg[m, k] <= (gen.Qmax / grid.Sbase)
])
if k > 0 and machine_bus[m] != slack_idx:
constraints.append(Pg[m, k] == Pg[m, 0]) # 🔗 Force lock to Base Case
# 3.5 Solar Inverters (Flexible Real-Time Curtailment)
if num_inv > 0:
for m, inv in enumerate(inverters):
solar_avail = single_hour_df.iloc[0].to_dict().get(inv.name, inv.P / grid.Sbase)
constraints.extend([
P_inv[m, k] >= 0, P_inv[m, k] <= solar_avail,
cp.norm(cp.vstack([P_inv[m, k], Q_inv[m, k]])) <= inv.S_max / grid.Sbase
])
# 3.6 Batteries (Corrective Rescue Action)
# Fast-acting ESS can instantly alter dispatch to rescue the grid during N-1 contingencies
if num_bat > 0:
for m, bat in enumerate(batteries):
p_max_pu = bat.P_max / grid.Sbase
if p_max_pu > 0:
constraints.extend([
P_ch[m, k] >= 0, P_ch[m, k] <= p_max_pu,
P_dis[m, k] >= 0, P_dis[m, k] <= p_max_pu
])
else:
constraints.extend([P_ch[m, k] == 0, P_dis[m, k] == 0])
# ====================================================================
# 4. OBJECTIVE: Minimize Generation Cost of the Base Case (k=0) Only
# ====================================================================
cost = 0
if num_gen > 0:
for m, gen in enumerate(machines):
Pg_mw = Pg[m, 0] * grid.Sbase
cost += gen.a + (gen.b * Pg_mw) + (gen.c * cp.square(Pg_mw))
prob = cp.Problem(cp.Minimize(cost), constraints)
print(f"⏳ Solving {K} Simultaneous Networks ({relax.upper()})...")
try:
prob.solve(solver=getattr(cp, solver.upper()), verbose=False)
except Exception as e:
print(f"❌ CVXPY Error: {e}")
# ====================================================================
# 5. PARSE SCOPF RESULTS AND GENERATE RESCUE PLAN
# ====================================================================
if prob.status in ["optimal", "optimal_inaccurate"]:
print(f"✅ SCOPF Converged! Preventive Security Total Cost: ${prob.value:.2f}")
# Restore safe P, Q setpoints into Hardware Objects
if num_gen > 0:
for m, gen in enumerate(machines):
gen.P = float(Pg.value[m, 0] * grid.Sbase)
gen.Q = float(Qg.value[m, 0] * grid.Sbase)
print(f" 🏭 {gen.name:<5} Setpoint -> P = {gen.P:7.2f} MW (Safe for all N-1!)")
if num_bat > 0:
print(" 🔋 Battery Base-Case Dispatch:")
for m, bat in enumerate(batteries):
bat.P = float(P_dis.value[m, 0] * grid.Sbase - P_ch.value[m, 0] * grid.Sbase)
if abs(bat.P) < 1e-4:
state = "Idle"
elif bat.P > 0:
state = "Discharging"
else:
state = "Charging"
print(f" {bat.name:<5} -> {abs(bat.P):5.2f} MW ({state})")
if num_inv > 0:
print(" ☀️ Inverter Base-Case Dispatch:")
for m, inv in enumerate(inverters):
inv.P = float(P_inv.value[m, 0] * grid.Sbase)
inv.Q = float(Q_inv.value[m, 0] * grid.Sbase)
print(f" {inv.name:<5} -> P = {inv.P:7.2f} MW | Q = {inv.Q:7.2f} MVAr")
rescue_plan = {}
# Extract N-1 scenario emergency actions (k=1 to K-1)
for k in range(1, K):
b_idx = k - 1
branch_name = edges[b_idx][2]['obj'].name
rescue_plan[branch_name] = {}
if num_bat > 0:
for m, bat in enumerate(batteries):
p_net = float(P_dis.value[m, k] * grid.Sbase - P_ch.value[m, k] * grid.Sbase)
rescue_plan[branch_name][bat.name] = {'P': p_net}
if num_inv > 0:
for m, inv in enumerate(inverters):
p_inv_val = float(P_inv.value[m, k] * grid.Sbase)
q_inv_val = float(Q_inv.value[m, k] * grid.Sbase)
rescue_plan[branch_name][inv.name] = {'P': p_inv_val, 'Q': q_inv_val}
grid.attach_profile(original_profile)
return rescue_plan
else:
print(f"❌ SCOPF Infeasible! AI cannot find a safe state (Status: {prob.status})")
# Restore original time-series profile on failure
grid.attach_profile(original_profile)
[docs]
def Validate_N1(grid: Grid, rescue_plan: dict = None, tol: float = 1e-6, max_iter: int = 100):
"""
Physical Verification Auditor for N-1 Constraints.
Acts as a strict physics validation engine. It drops branches sequentially, applies the
AI-generated emergency rescue plans (from SCOPF), and runs the strict non-linear Newton-Raphson
AC load flow to ensure that line flows and voltage deviations are practically sound and do not
trigger subsequent overloads.
Parameters:
-----------
grid : Grid
The power system grid object to validate.
rescue_plan : dict, optional
A dictionary mapping branch outages to specific Battery/Inverter corrective actions.
tol : float
Tolerance for the NR solver.
max_iter : int
Maximum iterations for the NR solver.
"""
print("\n🔎 ========================================")
print("🛡️ Starting N-1 Physics Validation (NR)...")
print("========================================")
report = {}
edges = list(grid.edges(data=True))
# Verify N-0 baseline first
try:
NR(grid, tol=tol, max_iter=max_iter)
status = grid.check_overload()
if any(pct > 100 for pct in status['branches'].values()):
print("❌ Base Case FAILED: Overload detected in N-0!")
return
print("✅ Base Case (N-0) Passed Physics Test.")
except Exception as e:
print(f"❌ Base Case FAILED: NR did not converge! ({e})")
return
# 1.5 Create a system state checkpoint (Record the pristine N-0 state)
base_case_state = {}
for bus in grid.buses:
b_data = {'V': bus.V, 'theta': bus.theta, 'type': bus.type, 'comps': {}}
for comp in bus.components:
if type(comp).__name__ == 'SynchronousMachine':
b_data['comps'][comp.name] = {'Q': comp.Q}
elif type(comp).__name__ in ['Battery', 'Inverter']:
b_data['comps'][comp.name] = {'P': comp.P, 'Q': comp.Q}
base_case_state[bus.name] = b_data
base_case_taps = {}
for u, v, data in edges:
branch = data['obj']
if type(branch).__name__ == 'Transformer':
base_case_taps[branch.name] = branch.tap_ratio
# 2. Begin cutting lines iteratively (N-1 testing)
for u, v, data in edges:
branch = data['obj']
b_name = branch.name
# ✂️ Simulate physical branch severing
grid.remove_edge(u, v)
# Check for islanding (disconnected graph topology preventing convergence)
if not nx.is_connected(grid):
report[b_name] = "⏩ SKIPPED (Radial Line / Islanding)"
grid.add_edge(u, v, **data)
continue
# 2.1 Apply emergency rescue plan (Activate Batteries and Inverters based on SCOPF logic)
if rescue_plan and b_name in rescue_plan:
for bus in grid.buses:
for comp in bus.components:
if type(comp).__name__ == 'Battery' and comp.name in rescue_plan[b_name]:
plan_data = rescue_plan[b_name][comp.name]
comp.P = plan_data['P'] if isinstance(plan_data, dict) else plan_data
elif type(comp).__name__ == 'Inverter' and comp.name in rescue_plan[b_name]:
plan_data = rescue_plan[b_name][comp.name]
if isinstance(plan_data, dict):
comp.P = plan_data.get('P', comp.P)
comp.Q = plan_data.get('Q', comp.Q)
grid.build_ybus() # Rebuild Admittance Matrix without the cut line
# 🧪 Physics Test via exact AC Newton-Raphson Load Flow
try:
NR(grid, tol=tol, max_iter=max_iter)
status = grid.check_overload()
overloaded = [name for name, pct in status['branches'].items() if pct > 100]
if overloaded:
report[b_name] = f"❌ FAILED: Overloaded {overloaded}"
else:
report[b_name] = "✅ PASSED (No Overload)"
except Exception as e:
report[b_name] = f"❌ FAILED: {type(e).__name__} - {e}"
finally:
# 🩹 Reconnect the severed line
grid.add_edge(u, v, **data)
# 2.2 Load Save Game! (Restore everything completely to the pristine baseline N-0 state)
for bus in grid.buses:
b_data = base_case_state[bus.name]
bus.V = b_data['V']
bus.theta = b_data['theta']
bus.type = b_data['type']
for comp in bus.components:
if comp.name in b_data['comps']:
c_data = b_data['comps'][comp.name]
if 'P' in c_data: comp.P = c_data['P']
if 'Q' in c_data: comp.Q = c_data['Q']
for u_edge, v_edge, t_data in edges:
t_branch = t_data['obj']
if type(t_branch).__name__ == 'Transformer' and t_branch.name in base_case_taps:
t_branch.tap_ratio = base_case_taps[t_branch.name]
grid.build_ybus() # Reconstruct Ybus from the intact state
# Final Reporting Output
print("\n📊 Physics Validation Report:")
for name, res in report.items():
print(f" Drop {name:<8} -> {res}")