poi

Introduction

The PyOptInterface implementation for Forecast-Based Controllers (FBC) in HAMLET provides a flexible, lower-level approach to formulating and solving multi-period optimization problems. It leverages the PyOptInterface package, which offers a unified interface to various optimization solvers, allowing for more direct control over the optimization process.

The implementation is organized around two main Python files: - components.py: Defines component models for various energy system elements over multiple timesteps - mpc_poi.py: Implements the main optimization controller with forecast integration

This implementation is designed for users who need greater flexibility in problem formulation and solver configuration, providing a more direct mapping between the mathematical formulation and the code implementation. The key difference from the Real-Time Controller (RTC) implementation is the handling of multiple timesteps and forecasts, enabling the controller to make decisions that optimize performance over a prediction horizon rather than just for the current timestep.

Objective Function

The primary goal of the PyOptInterface implementation for FBC is to minimize operational costs over the entire prediction horizon. Unlike the RTC implementation, which focuses on minimizing deviations from target values, the FBC implementation directly minimizes costs and maximizes revenues from market interactions.

Mathematical Formulation

The objective function can be mathematically expressed as:

\[\min \sum_{t=0}^{T-1} \sum_{m \in M} \left( C_m(t) - R_m(t) \right)\]

where:

  • \(T\) is the optimization horizon

  • \(M\) is the set of markets

  • \(C_m(t)\) is the cost of buying energy from market \(m\) at timestep \(t\)

  • \(R_m(t)\) is the revenue from selling energy to market \(m\) at timestep \(t\)

The costs and revenues are defined as:

\[ \begin{align}\begin{aligned}C_m(t) = P_{m,in}(t) \cdot \Delta t \cdot \left( p_{buy}(t) + g_{buy}(t) + l_{buy}(t) \right)\\R_m(t) = -P_{m,out}(t) \cdot \Delta t \cdot \left( p_{sell}(t) - g_{sell}(t) - l_{sell}(t) \right)\end{aligned}\end{align} \]

where:

  • \(P_{m,in}(t)\) is the power imported from market \(m\) at timestep \(t\) [W]

  • \(P_{m,out}(t)\) is the power exported to market \(m\) at timestep \(t\) [W]

  • \(\Delta t\) is the timestep duration [h]

  • \(p_{buy}(t)\), \(p_{sell}(t)\) are the energy buying and selling prices at timestep \(t\) [€/Wh]

  • \(g_{buy}(t)\), \(g_{sell}(t)\) are the grid fees for buying and selling at timestep \(t\) [€/Wh]

  • \(l_{buy}(t)\), \(l_{sell}(t)\) are the levies for buying and selling at timestep \(t\) [€/Wh]

Code Implementation

The objective function is defined in the define_objective method of the POI class:

def define_objective(self):
    """Defines the objective function. The objective is to reduce the costs."""

    # Initialize the objective function as zero
    objective = []

    # Loop through the model's variables to identify the balancing variables
    for variable_name, variables in self.variables.items():
        # Only consider the cost and revenue components of the markets
        if variable_name.startswith(tuple(self.market_names)):
            if variable_name.endswith('_costs'):
                # Add the variable to the objective function
                objective.append(variables)
            elif variable_name.endswith('_revenue'):
                # Subtract the variable from the objective function
                objective.append(-1 * variables)
            else:
                pass
        else:
            pass

    # Set the objective function to the model with the minimize direction
    self.model.set_objective(np.sum(objective), poi.ObjectiveSense.Minimize)

The objective function minimizes the total cost over the prediction horizon by: 1. Adding cost variables (representing expenses for buying energy) 2. Subtracting revenue variables (representing income from selling energy)

This approach allows the controller to make decisions that optimize economic performance over time, taking advantage of price variations and forecasted conditions to minimize overall costs.

Code Implementation

The main implementation of the PyOptInterface controller for FBC is in the mpc_poi.py file, which defines the POI class:

class POI(MpcBase):
    def get_model(self, **kwargs):
        env = gurobi.Env(empty=True)
        env.set_raw_parameter("OutputFlag", 0)
        env.start()
        model = gurobi.Model(env)
        model.set_model_attribute(poi.ModelAttribute.Silent, True)
        model.set_raw_parameter("OutputFlag", 0)
        model.set_raw_parameter("LogToConsole", 0)
        return model

The class inherits from MpcBase, which provides common functionality for Model Predictive Control (MPC) based controllers. This is different from the RTC implementation, which inherits from OptimBase.

Model Initialization

The model is initialized in the get_model method, which creates a Gurobi model with specific parameters:

