import yaml
import numpy as np
import networkx as nx
import scipy.optimize as opt
from pathlib import Path
from pandas.core.frame import DataFrame
from fourpace.model import (
BusComponent, SynchronousMachine, AsynchronousMachine, Load,
Shunt, Inverter, Battery, BranchComponent, TransmissionLine, Transformer
)
from fourpace.control import SEXS, TGOV1, PSS1A
from fourpace.facts import CSVGN1, STATCOM1, TCSC1
[docs]
class Bus(nx.Graph):
"""
Representation of a Bus (Node) in a power system grid.
Inherits from nx.Graph, allowing the bus to act as a subgraph containing connected components.
"""
def __init__(self, name: str, Vbase: float, type: str = 'PQ'):
"""
Parameters:
-----------
name : str
The unique identifier/name of the bus.
Vbase : float
The base voltage of the bus in kV.
type : str
The bus type for Load Flow calculations ('PQ', 'PV', or 'Slack'). Default is 'PQ'.
"""
super().__init__()
self.name = name
self.Vbase: float | None = Vbase
self.type = type
self.V: float = 1.0 # Voltage magnitude (pu)
self.theta: float = 0.0 # Voltage phase angle (rad)
self.add_node(name, obj=self)
@property
def P(self) -> float:
"""Calculates the net active power (P) injected into this bus by all connected components."""
total_p = 0.0
for comp in self.components:
if hasattr(comp, 'P'):
total_p += comp.P
return total_p
@property
def Q(self) -> float:
"""Calculates the net reactive power (Q) injected into this bus by all connected components."""
total_q = 0.0
for comp in self.components:
if hasattr(comp, 'Q'):
total_q += comp.Q
return total_q
@property
def S(self) -> complex:
"""Calculates the net apparent power (S) injected into this bus (P + jQ)."""
return self.P + 1j * self.Q
[docs]
def component(self, name: str) -> BusComponent:
"""Retrieves a specific component connected to this bus by its name."""
if name in self.nodes:
return self.nodes[name]['obj']
raise KeyError(f"β '{name}' not found in this bus.")
@property
def components(self) -> list:
"""Returns a list of all components (e.g., loads, generators) attached to this bus."""
comp_list = []
for _, data in self.nodes(data=True):
obj = data.get('obj')
if obj is not None and obj is not self:
comp_list.append(obj)
return comp_list
[docs]
def get(self):
"""Returns the current state variables of the bus: [V, theta, P, Q]."""
return [self.V, self.theta, self.P, self.Q]
[docs]
def add_component(self, component: BusComponent):
"""Connects a single component to this bus."""
self.add_node(component.name, obj=component)
self.add_edge(self.name, component.name)
[docs]
def add_components(self, components: list[BusComponent]):
"""Connects multiple components to this bus simultaneously."""
for component in components:
self.add_component(component)
[docs]
def total_cost(self) -> float:
"""Calculates the total generation cost of all generators connected to this bus."""
components = [component for _, component in self.nodes(data='obj')]
components.remove(self)
total = 0.0
for com in components:
total += com.cost()
return total
[docs]
class Grid(nx.Graph):
"""
The main class representing the macro-level Power Grid network.
Handles system topology, Y-bus matrix construction, and basic power flow operations.
"""
def __init__(self, Sbase: float):
"""
Parameters:
-----------
Sbase : float
The system base apparent power in MVA.
"""
super().__init__()
self.Sbase: float = Sbase
self.Ybus: np.ndarray | None = None
self.load_profile: DataFrame = None
self.T: int = 1
self.series_facts = [] # Stores Series FACTS devices (e.g., TCSC) installed on branches.
[docs]
@classmethod
def load(cls, filepath: str) -> 'Grid':
"""
Loads the grid topology and parameters from a configuration file (YAML or JSON).
Parameters:
-----------
filepath : str
The path to the configuration file (e.g., 'config.yaml').
Returns:
--------
Grid
A fully initialized Grid object ready for simulation.
"""
path = Path(filepath)
with open(path, 'r', encoding='utf-8') as f:
if path.suffix in ['.yaml', '.yml']:
data = yaml.safe_load(f)
else:
raise ValueError("β Unsupported format! Please use .yaml or .json")
grid = cls(Sbase=data.get('Sbase', 100.0))
# Include Shunt FACTS in the standard bus component registry
comp_classes = {
'SynchronousMachine': SynchronousMachine,
'AsynchronousMachine': AsynchronousMachine,
'Load': Load,
'Shunt': Shunt,
'Inverter': Inverter,
'Battery': Battery,
'TransmissionLine': TransmissionLine,
'Transformer': Transformer,
'CSVGN1': CSVGN1, # SVC
'STATCOM1': STATCOM1 # STATCOM
}
ctrl_classes = {
'SEXS': SEXS,
'TGOV1': TGOV1,
'PSS1A': PSS1A
}
# 1. Load Buses and Shunt Components (Including SVC & STATCOM)
for b_data in data.get('buses', []):
bus = Bus(name=b_data['name'], Vbase=b_data['Vbase'], type=b_data.get('bus_type', 'PQ'))
grid.add_bus(bus)
for comp_data in b_data.get('components', []):
comp_type = comp_data.pop('type')
if comp_type == 'SynchronousMachine':
avr_data = comp_data.pop('avr', None)
gov_data = comp_data.pop('gov', None)
pss_data = comp_data.pop('pss', None)
avr_obj, gov_obj, pss_obj = None, None, None
gen_name = comp_data.get('name', 'UnknownGen')
if avr_data:
ctrl_type = avr_data.pop('type')
if ctrl_type in ctrl_classes:
avr_obj = ctrl_classes[ctrl_type](name=f"AVR_{gen_name}", **avr_data)
if gov_data:
ctrl_type = gov_data.pop('type')
if ctrl_type in ctrl_classes:
gov_obj = ctrl_classes[ctrl_type](name=f"GOV_{gen_name}", **gov_data)
if pss_data:
ctrl_type = pss_data.pop('type')
if ctrl_type in ctrl_classes:
pss_obj = ctrl_classes[ctrl_type](name=f"PSS_{gen_name}", **pss_data)
comp_data['avr'] = avr_obj
comp_data['gov'] = gov_obj
comp_data['pss'] = pss_obj
if comp_type in comp_classes:
comp_obj = comp_classes[comp_type](**comp_data)
grid.bus(bus.name).add_component(comp_obj)
# 2. Load Branches
for branch_data in data.get('branches', []):
branch_type = branch_data.pop('type')
from_bus = branch_data.pop('from_bus')
to_bus = branch_data.pop('to_bus')
if branch_type in comp_classes:
branch_obj = comp_classes[branch_type](**branch_data)
grid.connect(from_bus, to_bus, branch_obj)
# 3. ADDED: Load Series FACTS Devices (TCSC)
series_classes = {
'TCSC1': TCSC1
}
for facts_data in data.get('series_facts', []):
facts_type = facts_data.pop('type')
if facts_type in series_classes:
facts_obj = series_classes[facts_type](**facts_data)
grid.series_facts.append(facts_obj)
print(f"β
Successfully loaded grid from {path.name}")
return grid
[docs]
def add_bus(self, bus: Bus):
"""Adds a single bus to the grid."""
self.add_node(bus.name, bus=bus)
[docs]
def add_busses(self, busses: list[Bus]):
"""Adds a list of buses to the grid."""
for bus in busses:
self.add_bus(bus)
[docs]
def connect(self, from_bus: str, to_bus: str, branch: BranchComponent):
"""
Connects two buses via a branch component (e.g., Transmission Line or Transformer).
"""
self.add_edge(from_bus, to_bus, obj=branch)
[docs]
def bus(self, name: str) -> Bus:
"""Retrieves a Bus object by its name."""
if name in self.nodes:
return self.nodes[name]['bus']
raise KeyError(f"β Bus '{name}' not found in this grid.")
@property
def buses(self) -> list[Bus]:
"""Returns a list of all Bus objects in the system."""
return [bus for _, bus in self.nodes(data='bus')]
[docs]
def build_ybus(self) -> np.ndarray:
"""
Constructs the Steady-State Admittance Matrix (Y-bus).
Used for standard Load Flow calculations.
"""
nodes = list(self.nodes)
n = len(nodes)
Y = np.zeros((n, n), dtype=complex)
for u, v, data in self.edges(data=True):
i = nodes.index(u)
j = nodes.index(v)
obj = data.get('obj')
if obj is None:
continue
y = obj.Y
if isinstance(obj, TransmissionLine):
b_sh = obj.B_shunt
Y[i, i] += y + (1j * b_sh / 2)
Y[j, j] += y + (1j * b_sh / 2)
Y[i, j] -= y
Y[j, i] -= y
elif isinstance(obj, Transformer):
a = obj.tap_ratio
alpha = obj.phase_shift
tap_complex = a * np.exp(1j * alpha)
Y[i, i] += y / (a**2)
Y[j, j] += y
Y[i, j] -= y / np.conj(tap_complex)
Y[j, i] -= y / tap_complex
self.Ybus = Y
return Y
[docs]
def build_ybus_pos(self):
"""
Constructs the Positive-Sequence Y-bus matrix (Y1).
Incorporates generator subtransient reactances (Xd").
Used for fault calculation and transient stability analysis.
"""
num_bus = len(self.buses)
nodes_list = list(self.nodes)
Ybus_fault = np.zeros((num_bus, num_bus), dtype=complex)
for u, v, data in self.edges(data=True):
i = nodes_list.index(u)
j = nodes_list.index(v)
branch = data.get('obj')
Y = branch.Y
Ybus_fault[i, j] -= Y
Ybus_fault[j, i] -= Y
Ybus_fault[i, i] += Y
Ybus_fault[j, j] += Y
if isinstance(branch, TransmissionLine) and getattr(branch, 'B_shunt', 0) > 0:
Y_shunt = 1j * (branch.B_shunt / 2)
Ybus_fault[i, i] += Y_shunt
Ybus_fault[j, j] += Y_shunt
elif isinstance(branch, Transformer) and branch.tap_ratio != 1.0:
a = branch.tap_ratio
Ybus_fault[i, i] += Y * ((1 - a) / (a**2))
Ybus_fault[j, j] += Y * ((a - 1) / a)
for i, bus in enumerate(self.buses):
for comp in bus.components:
if isinstance(comp, SynchronousMachine):
Xd_subtransient = getattr(comp, 'Xd_sub', 0.2)
Y_gen = 1 / (1j * Xd_subtransient)
Ybus_fault[i, i] += Y_gen
return Ybus_fault
[docs]
def build_ybus_zero(self):
"""
Constructs the Zero-Sequence Y-bus Matrix (Y0).
*** CRITICAL: This structure is highly dependent on transformer grounding/connection types. ***
Used for asymmetrical fault analysis (e.g., SLG, DLG).
"""
num_bus = len(self.buses)
nodes_list = list(self.nodes)
Ybus_zero = np.zeros((num_bus, num_bus), dtype=complex)
for u, v, data in self.edges(data=True):
i = nodes_list.index(u)
j = nodes_list.index(v)
branch = data.get('obj')
if isinstance(branch, TransmissionLine):
Y0 = getattr(branch, 'Y0', branch.Y)
Ybus_zero[i, j] -= Y0
Ybus_zero[j, i] -= Y0
Ybus_zero[i, i] += Y0
Ybus_zero[j, j] += Y0
if getattr(branch, 'B0_shunt', 0) > 0:
Y0_shunt = 1j * (branch.B0_shunt / 2)
Ybus_zero[i, i] += Y0_shunt
Ybus_zero[j, j] += Y0_shunt
elif isinstance(branch, Transformer):
conn_attr = getattr(branch, 'connection_type', 'yg-yg')
conn = conn_attr.lower().split('-')
if len(conn) != 2:
pri_type, sec_type = 'yg', 'yg'
else:
pri_type, sec_type = conn[0], conn[1]
Y0 = getattr(branch, 'Y0', branch.Y)
if pri_type == 'yg' and sec_type == 'yg':
Ybus_zero[i, j] -= Y0
Ybus_zero[j, i] -= Y0
Ybus_zero[i, i] += Y0
Ybus_zero[j, j] += Y0
elif pri_type == 'delta' and sec_type == 'yg':
Ybus_zero[j, j] += Y0
elif pri_type == 'yg' and sec_type == 'delta':
Ybus_zero[i, i] += Y0
for i, bus in enumerate(self.buses):
for comp in bus.components:
if isinstance(comp, SynchronousMachine):
X0 = getattr(comp, 'X0', 0.05)
if X0 > 0:
Y0_gen = 1 / (1j * X0)
Ybus_zero[i, i] += Y0_gen
# Artificial shunt to prevent singular matrices in grids with floating neutrals
Ybus_zero += np.eye(num_bus) * 1e-6j
return Ybus_zero
[docs]
def kron_reduction(self, Y_matrix: np.ndarray, keep_nodes: list[str]) -> np.ndarray:
"""
Reduces the dimensions of the Y-bus matrix using Kron Reduction (Network Equivalencing).
Eliminates non-essential nodes while preserving the electrical characteristics
between the specified 'keep_nodes'. Essential for Transient Stability Analysis.
Parameters:
-----------
Y_matrix : np.ndarray
The original admittance matrix.
keep_nodes : list[str]
A list of bus names to retain (e.g., buses with generators).
Returns:
--------
np.ndarray
The reduced admittance matrix.
"""
nodes_list = list(self.nodes)
n = Y_matrix.shape[0]
# Find the indices of the nodes to keep and the nodes to eliminate
keep_indices = [nodes_list.index(name) for name in keep_nodes if name in nodes_list]
eliminate_indices = [i for i in range(n) if i not in keep_indices]
Y_new = Y_matrix.copy()
# Loop to eliminate nodes one by one (from highest index to lowest to prevent index shifting)
for k in sorted(eliminate_indices, reverse=True):
Y_kk = Y_new[k, k]
if abs(Y_kk) < 1e-6:
continue # Skip unconnected nodes (Floating nodes)
Y_ik = Y_new[:, k:k+1]
Y_kj = Y_new[k:k+1, :]
# Kron Reduction equation
Y_new = Y_new - (Y_ik @ Y_kj) / Y_kk
# Delete row and column k
Y_new = np.delete(Y_new, k, axis=0)
Y_new = np.delete(Y_new, k, axis=1)
return Y_new
[docs]
def result(self):
"""Prints the load flow calculation results (Voltage, Angle, Active & Reactive Power) to the console."""
print("\n=== π POWER FLOW RESULTS ===")
for i, bus in enumerate(self.buses):
P_pu, Q_pu = self.calculate_PQ(i)
P_actual = P_pu * self.Sbase
Q_actual = Q_pu * self.Sbase
print(f"Bus {bus.name} | V = {bus.V:.4f} pu | phase = {np.rad2deg(bus.theta):.2f}Β° | P = {P_actual:7.2f} MW | Q = {Q_actual:7.2f} MVAr")
[docs]
def loading_status(self):
"""Checks and prints the loading status (overload warnings) for all branches and generators."""
grid_status = self.check_overload()
print("\nπ Grid Loading Status:")
print("--- Branches ---")
for name, pct in grid_status['branches'].items():
status = "π₯ OVERLOAD!" if pct > 100 else "β
OK"
print(f"{name}: {pct}% {status}")
print("\n--- Generators ---")
for name, pct in grid_status['generators'].items():
status = "π₯ OVERLOAD!" if pct > 100 else "β
OK"
print(f"{name}: {pct}% {status}")
[docs]
def attach_profile(self, df: DataFrame):
"""
Attaches a time-series profile (e.g., hourly load or solar generation profile) to the grid.
Parameters:
-----------
df : pandas.DataFrame
The time-series data profile.
"""
self.load_profile = df
self.T = len(df)
print(f"π
Attached Load/Generation Profile with {self.T} time steps.")
[docs]
def apply_profile(self, step_or_dict):
"""
Updates the P and Q values of grid loads based on the specified time step in the attached profile.
Parameters:
-----------
step_or_dict : int or dict
The hour index (int) to fetch from the DataFrame, or a dictionary specifying load multipliers directly.
"""
if isinstance(step_or_dict, int):
if self.load_profile is None:
raise ValueError("β No profile attached to Grid! Use grid.attach_profile(df) first.")
data_dict = self.load_profile.iloc[step_or_dict].to_dict()
else:
data_dict = step_or_dict
for bus in self.buses:
for comp in bus.components:
if type(comp).__name__ == 'Load':
if comp.name in data_dict:
multiplier = data_dict[comp.name]
comp.P = comp.P_nom * multiplier
comp.Q = comp.Q_nom * multiplier
[docs]
def get_peak_load_hour(self) -> int:
"""Finds and returns the hour (index) at which the system experiences the maximum total active power load."""
if self.load_profile is None:
raise ValueError("β No profile attached to Grid! Use grid.attach_profile(df) first.")
max_load = -1.0
peak_hour = 0
for t in range(self.T):
current_total_load = 0.0
data_dict = self.load_profile.iloc[t].to_dict()
for bus in self.buses:
for comp in bus.components:
comp_type = type(comp).__name__
if comp_type == 'Load':
multiplier = data_dict.get(comp.name, 1.0)
current_total_load += abs(comp.P_nom * multiplier)
elif comp_type == 'AsynchronousMachine':
multiplier = data_dict.get(comp.name, 1.0)
current_total_load += abs(comp.P_rated_base * multiplier)
if current_total_load > max_load:
max_load = current_total_load
peak_hour = t
print(f"π Peak Load Detected at Hour {peak_hour} (Total System Load: {max_load:.2f} MW)")
return peak_hour
[docs]
def calculate_PQ(self, i: int) -> tuple[float, float]:
"""
Calculates the active (P) and reactive (Q) power injected at bus 'i' using power flow equations and the Y-bus matrix.
Parameters:
-----------
i : int
The index of the bus in the `self.buses` list.
Returns:
--------
tuple[float, float]
(P_i, Q_i) values in per-unit (p.u.).
"""
bus_i = self.buses[i]
Vi = bus_i.V
theta_i = bus_i.theta
G = self.Ybus.real
B = self.Ybus.imag
P_i = 0.0
Q_i = 0.0
for j in range(len(self.buses)):
bus_j = self.buses[j]
Vj = bus_j.V
theta_j = bus_j.theta
delta_ij = theta_i - theta_j
P_i += Vi * Vj * (G[i, j] * np.cos(delta_ij) + B[i, j] * np.sin(delta_ij))
Q_i += Vi * Vj * (G[i, j] * np.sin(delta_ij) - B[i, j] * np.cos(delta_ij))
return P_i, Q_i
[docs]
def update_motor_slip(self, motor: AsynchronousMachine, V_pu: float):
"""
Updates the slip of an induction motor to find the equilibrium point where
electromagnetic torque (Te) equals mechanical torque (Tm).
Parameters:
-----------
motor : AsynchronousMachine
The induction motor object.
V_pu : float
The current terminal voltage (p.u.).
"""
def torque_mismatch(s):
if s <= 0: return 1e6
V_phase = (V_pu * motor.V_rated) / np.sqrt(3)
ws = 2 * np.pi * motor.freq / (motor.poles / 2)
R_total = motor.Rs + (motor.Rr / s)
X_total = motor.Xs + motor.Xr
Z_sq = R_total**2 + X_total**2
Te = (3 * V_phase**2 * (motor.Rr / s)) / (ws * Z_sq)
if motor.load_type == 'constant_torque':
Tm = (motor.P_rated * 1000) / ws
else:
Tm = ((motor.P_rated * 1000) / ws) * ((1 - s)**2)
return Te - Tm
# Root finding to determine the new equilibrium slip
new_s = opt.fsolve(torque_mismatch, x0=motor.s)[0]
motor.s = max(0.0001, min(new_s, 0.99))
[docs]
def check_overload(self) -> dict:
"""
Inspects the loading status of all branches and generators in the system.
Returns:
--------
dict
A dictionary containing loading percentages categorized by 'branches' and 'generators'.
"""
loadings = {
'branches': {},
'generators': {}
}
# 1. Branches Check
for u, v, data in self.edges(data=True):
branch = data.get('obj')
if branch is None or getattr(branch, 'S_max', None) is None:
continue
from_bus = self.bus(u)
to_bus = self.bus(v)
V_i = from_bus.V * np.exp(1j * from_bus.theta)
V_j = to_bus.V * np.exp(1j * to_bus.theta)
if type(branch).__name__ == 'Transformer':
t = branch.tap_ratio * np.exp(1j * branch.phase_shift)
I_ij = (V_i / t - V_j) * branch.Y
else:
I_ij = (V_i - V_j) * branch.Y
S_flow_MVA = abs(V_i * np.conj(I_ij)) * self.Sbase
loading_pct = (S_flow_MVA / branch.S_max) * 100
loadings['branches'][branch.name] = round(loading_pct, 2)
# 2. Generator Check
for bus in self.buses:
for comp in bus.components:
if type(comp).__name__ == 'SynchronousMachine':
S_gen_MVA = np.sqrt(comp.P**2 + comp.Q**2)
s_rated = getattr(comp, 'S_rated', None)
if s_rated is None:
p_max = getattr(comp, 'Pmax', 9999.0)
q_max = getattr(comp, 'Qmax', 9999.0)
if p_max == float('inf') or q_max == float('inf') or p_max == 9999.0:
s_rated = 9999.0
else:
s_rated = np.sqrt(p_max**2 + q_max**2)
if s_rated > 0 and s_rated != 9999.0:
loading_pct = (S_gen_MVA / s_rated) * 100
loadings['generators'][comp.name] = round(loading_pct, 2)
else:
loadings['generators'][comp.name] = 0.0
return loadings