Skip to content

Add time interpolation to mpc data #3559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
73 changes: 73 additions & 0 deletions pyomo/contrib/mpc/data/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2025
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________

from bisect import bisect_right


def _get_time_index_vec(time_set, time_data):
"""Get the position index of time_data above and below the times in
time_set. This can be used to find positions of points to interpolate
between.

Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in

Returns
-------
numpy.array
Position index of the first time in time_data greater than the
corresponding points time_set. If a time is less than all the times
in time_data return 1. If a time is greater than all times time_data
set return the last index of time_data.
"""
pos = [None] * len(time_set)
for i, t in enumerate(time_set):
pos[i] = bisect_right(time_data, t)
if pos[i] == 0:
pos[i] = 1
elif pos[i] == len(time_data):
pos[i] = len(time_data) - 1
return pos


def _get_interp_expr_vec(time_set, time_data, data, indexes=None):
"""Return an array floats interpolated from data at times_data in
time_set.

Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
data: iterable
Data corresponding to times in time_data, must have the same
length as time data.

Returns
-------
list
If data are Pyomo components, this will return Pyomo expressions
interpolation if data are floats, this will return floats.
"""
if indexes is None:
indexes = _get_time_index_vec(time_set, time_data)
expr = [None] * len(time_set)
for i, (h, t) in enumerate(zip(indexes, time_set)):
l = h - 1
expr[i] = data[l] + (data[h] - data[l]) / (time_data[h] - time_data[l]) * (
t - time_data[l]
)
return expr
43 changes: 42 additions & 1 deletion pyomo/contrib/mpc/data/series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from pyomo.contrib.mpc.data.get_cuid import get_indexed_cuid
from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase
from pyomo.contrib.mpc.data.scalar_data import ScalarData

from pyomo.contrib.mpc.data.interpolation import (
_get_time_index_vec,
_get_interp_expr_vec,
)

TimeSeriesTuple = namedtuple("TimeSeriesTuple", ["data", "time"])

Expand Down Expand Up @@ -152,6 +155,44 @@ def get_data_at_time(self, time=None, tolerance=0.0):
indices = indices[0]
return self.get_data_at_time_indices(indices)

def get_interpolated_data_at_time(self, time=None):
"""
Returns the data associated with the provided time point or points by
linear interpolation.

Parameters
----------
time: Float or iterable
The time point or points corresponding to returned data.

Returns
-------
TimeSeriesData or ~scalar_data.ScalarData
TimeSeriesData containing only the specified time points
or dict mapping CUIDs to values at the specified scalar time
point.

"""
if time is None:
# If time is not specified, assume we want the entire time
# set. Skip all the overhead, don't create a new object, and
# return self.
return self
is_iterable = _is_iterable(time)
if not is_iterable:
time = [time]
idxs = _get_time_index_vec(time, self._time)
data = {}
for cuid in self._data:
v = _get_interp_expr_vec(time, self._time, self._data[cuid], idxs)
data[cuid] = v
if is_iterable:
return TimeSeriesData(data, list(time))
else:
for cuid in self._data:
data[cuid] = data[cuid][0]
return ScalarData(data)

def to_serializable(self):
"""
Convert to json-serializable object.
Expand Down
20 changes: 20 additions & 0 deletions pyomo/contrib/mpc/data/tests/test_series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ def test_get_data_at_time_with_tolerance(self):
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_data_at_time(-0.01, tolerance=1e-3)

def test_get_data_at_time_with_interpolate(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
new_data = data.get_interpolated_data_at_time(0.05)
self.assertEqual(ScalarData({m.var[:, "A"]: 1.5, m.var[:, "B"]: 3}), new_data)

t1 = 0.05
new_data = data.get_interpolated_data_at_time([t1])
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5], m.var[:, "B"]: [3]}, [t1]), new_data
)

new_t = [0.05, 0.15]
new_data = data.get_interpolated_data_at_time(new_t)
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5, 2.5], m.var[:, "B"]: [3, 5]}, new_t),
new_data,
)

def test_to_serializable(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
Expand Down
Loading