def get_model(self, **kwargs):
    env = gurobi.Env(empty=True)
    env.set_raw_parameter("OutputFlag", 0)
    env.start()
    model = gurobi.Model(env)
    model.set_model_attribute(poi.ModelAttribute.Silent, True)
    model.set_raw_parameter("OutputFlag", 0)
    model.set_raw_parameter("LogToConsole", 0)
    return model

Solving the Model

The model is solved in the run method:

def run(self):
    # Solve the optimization problem
    solver = self.ems[c.C_OPTIM].get('solver')
    match solver:
        case 'gurobi':
            if self.ems[c.C_OPTIM].get('time_limit') is not None:
                self.model.set_raw_parameter('TimeLimit', self.ems[c.C_OPTIM]['time_limit'] / 60)
            self.model.optimize()
            status = self.model.get_model_attribute(poi.ModelAttribute.TerminationStatus)
        case _:
            raise ValueError(f"Unsupported solver: {solver}")

    # Check if the solution is optimal
    if status not in [poi.TerminationStatusCode.OPTIMAL, poi.TerminationStatusCode.TIME_LIMIT]:
        print(f'Exited with status "{status}". \n ')
        # raise ValueError(f"Optimization failed: {status}")

    # Process the solution into control commands and return
    self.agent = self.process_solution()

    return self.agent

Getting the Solution

The solution is retrieved in the get_solution method, which returns a dictionary mapping variable names to arrays of values (one value per timestep):

def get_solution(self):
    # Obtain the solution values
    return {var_name: np.array([self.model.get_value(var) for var in vars]) for var_name, vars in
            self.variables.items()}

Mathematical Formulation

The PyOptInterface implementation for FBC follows the general mathematical formulation described in the Mathematical Formulation section, with specific adaptations for multi-period optimization and forecast integration.

Problem Structure

The optimization problem is formulated as a minimization problem over multiple timesteps. In the code, this is implemented using PyOptInterface’s API with variables and constraints defined for each timestep:

# Define variables for each timestep
for timestep in range(horizon):
    # Create variables for this timestep
    # ...

# Define constraints for each timestep
for timestep in range(horizon):
    # Create constraints for this timestep
    # ...

# Define temporal coupling constraints
for timestep in range(1, horizon):
    # Create constraints that link variables across timesteps
    # ...

Decision Variables

Variables are defined for each component and each timestep in the optimization horizon. Unlike the Linopy implementation, which uses labeled arrays, the PyOptInterface implementation uses a dictionary to store variables:

def define_variables(self):
    self.variables = {}
    # Define variables for each plant
    for plant_name, plant in self.plant_objects.items():
        plant.define_variables(self.model, self.variables, comp_type=self.plants[plant_name]['type'])

    # Define variables for each market
    for market_name, market in self.market_objects.items():
        market.define_variables(self.model, self.variables, comp_type=self.markets[market_name])

Temporal Coupling Constraints

A key feature of FBC is the inclusion of constraints that couple variables across timesteps. For example, the state-of-charge evolution for a battery:

def _constraint_soc(self, model, variables, energy_type: str = c.ET_ELECTRICITY):
    """Adds the constraint that the soc of the battery is that of the previous timestep plus dis-/charging power"""

    # Get the variables
    var_soc = variables[f'{self.name}_{self.comp_type}_soc']
    var_charge = variables[f'{self.name}_{self.comp_type}_{energy_type}_{c.PF_OUT}']
    var_discharge = variables[f'{self.name}_{self.comp_type}_{energy_type}_{c.PF_IN}']
    var_soc_init = variables[f'{self.name}_{self.comp_type}_soc_init']

    # Define the constraint for each timestep
    for timestep in range(len(self.timesteps)):
        if timestep == 0:
            # For the first timestep, use the initial SOC
            model.add_linear_constraint(
                var_soc[timestep] - var_soc_init - var_charge[timestep] * self.efficiency * self.dt_hours -
                var_discharge[timestep] / self.efficiency * self.dt_hours,
                poi.ConstraintSense.Equal,
                0,
                name=f'{self.name}_soc_{timestep}'
            )
        else:
            # For subsequent timesteps, use the SOC from the previous timestep
            model.add_linear_constraint(
                var_soc[timestep] - var_soc[timestep-1] - var_charge[timestep] * self.efficiency * self.dt_hours -
                var_discharge[timestep] / self.efficiency * self.dt_hours,
                poi.ConstraintSense.Equal,
                0,
                name=f'{self.name}_soc_{timestep}'
            )

Energy Balance Constraints

