Ruptures¶
I. Introduction¶
Ruptures is a Python library used to detect regime changes in a signal by returning a list of breakpoints. This change point detection is performed offline: the algorithm doesn't work in real time; it needs the complete, fixed signal before running. We need to run the algorithm several times a day to monitor changes in the data Air Liquide sends us.
We are going to use the PELT algorithm: its main advantage is that it doesn't need to know the number of breakpoints in advance.
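For instance, a minimal sketch using the public ruptures API: Binseg or Dynp must be told how many breakpoints to look for, whereas Pelt only takes a penalty and infers the number itself.
import ruptures as rp
# toy signal with 2 true breakpoints
sig, true_bkps = rp.pw_constant(500, 1, 2, noise_std=1)
print(rp.Binseg(model="l2").fit(sig).predict(n_bkps=2))  # number of breakpoints given up front
print(rp.Pelt(model="l2").fit(sig).predict(pen=10))      # only a penalty is needed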
The main problem is the time PELT needs to compute the result. The faster our algorithm is, the more often we can run the monitoring, and thus the more reactive we are. Ruptures was developed for general practical applications, not for monitoring signals in near real time; computation time wasn't a priority to optimise for that use case, but it was for ours.
So we decided to optimise the core of the PELT algorithm. In the following notebook, we present the different steps of our optimisation, and then how we applied PELT to alert on possible anomalies.
II. Example of a PELT application to a random signal with several change points¶
import ruptures as rp
import matplotlib.pyplot as plt
We first generate a random signal containing different regimes and change points. We display the real breakpoints, i.e. the ones used to generate the signal.
n_samples, dim, sigma = 10000, 3, 2
n_bkps = 2
signal, bkps = rp.pw_constant(n_samples, dim, n_bkps, noise_std=sigma)
signal = signal[:,1]
rp.display(signal, bkps)
plt.show();
We then apply PELT to determine the change points.
pelt = rp.Pelt(jump = 20)
bkps = pelt.fit_predict(signal, 30)
rp.display(signal, bkps)
plt.show();
III. Optimisation of PELT¶
To present our modifications clearly, step by step, we extracted the functions of the Pelt class so that we can modify some of them directly in the notebook.
For each modification, we display the associated benchmark to show how much time we gained.
What do we have to optimise?¶
The following code is the original one, taken from the Ruptures library (https://github.com/deepcharles/ruptures).
from math import floor
from ruptures.costs import cost_factory
from ruptures.base import BaseCost, BaseEstimator
import numpy as np
"""
We take off the functions of Pelt class in order to
only modify the functions we are interested by (and not all of them)
"""
cost, min_size, jump, n_samples = cost_factory(model="l2"), None, None, None
def initPelt(model="l2", custom_cost=None, custom_min_size=2, custom_jump=5, params=None):
    """Initialize the module-level Pelt parameters.
    Args:
        model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``custom_cost`` is not None.
        custom_cost (BaseCost, optional): custom cost function. Defaults to None.
        custom_min_size (int, optional): minimum segment length.
        custom_jump (int, optional): subsample (one every *custom_jump* points).
        params (dict, optional): a dictionary of parameters for the cost instance.
    """
global cost, min_size, jump, n_samples
if custom_cost is not None and isinstance(custom_cost, BaseCost):
cost = custom_cost
else:
if params is None:
cost = cost_factory(model=model)
else:
cost = cost_factory(model=model, **params)
min_size = max(custom_min_size, cost.min_size)
jump = custom_jump
def segPelt(pen):
"""Computes the segmentation for a given penalty using PELT (or a list
of penalties).
Args:
penalty (float): penalty value
Returns:
dict: partition dict {(start, end): cost value,...}
"""
global cost
# initialization
# partitions[t] contains the optimal partition of signal[0:t]
partitions = dict() # this dict will be recursively filled
partitions[0] = {(0, 0): 0}
admissible = []
# Recursion
ind = [k for k in range(0, n_samples, jump) if k >= min_size]
ind += [n_samples]
for bkp in ind:
# adding a point to the admissible set from the previous loop.
new_adm_pt = floor((bkp - min_size) / jump)
new_adm_pt *= jump
admissible.append(new_adm_pt)
subproblems = list()
for t in admissible:
# left partition
try:
tmp_partition = partitions[t].copy()
except KeyError: # no partition of 0:t exists
continue
# we update with the right partition
tmp_partition.update({(t, bkp): cost.error(t, bkp) + pen})
subproblems.append(tmp_partition)
# finding the optimal partition
partitions[bkp] = min(subproblems, key=lambda d: sum(d.values()))
# trimming the admissible set
admissible = [
t
for t, partition in zip(admissible, subproblems)
if sum(partition.values()) <= sum(partitions[bkp].values()) + pen
]
best_partition = partitions[n_samples]
del best_partition[(0, 0)]
return best_partition
def fitPelt(signal):
"""Set params.
Args:
signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,).
Returns:
        None
"""
global cost, n_samples
# update params
cost.fit(signal)
if signal.ndim == 1:
(n_samples,) = signal.shape
else:
n_samples, _ = signal.shape
def predictPelt(pen):
"""Return the optimal breakpoints.
    Must be called after fitPelt. The breakpoints are associated with the signal
    passed to fitPelt.
Args:
pen (float): penalty value (>0)
Returns:
list: sorted list of breakpoints
"""
partition = segPelt(pen)
bkps = sorted(e for s, e in partition.keys())
return bkps
def fit_predictPelt(signal, pen):
"""Fit to the signal and return the optimal breakpoints.
Helper method to call fit and predict once
Args:
signal (array): signal. Shape (n_samples, n_features) or (n_samples,).
pen (float): penalty value (>0)
Returns:
list: sorted list of breakpoints
"""
fitPelt(signal)
return predictPelt(pen)
Let's try to compute this with hyper-parameters that give a reasonable computation time.
import pandas as pd
df = pd.read_csv("clean_data_detrended.csv")
data = df.iloc[:,3].to_numpy()
initPelt(custom_jump = 50)
%time bkps = fit_predictPelt(data, 30) # benchmark
rp.display(data,bkps) ;
CPU times: user 28.5 s, sys: 3.66 ms, total: 28.5 s Wall time: 28.5 s
Where does the algorithm spend its time?¶
We don't display the full profile here to avoid flooding the notebook, but about 90% of the time is spent computing the cost of each partition. That is what we have to optimise: our modifications will focus on this point, in the seg function.
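A quick way to confirm this in the notebook is the standard-library profiler (a sketch; nothing here is specific to ruptures):
import cProfile
# the cumulative time of the cost.error(...) calls dominates the report
cProfile.run("fit_predictPelt(data, 30)", sort="cumtime")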
1. Changes in the data structure and minimum calculation¶
We can already point out some obvious things to optimise:
- the dictionary data structure isn't efficient in terms of computation time, yet it is used almost everywhere: we have to change it. The easiest to replace are the dictionaries inside partitions (see the micro-benchmark sketch below)
- some calculations can be vectorised using NumPy arrays
- the exception raised in the admissible loop isn't useful, since a partition of 0:t obviously exists in each run: removing it will let us reorganise the algorithm structure further
- we can compute the minimum directly in the admissible loop, instead of waiting until the end; this gives us the associated index without calling argmin
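To illustrate the first and last points, here is a small self-contained micro-benchmark (illustrative sizes, not the real signal): re-summing each candidate dictionary at every comparison versus keeping running totals.
import timeit
# 50 candidate partitions, each a dict of 200 segments (illustrative)
parts = [{(i, i + 1): float(i + k) for i in range(200)} for k in range(50)]
def with_dict_sums():
    # original approach: sum the values of every dict for each comparison
    return min(parts, key=lambda d: sum(d.values()))
totals = [sum(d.values()) for d in parts]  # in PELT, this total can be carried along
def with_running_totals():
    return parts[min(range(len(parts)), key=totals.__getitem__)]
print(timeit.timeit(with_dict_sums, number=2000))
print(timeit.timeit(with_running_totals, number=2000))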
Let's modify the seg function, and also merge it with predict.
from copy import deepcopy
def predictPelt(pen):
global cost
# initialization
# partitions[t] contains the optimal partition of signal[0:t]
partitions = dict() # this dict will be recursively filled
partitions[0] = [[(0,0,0)], 0]
admissible = np.array([], dtype = int)
# Recursion
ind = np.array([k for k in range(0, n_samples, jump) if k >= min_size], dtype = int)
ind = np.concatenate((ind, [n_samples]))
for bkp in ind:
# adding a point to the admissible set from the previous loop.
new_adm_pt = floor((bkp - min_size) / jump)
new_adm_pt *= jump
admissible = np.concatenate((admissible, [new_adm_pt]))
sum_values = np.empty(len(admissible))
min_index, min_part = 0, {}
for index, t in enumerate(admissible):
# left partition
tmp_partition = deepcopy(partitions[t])
tmp_cost = cost.error(t, bkp) + pen
# we update with the right partition
tmp_partition[0].append((t, bkp, tmp_cost))
tmp_partition[1] += tmp_cost
sum_values[index] = tmp_partition[1]
# update the value of the minimum of the sum
if sum_values[index] <= sum_values[min_index]:
min_index = index
min_part = tmp_partition
# finding the optimal partition
partitions[bkp] = min_part
# trimming the admissible set
admissible = admissible[sum_values <= sum_values[min_index] + pen]
best_partition = partitions[n_samples]
bkps = sorted(b for a,b,c in best_partition[0][1:])
return bkps
%time bkps = fit_predictPelt(data, 30) # benchmark
rp.display(data,bkps) ;
CPU times: user 2min 58s, sys: 302 ms, total: 2min 58s Wall time: 2min 58s
[11200, 12100, 12600, 33500, 34450, 35450, 36450, 36800, 37450, 37800, 38150, 38800, 39200, 40100, 40850, 41050, 52700, 52950, 53650, 54700, 54950, 55500, 55850, 56200, 56550, 56900, 57250, 57550, 57900, 58600, 58900, 59450, 59650, 60450, 60700, 61650, 64400, 64700, 65700, 108600, 108850, 109400, 111500, 111750, 112800, 113450, 115050, 115800, 116500, 117050, 117500, 118050, 118550, 119000, 120600, 121650, 122050, 135287]
It takes more time than the original version, even though the data structure is no longer the problem. So what is?
2. Removing the copy¶
This version reveals another problem: the copy. With the new structure we were forced to use deepcopy, since we have lists inside lists, i.e. references stored in lists: modifying a value in a shallow copy would modify the original partition, which is not what we want. deepcopy fixes that by copying every element of the list, but it takes a lot of time.
Could we do without this copy altogether? Why is there a copy in the first place? We use partitions[t] for two reasons:
- to compute the cost to put in sum_values
- to define the partition associated with the minimum cost in sum_values
For the first point, we don't need to copy the partition, only to read the cost associated with the admissible point.
For the second point, since this partition extends the previous one, we don't need to copy the previous one, only to keep a reference to it and store that reference in a tuple together with the new element (and the total cost).
By doing this and slightly changing the partition structure (previously a dictionary of lists of tuples, now a dictionary of tuples), we can remove the copy operation entirely and thereby gain a lot of computation time.
Also, since the last partition refers to the second-to-last, and so on recursively, we obtain at the end the list of breakpoints already sorted but in reverse order. The final step is simply reversing the list; the sort implemented in the original library becomes superfluous.
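As a tiny illustration of this structure (made-up numbers): each partition is a tuple (parent_partition, last_breakpoint, total_cost), and unwinding the parent references yields the breakpoints in reverse order.
p0 = (None, None, 0)    # empty partition of signal[0:0]
p1 = (p0, 400, 12.5)    # best partition ending at breakpoint 400
p2 = (p1, 900, 20.1)    # best partition ending at breakpoint 900
node, bkps = p2, []
while node[0] is not None:  # walk back through the parent references
    bkps.append(node[1])
    node = node[0]
print(bkps[::-1])           # [400, 900]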
def predictPelt(pen):
# initialization
# partitions[t] contains the optimal partition of signal[0:t]
partitions = dict() # this dict will be recursively filled
partitions[0] = (None, None, 0)
admissible = np.array([], dtype = int)
# Recursion
ind = np.array([k for k in range(0, n_samples, jump)
if k >= min_size], dtype = int)
ind = np.concatenate((ind, [n_samples]))
for bkp in ind:
# adding a point to the admissible set from the previous loop.
new_adm_pt = floor((bkp - min_size) / jump)
new_adm_pt *= jump
admissible = np.concatenate((admissible, [new_adm_pt]))
sum_values = np.empty(len(admissible))
min_index, min_part = 0, None
for index, t in enumerate(admissible):
sum_values[index] = partitions[t][-1] + cost.error(t, bkp) + pen
#update the value of the minimum of the sum
if sum_values[index] <= sum_values[min_index]:
min_index = index
# finding the optimal partition
t = admissible[min_index]
partitions[bkp] = (partitions[t], bkp, sum_values[min_index])
# trimming the admissible set
admissible = admissible[sum_values <= sum_values[min_index] + pen]
best_partition = partitions[n_samples]
a,b,c = best_partition
bkps = []
while a is not None:
bkps.append(b)
a,b,c = a
bkps = bkps[::-1]
return bkps
%time bkps = fit_predictPelt(data, 30) # benchmark
rp.display(data,bkps) ;
CPU times: user 22.1 s, sys: 3.82 ms, total: 22.1 s Wall time: 22.1 s
These are the best optimisations we were able to achieve without parallelising the cost computation. Profiling the function again shows that the cost calculation now takes almost 100% of the time.
3. Other optimisations (which don't reduce the computation time)¶
We implemented other small optimisations to finally get rid of the dictionary structure and of some duplicated values.
def predictPelt(pen):
# initialization
# partitions[t] contains the optimal partition of signal[0:t]
    partitions = []  # will be recursively filled
partitions.append((0, 0))
admissible = np.array([[-1, -1]], dtype=int)
# Recursion
ind = np.array([k for k in range(0, n_samples, jump)
if k >= min_size], dtype = int)
ind = np.concatenate((ind, [n_samples]))
for index_bkp, bkp in enumerate(ind):
# adding a point to the admissible set from the previous loop.
new_adm_pt = [index_bkp, floor((bkp - min_size) / jump) * jump]
if admissible[0][0] == -1:
admissible[0] = new_adm_pt
else:
admissible = np.concatenate((admissible, [new_adm_pt]))
sum_values = np.empty(len(admissible))
min_index = 0
for index_adm, adm in enumerate(admissible):
i, t = adm
sum_values[index_adm] = partitions[i][1] + cost.error(t, bkp) + pen
#update the value of the minimum of the sum
if sum_values[index_adm] <= sum_values[min_index]:
min_index = index_adm
# finding the optimal partition
index_t = admissible[min_index][0]
partitions.append((index_t, sum_values[min_index]))
# trimming the admissible set
admissible = admissible[sum_values <= sum_values[min_index] + pen]
a = partitions[-1][0]
bkps = [n_samples]
while a != 0:
bkps.append(ind[a])
a = partitions[a][0]
bkps = bkps[::-1]
return bkps
%time bkps = fit_predictPelt(data, 30) # benchmark
rp.display(data,bkps) ;
CPU times: user 22.6 s, sys: 23.9 ms, total: 22.6 s Wall time: 22.6 s
IV. Parallelisation of the cost calculation¶
As we said, once we had gained computation time by changing the algorithm's structure, the aim was to find a way to reduce the cost-calculation time of each partition.
The first idea that came to us was: are these calculations independent from each other? They are indeed, so we can parallelise the computation of each cost. In theory, the computation time should be divided by approximately the number of workers created by the parallelisation.
But in practice, we have to take into account the overhead of creating the workers (in fact, we use multiprocessing rather than threads, because the GIL prevents Python threads from running CPU-bound code in parallel). Even when allocating a batch of tasks per process (not only one task per process), and thereby reducing the number of process creations, the results were clearly bad: we reduced the user computation time, but we lengthened the system time.
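For reference, here is a minimal sketch of the chunked approach we tried; the helper names are our own illustration, and it assumes the fitted global cost object is inherited by the worker processes (e.g. via fork on Linux).
from multiprocessing import Pool
import numpy as np
def chunk_costs(args):
    # evaluate a whole chunk of admissible points in one task,
    # to amortise the per-task process overhead
    # (cost is the fitted global cost object, assumed inherited by the workers)
    chunk, bkp, pen = args
    return [cost.error(t, bkp) + pen for t in chunk]
def parallel_costs(admissible, bkp, pen, n_workers=4):
    chunks = np.array_split(np.asarray(admissible), n_workers)
    with Pool(n_workers) as pool:
        parts = pool.map(chunk_costs, [(c, bkp, pen) for c in chunks])
    return np.concatenate(parts)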
One solution could have been to implement the PELT kernel in C, in order to better control how the processes are created, but we would have lost modularity with the rest of the library (mainly the ability to change the cost model).
V. Choice of the hyperparameters¶
Our algorithm needs two hyperparameters: a "jump" (the minimal distance between two breakpoints) and a "penalty" (which controls the trade-off between detection time and accuracy).
To choose them, we used the detection time and the number of false anomalies detected as criteria, and we tested 8 values of jump and 10 values of penalty.
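The search itself looked roughly like the sketch below; detection_delay and false_alert_count are hypothetical stand-ins for our evaluation against the labelled 5-week history, and the grids are illustrative.
import itertools
jumps = [10, 20, 30, 50, 70, 90, 100, 110]  # 8 illustrative jump values
penalties = range(1, 11)                    # 10 penalty values
results = []
for jump, pen in itertools.product(jumps, penalties):
    bkps = rp.Pelt(jump=jump).fit_predict(data, pen)
    # detection_delay / false_alert_count: hypothetical evaluation helpers
    results.append((jump, pen, detection_delay(bkps), false_alert_count(bkps)))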
We plotted the following graph, which shows the average number of false anomalies detected over 5 weeks against the average detection time of an anomaly.
As Air Liquide's priority is the best possible detection time (no matter how many false alerts we raise), we chose the hyperparameters with the shortest detection time, even though that implies 1.7 false alerts on average every 5 weeks (red point).
This point corresponds to a jump of 100 and a penalty of 5.
To find a better compromise, we repeated the tests with the chosen penalty of 5 and jumps between 90 and 110. We indeed found a better average precision with an unchanged average detection time (red point).
The default jump is set to 98 and the default penalty to 5.
With these values, we obtained a recall of 100% and an accuracy of 75%.
VI. Post-processing¶
The algorithm "pelt" in the library "ruptures" returns us the list of the breakpoints observed in the data's curve (typically, the beginning of an oscillation or the brutal fall at the end of an oscillation).
The detection of a new breakpoint since the previous execution of the function leads to the triggering of an alert (status = 2).
The alert will last for at least buffer_time.
If the algorithm doesn't detect a new breakpoint during this period of buffer_time, the alarm becomes an internal alarm (status = 1): the points are labeled as though the situation were normal but the algorithm keeps in memory the fact that the alert is maybe not totally over (specifically to count the number of ruptures).
If the algorithm doesn't detect a new breakpoint during this new period of buffer_time, we consider that the anomaly is over (status = 0).
Else, the alarm continues for at least buffer_time.
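A compact sketch of this state machine (our own summary of the rules above, not code taken from the class below):
def next_status(status, new_bkp, quiet_points, buffer_time):
    # status: 2 = alert, 1 = internal alert, 0 = normal
    # quiet_points: points elapsed since the last detected breakpoint
    if new_bkp:
        return 2   # any new breakpoint (re)triggers an alert
    if status == 2 and quiet_points >= buffer_time:
        return 1   # quiet for buffer_time: downgrade to internal alert
    if status == 1 and quiet_points >= 2 * buffer_time:
        return 0   # quiet for another buffer_time: back to normal
    return status  # otherwise the current phase continues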
import ruptures as rp
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from Conversion_JSON import colorsToJson
from requests_interface import pull_data, push_data
from datetime import datetime as dt
from time import sleep
BUFFER_TIME = 504
class PeltAirLiquide:
def __init__(self, df, feature = "PARD@3C52@3C52-M@TE7011A.PNT", index = "TIMESTAMP", size_slice = 5000, jump = 98, pen = 5, update_freq = 98, buffer_time = BUFFER_TIME):
"""Initialization of the class PeltAirLiquide.
        The list of breakpoints and the labels are computed for the first size_slice values
Arguments:
df (pandas data frame): data frame
feature (string, optional) : name of the column we want to study
size_slice (int, opt) : nb of points where we search the breakpoints
jump (int, opt) : minimal distance between two breakpoints
            pen (int, opt) : penalty value for the pelt algorithm
update_freq (int, opt) : frequency of the execution of the algorithm (in points)
buffer_time (int, opt) : minimal duration of an alert or internal alert
Attributes:
self.data (np.array with floats) : the size_slice last values of the feature studied
self.labels (np.array with ints) : array containing a label for each point of the data.
If the point studied doesn't belong to an alert phase, the label is 0.
Otherwise, the absolute value of the label corresponds to the number of ruptures in this alert phase.
The label is positive for the current alert and negative otherwise.
self.bkp_histo (np.array with lists of 2 ints) : list of the breakpoints already detected
(since the beginning of the execution) with their labels
self.alert_level (np.array with lists of 1 timestamp and 1 int) : array containing for each point of the data its timestamp and a label associated :
0 = normal
1 = internal alert
2 = alert
Returns:
None"""
self.jump = jump
self.pen = pen
self.size_slice = size_slice
self.update_freq = update_freq
self.buffer_time = buffer_time
self.index = index
        self.data = (df[feature].to_numpy())[-self.size_slice:]  # last size_slice values (a .loc label slice with -size_slice would return the whole frame)
self.labels = np.zeros(self.size_slice, dtype = int)
self.alert_level = np.zeros(self.size_slice, dtype = object)
self.timestamp = (df[index].to_numpy())[-self.size_slice:]
pelt = rp.Pelt(jump = self.jump)
bkps = np.array(pelt.fit_predict(self.data, self.pen))[:-1]
if len(bkps) == 0:
self.bkp_histo = np.array([[0, None]])
else:
self.bkp_histo = np.zeros((len(bkps),2), dtype = int)
self.bkp_histo[:,0] = bkps
label = 1
for ind, bkp in enumerate(self.bkp_histo[:,0]):
self.alert_level[bkp:bkp + self.buffer_time] = 2
self.labels[bkp: bkp + self.buffer_time] = -label
self.bkp_histo[ind, 1] = label
label += 1
if bkp == self.bkp_histo[-1,0]:
self.alert_level[bkp + self.buffer_time: min(bkp + 2*self.buffer_time, self.size_slice)] = 1
elif self.bkp_histo[ind+1,0] > bkp + 2*self.buffer_time:
self.alert_level[bkp + self.buffer_time:bkp + 2*self.buffer_time] = 1
label = 1
elif self.bkp_histo[ind+1,0] > bkp + self.buffer_time:
self.alert_level[bkp + self.buffer_time:bkp + 2*self.buffer_time] = 1
def endAlert(self, internal_alert = True):
"""Change the label value to
end the alert phase and begin an internal alert
or
end the internal alert and begin a new normal phase.
Argument:
internal_alert (bool, opt) : are we already in internal alert ? (alert phase ended for less than buffer_time points)
Returns:
None"""
self.labels = -np.abs(self.labels)
if internal_alert:
self.alert_level[-self.update_freq:] = 1
else:
self.alert_level[-self.update_freq:] = 0
self.bkp_histo[-1][1] = 0
def run(self):
#calculate the new breakpoints
pelt = rp.Pelt(jump = self.jump)
bkps = np.array(pelt.fit_predict(self.data,self.pen))[:-1]
new_bkps = bkps[bkps >= self.size_slice - self.update_freq]
#update arrays
self.labels = np.append(self.labels[self.update_freq:], self.update_freq*[max(self.labels[-1],0)])
self.alert_level = np.append(self.alert_level[self.update_freq:], self.update_freq*[self.alert_level[-1]])
        # update bkp_histo: subtract update_freq from each value
self.bkp_histo[:,0] -= self.update_freq
if len(new_bkps) > 0: #new breakpoints detected
if self.alert_level[-1] == 1: #internal alert
i = 1
while self.bkp_histo.shape[0] > i and self.bkp_histo[-i,1] != 1:
i += 1
beg_oscil = max(0, int(self.bkp_histo[-i][0]+1))
self.labels[beg_oscil:] = np.negative(self.labels[beg_oscil:])
self.labels[max(0, int(self.bkp_histo[-1][0]+1)):] = self.bkp_histo[-1][1]
self.bkp_histo[-1][1] = self.labels[int(self.bkp_histo[-1][0])]
else: #real alert
self.alert_level[-1] = 2
label_bkp = self.labels[-1] + 1
for bkp in new_bkps:
# update labels
self.labels[bkp:] += 1
# update bkp_histo
if self.bkp_histo[0][1] is None: #no breakpoint found before
self.bkp_histo[0] = (bkp, label_bkp)
else:
self.bkp_histo = np.concatenate((self.bkp_histo, [[bkp, label_bkp]]))
label_bkp += 1
nb_ruptures = self.labels[-1]
elif self.alert_level[-1] == 2 or self.alert_level[-1] == 1: #already in alert or internal alert
if self.bkp_histo[-1][0] + 2*self.buffer_time <= self.size_slice: #end of the internal alert
self.endAlert(False)
elif self.bkp_histo[-1][0] + self.buffer_time <= self.size_slice: #end of the real alert
self.endAlert(True)
else: #alert continues
nb_ruptures = self.labels[-1]
else: #no alert
pass
status = np.hstack((self.timestamp.reshape((-1,1)), self.alert_level.reshape((-1,1))))
status = status[status[:,1] != 0]
status = np.vstack((['timestamp', 'status'], status))
nb_rupt = np.hstack((self.timestamp.reshape((-1,1)), np.abs(self.labels.reshape((self.size_slice,1)))))
nb_rupt = np.vstack((['timestamp', 'nb_ruptures'], nb_rupt))
return status, nb_rupt
def updateData(self, df, feature, index):
self.data = (df[feature].to_numpy())[-self.size_slice:]
self.timestamp = (df[index].to_numpy())[-self.size_slice:]
def fonction_ruptures(feature = "PARD@3C52@3C52-M@TE7011A.PNT", index = "TIMESTAMP", size_slice = 5000, jump = 98, pen = 5, update_freq = 98, buffer_time = BUFFER_TIME):
    # Initialization
    # df = pull_data()  # in production; here df is the CSV loaded earlier in the notebook
peltAL = PeltAirLiquide(df, feature, index, size_slice, jump, pen, update_freq, buffer_time)
status = np.hstack((peltAL.timestamp.reshape((-1,1)), peltAL.alert_level.reshape((-1,1))))
status = status[status[:,1] != 0]
status = np.vstack((['timestamp', 'status'], status))
nb_rupt = np.hstack((peltAL.timestamp.reshape((-1,1)), np.abs(peltAL.labels.reshape((peltAL.size_slice,1)))))
nb_rupt = np.vstack((['timestamp', 'nb_ruptures'], nb_rupt))
colorsToJson(status, "Ruptures.json")
#push_data("Ruptures.json", push_local_path = "http://localhost:5000/api/team_rupture")
colorsToJson(nb_rupt, "Nb_rupt.json")
#push_data("Nb_rupt.json", push_local_path = "http://localhost:5000/api/team_rupture_extra")
time_stamp_prec = dt.strptime(peltAL.timestamp[-1], '%Y-%m-%d %H:%M:%S')
# Loop
while True:
#df = pull_data()
time_stamp = dt.strptime(df.loc[df.shape[0] - 1,index], '%Y-%m-%d %H:%M:%S')
if (time_stamp - time_stamp_prec).total_seconds() >= update_freq * 600:
peltAL.updateData(df, feature, index)
status, nb_rupt = peltAL.run()
colorsToJson(status, "Ruptures.json")
#push_data("Ruptures.json", push_local_path = "http://localhost:5000/api/team_rupture")
colorsToJson(nb_rupt, "Nb_rupt.json")
#push_data("Nb_rupt.json", push_local_path = "http://localhost:5000/api/team_rupture_extra")
time_stamp_prec = time_stamp
print("Sleeping...")
sleep(update_freq*600 - 30)
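Finally, a minimal way to launch the monitoring loop, assuming the commented pull_data/push_data endpoints are live (with the defaults chosen in section V):
if __name__ == "__main__":
    fonction_ruptures(jump=98, pen=5, update_freq=98)  # runs until interrupted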