Energy balance constraints ensure that supply matches demand for each energy carrier at each timestep:

def add_balance_constraints(self):
    # Initialize the balance equations for each energy type
    balance_equations = {energy_type: [] for energy_type in self.energy_types}

    # Loop through each energy type
    for energy_type in self.energy_types:
        # Loop through each variable and add it to the balance equation accordingly
        for variable_name, variables in self.variables.items():
            # Add variables to balance equations based on their type and energy carrier
            # ...

    # Add the constraints for each energy type and timestep
    for energy_type, expressions in balance_equations.items():
        timestep_equations = np.sum(expressions, axis=0)
        for timestep, equation in enumerate(timestep_equations):
            self.model.add_linear_constraint(
                equation,
                poi.ConstraintSense.Equal,
                0,
                name=f"balance_{energy_type}_{timestep}"
            )

Component Models

The PyOptInterface implementation for FBC includes models for various energy system components, defined in the components.py file. Each component is implemented as a class that inherits from the base POIComps class. Here we focus on three key component models: inflexible load, PV, and market.

Base Component Class

The POIComps class provides common functionality for all components:

class POIComps:
    def __init__(self, name, forecasts, **kwargs):
        # Get the data
        self.name = name
        self.fcast = forecasts
        self.timesteps = kwargs['timesteps']
        self.info = kwargs

    def define_variables(self, model, variables, **kwargs):
        raise NotImplementedError()

    @staticmethod
    def define_constraints(model, variables):
        pass

    @staticmethod
    def add_variable_to_model(model, variables, name, **kwargs):
        coords = kwargs.get('coords', [[0]])
        if len(coords) > 1:
            print('Warning: 2d coords are not currently supported. Exiting..')
            return
        for coord in coords:
            variables[name] = np.empty((len(coord)), dtype=object)
            for ind in coord:
                var_name = name
                if 'coords' in kwargs:
                    var_name += f'_{ind}'
                lb = kwargs.get("lower", -math.inf)
                if isinstance(lb, (pd.Series, list, np.ndarray)):
                    lb = lb[ind]
                ub = kwargs.get("upper", math.inf)
                if isinstance(ub, (pd.Series, list, np.ndarray)):
                    ub = ub[ind]
                kwargs_var = {
                    'name': var_name,
                    'lb': lb,
                    'ub': ub,
                    'domain': poi.VariableDomain.Integer if kwargs.get('integer', False)
                    else poi.VariableDomain.Binary
                    if kwargs.get('binary', False) else poi.VariableDomain.Continuous,
                }
                variables[name][ind] = model.add_variable(**kwargs_var)

Inflexible Load

The InflexibleLoad class represents electrical loads that cannot be controlled or shifted. These loads must be satisfied exactly as specified for each timestep in the horizon:

class InflexibleLoad(POIComps):
    def __init__(self, name, **kwargs):
        # Call the parent class constructor
        super().__init__(name, **kwargs)

        # Get specific object attributes
        self.power = pd.Series(self.fcast[f'{self.name}_{c.ET_ELECTRICITY}'], index=self.timesteps, dtype='int32')

    def define_variables(self, model, variables, **kwargs):
        comp_type = kwargs['comp_type']

        # Define the power variable
        self.define_electricity_variable(model, variables, comp_type=comp_type, lower=-self.power, upper=-self.power)

The power variable has fixed lower and upper bounds equal to the negative of the load power (indicating consumption) for each timestep, ensuring that the load must be satisfied exactly.

PV Systems

PV systems are implemented in the Pv class, which inherits from SimplePlant:

class Pv(SimplePlant):
    def __init__(self, name, **kwargs):
        # Call the parent class constructor
        super().__init__(name, **kwargs)

The SimplePlant class defines the common functionality for generation components:

class SimplePlant(POIComps):
    def __init__(self, name, **kwargs):
        # Call the parent class constructor
        super().__init__(name, **kwargs)

        # Get specific object attributes
        self.power = list(self.fcast[f'{self.name}_{c.ET_ELECTRICITY}'])
        self.controllable = self.info['sizing']['controllable']
        self.lower = [0] * len(self.power) if self.controllable else self.power

        self.lower = pd.Series(self.lower, index=self.timesteps)
        self.power = pd.Series(self.power, index=self.timesteps)

    def define_variables(self, model, variables, **kwargs):
        comp_type = kwargs['comp_type']

        # Define the power variable
        self.define_electricity_variable(model, variables, comp_type=comp_type, lower=self.lower, upper=self.power)

PV systems have a power variable with a lower bound of 0 and an upper bound equal to the forecasted available power for each timestep, allowing for curtailment when necessary.

Market

The market component represents the connection to external energy networks, with buying and selling capabilities over the prediction horizon:

class Market(POIComps):
    def __init__(self, name, **kwargs):
        # Call the parent class constructor
        super().__init__(name, **kwargs)

        # Get specific object attributes
        self.comp_type = None
        self.dt_hours = kwargs['delta'].total_seconds() * c.SECONDS_TO_HOURS  # time delta in hours

        # Calculate the upper and lower bounds for the market power from the energy quantity
        self.upper = [int(round(x / self.dt_hours)) for x in self.fcast[f'{c.TC_ENERGY}_{c.TC_ENERGY}_{c.PF_IN}']]
        self.lower = [int(round(x / self.dt_hours * -1)) for x in self.fcast[f'{c.TC_ENERGY}_{c.TC_ENERGY}_{c.PF_OUT}']]

        self.lower = pd.Series(self.lower, index=self.timesteps)
        self.upper = pd.Series(self.upper, index=self.timesteps)

        # Get market price forecasts
        self.price_sell = pd.Series(self.fcast[f'{c.TC_ENERGY}_{c.TC_PRICE}_{c.PF_OUT}'], index=self.timesteps)
        self.price_buy = pd.Series(self.fcast[f'{c.TC_ENERGY}_{c.TC_PRICE}_{c.PF_IN}'], index=self.timesteps)
        self.grid_sell = pd.Series(self.fcast[f'{c.TT_GRID}_{c.TT_MARKET}_{c.PF_OUT}'], index=self.timesteps)
        self.grid_buy = pd.Series(self.fcast[f'{c.TT_GRID}_{c.TT_MARKET}_{c.PF_IN}'], index=self.timesteps)
        self.levies_sell = pd.Series(self.fcast[f'{c.TT_LEVIES}_{c.TC_PRICE}_{c.PF_OUT}'], index=self.timesteps)
        self.levies_buy = pd.Series(self.fcast[f'{c.TT_LEVIES}_{c.TC_PRICE}_{c.PF_IN}'], index=self.timesteps)

    def define_variables(self, model, variables, **kwargs):
        self.comp_type = kwargs['comp_type']
        # Define the market power variables (need to be positive and negative due to different pricing)
        self.add_variable_to_model(model, variables, name=f'{self.name}_{self.comp_type}_{c.PF_OUT}', lower=self.lower,
                               upper=0, coords=[self.timesteps],
                               integer=True)  # outflow from the building (selling)
        self.add_variable_to_model(model, variables, name=f'{self.name}_{self.comp_type}_{c.PF_IN}', lower=0,
                               upper=self.upper, coords=[self.timesteps],
                               integer=True)  # inflow into the building (buying)

        # Define mode flag that decides whether the market energy is bought or sold
        self.add_variable_to_model(model, variables, name=f'{self.name}_mode', coords=[self.timesteps], binary=True)

        # Define the market cost and revenue variables
        self.add_variable_to_model(model, variables, name=f'{self.name}_costs', lower=0, upper=np.inf,
                               coords=[self.timesteps])
        self.add_variable_to_model(model, variables, name=f'{self.name}_revenue', lower=0, upper=np.inf,
                               coords=[self.timesteps])

    def define_constraints(self, model, variables):
        # Add constraint that the market can either buy or sell but not both at the same time
        self.__constraint_operation_mode(model, variables)

        # Add constraint that the market cost and revenue are linked to the power
        self.__constraint_cost_revenue(model, variables)

The market component includes variables for buying and selling power, as well as for costs and revenues, which are used in the objective function. It also includes constraints to ensure that buying and selling don’t occur simultaneously and to link costs and revenues to power flows.

Configuration

The PyOptInterface implementation for FBC can be configured through the agent configuration file. The configuration is specified in the ems.controller.fbc section of the agent config file:

ems:
  controller:
    fbc:
      method: optimization
      horizon: 86_400  # control horizon in seconds
      optimization:
        framework: poi
        solver: gurobi
        time_limit: 120

Configuration Parameters

  • method: The control method to use (set to “optimization” for the PyOptInterface implementation)

  • horizon: The control horizon in seconds (e.g., 86_400 for 24 hours) - Note: Cannot exceed forecast horizon

  • optimization.framework: The optimization implementation to use (set to “poi” for this implementation)

  • optimization.solver: The solver to use for the optimization problem (currently only “gurobi” is supported)

  • optimization.time_limit: Maximum solving time in seconds (default: 120s)