GAN¶
I. Working principle¶
A GAN (Generative Adversarial Network) is an unsupervised model made up of two neural networks that compete against each other: the generator and the discriminator. The generator’s job is to produce believable fake data from random noise in order to fool the discriminator. The discriminator’s job is to tell apart the fake and real data it receives.
At the end of training, the discriminator should be able to tell real data from fake data accurately, and therefore to recognise real data in any set it is fed. Its state is then saved and it is taken out of the system to be used on its own for processing data. In our case, the real data used for training contains no anomalies: the generator learns to imitate it and the discriminator learns to recognise it. The expectation is that, when shown erroneous data or data with visible anomalies, the discriminator will flag it and raise an alarm.
The training takes place over multiple iterations (epochs). During each epoch, and for every batch, the discriminator is first fed real labelled data, then fake data produced from random noise by the generator in its current state of training, so that it learns to discriminate between the two. This completes the discriminator’s training step. The generator is then trained on the discriminator’s output, its goal being to maximise the discriminator’s error.
The whole system is very sensitive to the choice of the hyperparameters, which have to be fine-tuned in order to achieve the best possible discrimination and minimise the discriminator’s error rate.
1. Module imports¶
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import torch as th
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
%load_ext tensorboard
save_path_discr = 'saved_file.pt'
dico = {"Description" : "Point Name",
"MOTOR CURRENT" : "PARD@3C52@3C52-M@JT7099.CAL",
"Température palier étage 1" : "PARD@3C52@3C52-M@TE7011A.PNT",
"Température palier étage2" : "PARD@3C52@3C52-M@TE7021A.PNT",
"Température palier étage 3" : "PARD@3C52@3C52-M@TE7031A.PNT",
"Température palier étage 4" : "PARD@3C52@3C52-M@TE7041A.PNT",
"Déplacement axiale 1/2" : "PARD@3C52@3C52-M@VT7001.PNT:RAW",
"Déplacement axiale 3/4" : "PARD@3C52@3C52-M@VT7002.PNT:RAW",
"1e stade vibration X" : "PARD@3C52@3C52-M@VT7011A.PNT:RAW",
"1er stade vibration Y" : "PARD@3C52@3C52-M@VT7011B.PNT:RAW",
"2e stade vibration X" : "PARD@3C52@3C52-M@VT7021A.PNT:RAW",
"2e stade vibration Y" : "PARD@3C52@3C52-M@VT7021B.PNT:RAW",
"3e stade vibration X" : "PARD@3C52@3C52-M@VT7031A.PNT:RAW",
"3e stade vibration Y" : "PARD@3C52@3C52-M@VT7031B.PNT:RAW",
"4e stade vibration X" : "PARD@3C52@3C52-M@VT7041A.PNT:RAW",
"4e stade vibration Y" : "PARD@3C52@3C52-M@VT7041B.PNT:RAW",
"Température huile sortie réfrigerant" : "PARD@3C52@3C52@TE7086.PNT",
"labels" : "labels"}
2. Hyperparameters¶
The choice of hyperparameters is probably the most crucial part of the work, given how sensitive GANs are to them. For example, without a fine balance between the two learning rates, the discriminator becomes far too good compared with the generator, and an abnormal signal is still classified as real by the discriminator. We also had to determine the optimal size of the 'sliding windows', which are slices of consecutive timepoints fed to the discriminator. This size depends on the characteristic period of the oscillations in the real problem and on the desired detection time. The other hyperparameters were chosen empirically.
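To make the effect of window_size and window_step concrete, here is a minimal sketch in plain Python on a toy series. The helper name slice_windows is hypothetical; it only borrows the window-count formula used by the datawindows_real function defined later in this notebook.

```python
def slice_windows(values, window_size, step):
    """Cut `values` into fixed-length windows (hypothetical helper).

    Uses the same window count as the notebook: (len - window_size) // step.
    """
    n_windows = (len(values) - window_size) // step
    return [values[i * step : i * step + window_size] for i in range(n_windows)]


series = list(range(10))  # toy signal: 0, 1, ..., 9

# step == window_size: the windows do not overlap
disjoint = slice_windows(series, window_size=3, step=3)
# step == 1: consecutive windows share window_size - 1 points
overlapping = slice_windows(series, window_size=3, step=1)

print(disjoint)          # [[0, 1, 2], [3, 4, 5]]
print(len(overlapping))  # 7
```

Note that with this count formula the last window that would still fit ([6, 7, 8] in the non-overlapping case) is not generated.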
ngpu = 1
features = [
dico["Température palier étage 1"],
]
features_nb = len(features)
## Generator and discriminator hyperparameters
gen_input_dim = 200 # the dimension of the noise which is fed to the generator
gen_output_dim = features_nb
discr_input_dim = gen_output_dim
discr_output_dim = 1 # The discriminator returns the probability that the signal is a real one
#The sizes of the hidden layers in the discriminator
hidden_size = 20
hidden_size_discr = 30
# Data processing constants
window_size = 15
window_step = 15
# Training hyperparameters
num_epochs = 2
batch_size = 5
num_workers = 2
gen_learning_rate = 0.02
discr_learning_rate = 0.00002
beta1 = 0.5 # hyperparameter for the Adam optimizers
II. Data Processing¶
1. Training Dataset¶
Importing anomaly-free data samples in order to train the generator and discriminator.
# Importing the CSV datasets
data_relative_path = ''
normal_1_path = data_relative_path + 'data_gan/normal_0_14000_detrended.csv'
normal_2_path = data_relative_path + 'data_gan/normal_66900_100032_detrended.csv'
normal_1 = pd.read_csv(normal_1_path)
normal_2 = pd.read_csv(normal_2_path)
# Resetting the indices
normal_1.reset_index(inplace = True)
normal_2.reset_index(inplace = True)
Generating a dataset of anomaly-free samples to feed the discriminator. The dataset is a list of set-length windows.
def datawindows_real(df : pd.DataFrame, window_size : int, step_size : int, features: list) -> list :
    """
    This function generates fixed-length windows (tensors) from a dataframe whose rows all share the same label.
    :param df: The input dataframe
    :param window_size: The window width
    :param step_size: The generation offset step size
    :param features: The list of features to train for
    :returns: A list of tensors of same size
    """
    # Dropping the 'level_0' column left by a previous reset_index, if present
    df.drop(columns=['level_0'], inplace=True, errors='ignore')
    l = df[features[0]].size
    windows_nb = (l-window_size)//step_size
    window_list = []
    for window_id in range(windows_nb) :
        window_beginning = window_id*step_size
        window_ending = window_beginning + window_size - 1  # .loc slicing includes both ends
        window_df = df.loc[window_beginning:window_ending, features]
        window_array = window_df.to_numpy().astype(float)  # the np.float alias was removed from recent NumPy versions
        window_list.append(th.tensor(window_array))
    return window_list
### Creation of the window_list that will train the discriminator
train_data_1 = datawindows_real(normal_1, window_size, window_step, features)
train_data_2 = datawindows_real(normal_2, window_size, window_step, features)
window_list = train_data_1 + train_data_2
class GAN_TrainDataset(Dataset):
    def __init__(self, window_list: list):
        self.dataset = window_list
        self.labels = [th.tensor([1.] * features_nb) for _ in range(len(window_list))]
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]
# Loading the data
dataset = GAN_TrainDataset(window_list)
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
2. Testing Dataset¶
Loading normal and abnormal data samples in order to test the discriminator and determine when to save it during the training.
# Importing the CSV datasets
data_relative_path = ''
abnormal_1_path = data_relative_path + 'data_gan/n_anormal_100032_end_detrended.csv'
abnormal_2_path = data_relative_path + 'data_gan/n_anormal_14000_43089_detrended.csv'
abnormal_3_path = data_relative_path + 'data_gan/n_anormal_43089_66900_detrended.csv'
abnormal_1 = pd.read_csv(abnormal_1_path)
abnormal_2 = pd.read_csv(abnormal_2_path)
abnormal_3 = pd.read_csv(abnormal_3_path)
# Resetting the indexes
abnormal_1.reset_index(inplace=True)
abnormal_2.reset_index(inplace=True)
abnormal_3.reset_index(inplace=True)
The datawindows_labels function generates fixed-length windows and the corresponding labels for the discriminator.
def datawindows_labels(df : pd.DataFrame, window_size : int, step_size : int, features: list) -> list :
    """
    This function generates fixed-length windows and the corresponding labels
    :param df: The input dataframe
    :param window_size: The window width
    :param step_size: The generation offset step size
    :param features: The list of features to train for
    :returns: A list of (window tensor, label tensor) tuples
    """
    # Dropping the 'level_0' column left by a previous reset_index, if present
    df.drop(columns=['level_0'], inplace=True, errors='ignore')
    l = df[features[0]].size
    windows_nb = (l-window_size)//step_size
    window_list = []
    for window_id in range(windows_nb) :
        window_beginning = window_id*step_size
        window_ending = window_beginning + window_size - 1  # .loc slicing includes both ends
        window_df = df.loc[window_beginning:window_ending, features]
        window_array = window_df.to_numpy().astype(float)  # the np.float alias was removed from recent NumPy versions
        # labels
        window_labels = df.loc[window_beginning:window_ending, 'labels']
        window_labels_array = np.abs(window_labels.to_numpy().astype(float))
        # global window label: the discriminator returns the probability that the signal comes from
        # the real sample (i.e. "is normal"), so a window containing any non-zero label (an anomaly)
        # gets the label 0. and a fully normal window gets the label 1.
        label = 0. if window_labels_array.any() else 1.
        window_list.append((th.tensor(window_array), th.tensor([label])))
    return window_list
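The window-level labelling rule can be checked in isolation. The sketch below is a plain-Python illustration with a hypothetical helper name (window_label) and made-up point labels; it is not the notebook's function, only the same rule applied to small lists.

```python
def window_label(point_labels):
    """Return 0.0 if any point in the window is anomalous, else 1.0 (hypothetical helper)."""
    has_anomaly = any(label != 0 for label in point_labels)
    return 0.0 if has_anomaly else 1.0


print(window_label([0, 0, 0, 0]))   # 1.0 (normal window)
print(window_label([0, 0, -1, 0]))  # 0.0 (one anomalous point is enough)
print(window_label([2, 2, 2, 2]))   # 0.0
```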
class GAN_TestDataset(Dataset):
    def __init__(self, labeled_window_list: list):
        self.dataset = [data for data, label in labeled_window_list]
        self.labels = [label for data, label in labeled_window_list]
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]
# Generating windows for testing the discriminator
w1 = datawindows_labels(abnormal_1, window_size, window_step, features)
w2 = datawindows_labels(abnormal_2, window_size, window_step, features)
window_list_test = w1 + w2
# Loading the data
test_dataset = GAN_TestDataset(window_list_test)
test_batch_size = batch_size
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=True, num_workers=num_workers)
OTHER SETTINGS¶
# Setting the working device
device = th.device('cuda:0' if (th.cuda.is_available() and ngpu > 0) else 'cpu')
# Custom weights initialization called on netG and netD
# (it only affects Conv and BatchNorm layers, so LSTM and Linear layers keep PyTorch's default initialization)
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        th.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        th.nn.init.normal_(m.weight.data, 1.0, 0.02)
        th.nn.init.constant_(m.bias.data, 0)
# Initializing the tensorboard
writer = SummaryWriter()
III. Initialization of the model¶
1. Generator class¶
The generator takes an array of random noise as input and returns a batch of synthetic windows, with one value per feature at each timepoint.
# Generator class
class Generator( th.nn.Module ):
    def __init__(self, ngpu: int = 0):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        self.main = th.nn.LSTM(gen_input_dim, gen_output_dim)
    def forward(self, val):
        return self.main(val)
# Creating the generator
netG = Generator(ngpu).to(device)
# Handling multi-GPU
if (device.type == 'cuda') and (ngpu > 1):
    netG = th.nn.DataParallel(netG, list(range(ngpu)))
# Applying the weights_init function (normal initialisation with stdev=0.02, for Conv and BatchNorm layers only)
netG.apply(weights_init)
Generator(
  (main): LSTM(200, 1)
)
2. Discriminator class¶
The discriminator returns a label per window and takes as input a batch of windows, each window containing the values of the different features within the range of the window. The discriminator is made up of an LSTM neural network and two additional layers.
# Discriminator class
class Discriminator( th.nn.Module ):
    def __init__(self, ngpu: int = 0):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        self.lstm = th.nn.LSTM(discr_input_dim, hidden_size)
        self.linear = th.nn.Linear(hidden_size*window_size, hidden_size_discr)
        self.relu = th.nn.ReLU()
        self.linear2 = th.nn.Linear(hidden_size_discr, discr_output_dim)
        self.sigmoid = th.nn.Sigmoid()
    def forward(self, val):
        val, _ = self.lstm(val)
        val = val.view(val.shape[0], -1)  # flattening each window to a single row
        val = self.linear(val)
        val = self.relu(val)
        val = self.linear2(val)
        val = self.sigmoid(val)
        return val
# Creating the discriminator
netD = Discriminator(ngpu).to(device)
# Handling multi-GPU (if desired)
if (device.type == 'cuda') and (ngpu > 1):
    netD = th.nn.DataParallel(netD, list(range(ngpu)))
# Applying the weights_init function (normal initialisation with stdev=0.02, for Conv and BatchNorm layers only)
netD.apply(weights_init)
Discriminator(
  (lstm): LSTM(1, 20)
  (linear): Linear(in_features=300, out_features=30, bias=True)
  (relu): ReLU()
  (linear2): Linear(in_features=30, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)
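The in_features=300 of the first linear layer comes from flattening the LSTM output: hidden_size * window_size = 20 * 15. The sketch below traces the tensor shapes with the same sizes, using a standalone LSTM rather than the notebook's classes. One point worth being aware of: th.nn.LSTM defaults to batch_first=False, so it treats dimension 0 of its input as the sequence axis. Whether the notebook relies on that on purpose is not stated; the batch_first=True variant is shown only as an option and is an assumption, not the author's configuration.

```python
import torch as th

batch, window, n_features, hidden = 5, 15, 1, 20

lstm = th.nn.LSTM(n_features, hidden)      # batch_first defaults to False
x = th.randn(batch, window, n_features)    # tensor layout used by the notebook

out, _ = lstm(x)
print(out.shape)                           # torch.Size([5, 15, 20])

flat = out.view(out.shape[0], -1)          # one flattened row per window
print(flat.shape)                          # torch.Size([5, 300])

# Same input layout, but telling the LSTM that dim 0 is the batch axis,
# so the recurrence runs along the 15 timepoints of each window.
lstm_bf = th.nn.LSTM(n_features, hidden, batch_first=True)
out_bf, _ = lstm_bf(x)
print(out_bf.shape)                        # torch.Size([5, 15, 20])
```

The output shapes are identical in both cases, which is why the flattening step works either way; only the axis along which the recurrence runs differs.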
3. Optimizers¶
# Initialising the BCELoss (Binary Cross Entropy Loss) loss function
criterion = th.nn.BCELoss()
# Setting up the Adam optimizers for G and D
optimizerG = th.optim.Adam(netG.parameters(), lr=gen_learning_rate, betas=(beta1, 0.999))
optimizerD = th.optim.Adam(netD.parameters(), lr=discr_learning_rate, betas=(beta1, 0.999))
IV. Training Loop¶
This is the training loop of the discriminator and the generator, in which they are updated in turn according to the discriminator's error in order to refine their respective precisions.
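In an alternating update of this kind, detach() decides which network receives gradients. The sketch below uses two tiny linear layers as stand-ins (hypothetical toy models, not the notebook's networks) to show that a loss computed on a detached fake batch leaves the generator without gradients, whereas a loss computed on the non-detached batch reaches it.

```python
import torch as th

th.manual_seed(0)
gen = th.nn.Linear(4, 2)    # stand-in generator (hypothetical toy model)
disc = th.nn.Linear(2, 1)   # stand-in discriminator (hypothetical toy model)

noise = th.randn(3, 4)
fake = gen(noise)

# Discriminator step: detach() keeps the loss from reaching the generator
disc(fake.detach()).sum().backward()
print(gen.weight.grad is None)       # True

# Generator step: no detach(), so gradients flow back through disc into gen
disc(fake).sum().backward()
print(gen.weight.grad is not None)   # True
```

In practice this means the fake batch is usually detached for the discriminator update only, and passed as-is for the generator update.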
# Setting the conventions for the labels
real_label = 1.
fake_label = 0.
# Creating a batch of latent vectors to visualise the progression of the generator
fixed_noise = th.randn(batch_size, window_size, gen_input_dim, device=device)
G_losses = []
D_losses = []
iters = 0
best_accuracy = 0
for epoch in range(num_epochs):
    for train_batch_id, (train_data_batch, train_label_batch) in enumerate(train_dataloader):
        # FORMATTING THE DATA BATCH
        train_data_batch = train_data_batch.float().to(device)
        # Training the discriminator
        netD.zero_grad()
        ### 1. TRAINING THE DISCRIMINATOR WITH AN ALL-REAL BATCH
        # 1.1. Formatting the labels
        label = th.full((train_data_batch.shape[0],), real_label, dtype=th.float, device=device).squeeze()
        # 1.2. Forward pass with an all-real batch (calculating the discriminator's output and then the error)
        output = netD(train_data_batch).squeeze()
        errD_real = criterion(output, label)
        # 1.3. Calculating the gradients for the discriminator in the backward pass
        errD_real.backward()
        D_x = output.mean().item() # taking the mean of the labels generated for each data sequence in the batch
        ### 2. TRAINING THE DISCRIMINATOR WITH AN ALL-FAKE BATCH FROM THE GENERATOR
        # 2.1. Generating a batch of random latent vectors
        noise = th.randn(train_data_batch.shape[0], window_size, gen_input_dim, device=device)
        # 2.2. Generating an all-fake batch using the generator
        fake, _ = netG(noise)
        label = th.full((train_data_batch.shape[0],), fake_label, dtype=th.float, device=device).squeeze()
        # 2.3. Classifying the all-fake batch with D (detached, so that this step does not update the generator)
        output = netD(fake.detach()).squeeze()
        # 2.4. Calculating the discriminator's loss on the all-fake batch
        errD_fake = criterion(output, label)
        # 2.5. Calculating the gradients for this batch (they accumulate with those of the all-real batch)
        errD_fake.backward()
        D_G_z1 = output.mean().item()
        # 2.6. Summing the losses of the all-real and all-fake batches (for reporting)
        errD = errD_real + errD_fake
        # 2.7. Updating the discriminator
        optimizerD.step()
        ### 3. TRAINING THE GENERATOR - Updating the generator: maximizing log(D(G(z)))
        # 3.1. Formatting the batch
        netG.zero_grad()
        label = th.full((train_data_batch.shape[0],), real_label, dtype=th.float, device=device).squeeze() # fake labels are real for generator cost
        # 3.2. Performing another forward pass of the all-fake batch through the discriminator (since it was just updated);
        # fake is not detached here, otherwise no gradient would reach the generator
        output = netD(fake).squeeze()
        # 3.3. Calculating the generator's loss based on this output
        errG = criterion(output, label)
        # 3.4. Calculating the gradients for the generator
        errG.backward()
        D_G_z2 = output.mean().item()
        # 3.5. Updating the generator
        optimizerG.step()
        ### 4. OUTPUT TRAINING STATS
        if train_batch_id % 450 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch+1, num_epochs, train_batch_id+1, len(train_dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
        # 4.2. Saving Losses for plotting
        G_losses.append(errG.item())
        D_losses.append(errD.item())
        # 4.3. Checking how the generator is doing by saving G's output on fixed_noise
        if (iters % 500 == 0) or ((epoch == num_epochs-1) and (train_batch_id == len(train_dataloader)-1)):
            with th.no_grad():
                fake = netG(fixed_noise)[0].detach().cpu()
        iters += 1
    ### 5. TESTING AFTER EACH EPOCH
    errors = 0
    false_positives = 0
    false_negatives = 0
    # Loading a batch of test data & matching test labels
    for batch_id, (test_data_batch, test_label_batch) in enumerate(test_dataloader):
        # 1.1. Running through the discriminator (no gradients are needed for testing)
        test_data_batch = test_data_batch.float().to(device)
        with th.no_grad():
            test_discr_output_labels = netD(test_data_batch)
        # 1.2. Generating a list of the test's outputs (floats), of size the batch size
        test_discr_output_list = [test_output_label.item() for test_output_label in test_discr_output_labels]
        # 2. Running through the windows in the batch
        batch_len = len(test_label_batch)
        for window_id in range(batch_len):
            # 3.1. Getting the output label from the discriminator
            test_output = test_discr_output_list[window_id]
            test_output_rounded = float( round(test_output) ) # rounding the test output (<= 0.5 is 0 = ERROR| > 0.5 is 1 = ALL FINE)
            # 3.2. Getting the reference label
            ref_label = test_label_batch[window_id].item()
            if test_output_rounded == 1. and ref_label == 0. :
                false_negatives += 1
                errors += 1
            if test_output_rounded == 0. and ref_label == 1. :
                false_positives += 1
                errors += 1
    ### PLOTTING
    writer.add_scalar('discriminator loss', errD.item(), epoch)
    writer.add_scalar('generator loss', errG.item(), epoch)
    # Saving the discriminator each time it performs with better accuracy
    test_windows_nb = len(test_dataset)  # exact number of test windows, even if the last batch is incomplete
    mean_error_rate = errors/test_windows_nb
    false_positive_rate = false_positives/test_windows_nb
    false_negative_rate = false_negatives/test_windows_nb
    accuracy = 1 - mean_error_rate
    if accuracy > best_accuracy:
        th.save(netD, save_path_discr)
        best_accuracy = accuracy
    writer.add_scalar('mean error', mean_error_rate, epoch)
    writer.add_scalar('false positives', false_positive_rate, epoch)
    writer.add_scalar('false negatives', false_negative_rate, epoch)
[1/2][1/628] Loss_D: 1.3862 Loss_G: 0.7005 D(x): 0.4964 D(G(z)): 0.4963 / 0.4963
[1/2][451/628] Loss_D: 1.3867 Loss_G: 0.6921 D(x): 0.5003 D(G(z)): 0.5005 / 0.5005
[2/2][1/628] Loss_D: 1.3860 Loss_G: 0.6925 D(x): 0.5005 D(G(z)): 0.5003 / 0.5003
[2/2][451/628] Loss_D: 1.3834 Loss_G: 0.6955 D(x): 0.5003 D(G(z)): 0.4988 / 0.4988
writer.flush()
writer.close()
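The logged values (Loss_D close to 1.386, Loss_G close to 0.69, D(x) and D(G(z)) close to 0.5) are what binary cross-entropy yields when the discriminator outputs about 0.5 for every input, that is, when it cannot yet tell real from fake. The short check below recomputes these reference values with the standard library only; the bce helper is a hypothetical one-sample version of the loss, written for illustration.

```python
import math


def bce(prediction, target):
    """Binary cross-entropy for a single probability and a 0/1 target."""
    return -(target * math.log(prediction) + (1 - target) * math.log(1 - prediction))


# A discriminator that cannot tell real from fake outputs 0.5 for everything
loss_d = bce(0.5, 1.0) + bce(0.5, 0.0)   # real batch + fake batch
loss_g = bce(0.5, 1.0)                   # the generator wants fakes labelled as real

print(round(loss_d, 4))  # 1.3863
print(round(loss_g, 4))  # 0.6931
```

Losses that stay at these values over the two logged epochs suggest that this short run is still near its starting point, which is consistent with loading a separately trained discriminator afterwards.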
Loading the saved discriminator for post-processing and testing¶
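# Loading a full pickled Discriminator module (th.save was given the whole model above, not a state dictionary),
# so the Discriminator class must be defined before loading.
# We are loading here an already trained GAN discriminator that is more efficient than the one trained above.
# Note: recent PyTorch releases may require th.load(..., weights_only=False) to unpickle a full module.
loaded_discr = th.load('final_gan.pt')
loaded_discr.eval()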
Discriminator(
  (lstm): LSTM(1, 20)
  (linear): Linear(in_features=300, out_features=30, bias=True)
  (relu): ReLU()
  (linear2): Linear(in_features=30, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)
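The notebook saves and loads the whole module object. A commonly used alternative in PyTorch is to store only the state dictionary and load it into a freshly created model. The sketch below shows that pattern on a stand-in th.nn.Linear layer with a hypothetical file name; it is an illustration of the alternative, not what the notebook does.

```python
import os
import tempfile

import torch as th

model = th.nn.Linear(3, 1)   # stand-in for the trained discriminator

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "discr_state.pt")  # hypothetical file name
    th.save(model.state_dict(), path)                # saving the weights only

    restored = th.nn.Linear(3, 1)                    # re-creating the architecture
    restored.load_state_dict(th.load(path))          # then loading the weights into it
    restored.eval()

print(th.equal(model.weight, restored.weight))  # True
```

With this pattern the saved file does not depend on the class being importable under the same name when unpickling, at the cost of having to rebuild the architecture before loading.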
V. Post-processing¶
df = abnormal_3
start = 0
stop = 30000
buffer_time = 504 # buffer time set to 3 days
Creating a new dataset for testing and plotting¶
data = df.loc[start:stop, dico["Température palier étage 1"]]
data = {'data': data}
data = pd.DataFrame(data)
data.reset_index(inplace=True)
data.drop(['index'], axis=1, inplace=True)
labels = df.loc[start:stop, 'labels'] / (max(df['labels']) + 1)
labels = {'labels': labels}
labels = pd.DataFrame(labels)
labels.reset_index(inplace=True)
labels.drop(['index'], axis=1, inplace=True)
df1_1 = df.loc[start:stop, features + ['labels']]
df1_1.reset_index(inplace=True)
labeled_window_list = datawindows_labels(df1_1, window_size, 1, features) # window_step = 1 in order to have one
# window corresponding to one timepoint
class TestDataset2(Dataset):
    def __init__(self, labeled_window_list: list):
        self.dataset = [data for data, label in labeled_window_list]
        self.labels = [label for data, label in labeled_window_list]
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]
# Loading the data
test_dataset = TestDataset2(labeled_window_list)
test_batch_size = batch_size
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=num_workers)
Collecting the discriminator's output on the testing dataset¶
discriminator = loaded_discr
def round_func(number: float, round_threshold: float):
    return float(number > round_threshold)
output_list = []
for _, (test_data_batch, test_label_batch) in enumerate(test_dataloader):
    test_data_batch = test_data_batch.float().to(device)
    output_batch = discriminator(test_data_batch)
    output_list += output_batch
# Processing the discriminator's output
discr_output = list()
for i, output in enumerate(output_list) :
    # when the probability given by the discriminator is not greater than 0.75,
    # the window is considered to be an anomalous one
    discr_output += [1 - round_func(output.item(), 0.75)]
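The thresholding rule can be tried on a few made-up probabilities. The helper below (anomaly_flags, a hypothetical name) reproduces the rule in plain Python: a window is flagged as anomalous (1.0) unless the discriminator's probability of "normal" is strictly above the threshold.

```python
def anomaly_flags(probabilities, threshold=0.75):
    """1.0 where P(normal) is not above the threshold, else 0.0 (hypothetical helper)."""
    return [0.0 if p > threshold else 1.0 for p in probabilities]


print(anomaly_flags([0.95, 0.80, 0.75, 0.40]))  # [0.0, 0.0, 1.0, 1.0]
```

A threshold of 0.75 is stricter than the 0.5 rounding used when testing during training, so more windows end up flagged.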
Post-processing algorithms¶
Without post-processing, the discriminator only returns isolated point anomalies, densely clustered around the spikes. The post-processing is meant to keep an alarm raised for some time once an anomaly has been encountered.
The first step is to smooth the discriminator's output in areas where it reports many anomalies.
def smooth_data(discr_output, smooth_memory:int = 10, occurrences_threshold: int = 1):
    smooth_output = discr_output.copy()
    for index in range(smooth_memory, len(discr_output)) :
        # the previous smooth_memory raw outputs (current point excluded)
        memory_range = discr_output[index - smooth_memory : index]
        count = memory_range.count(1.)
        if count >= occurrences_threshold:
            smooth_output[index] = 1.
    return smooth_output
smooth_output = smooth_data(discr_output, smooth_memory=10, occurrences_threshold=1)
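The effect of the smoothing is easiest to see on a short list. The sketch below is a self-contained re-implementation of the same idea under a hypothetical name (smooth_flags), with a memory of 3 points instead of 10 so that the result fits on one line.

```python
def smooth_flags(flags, memory=3, min_count=1):
    """Raise the flag at index i if at least `min_count` of the previous
    `memory` raw flags were raised (hypothetical re-implementation)."""
    smoothed = list(flags)
    for i in range(memory, len(flags)):
        if flags[i - memory : i].count(1.0) >= min_count:
            smoothed[i] = 1.0
    return smoothed


raw = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(smooth_flags(raw, memory=3))
# [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0]
```

A single detection is thus extended over the following `memory` points. Because the window is read from the raw flags rather than from the smoothed ones, the extension does not propagate indefinitely.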
The 'alarm system' is then implemented: a red alarm (level 2) if more than buffer_time // 100 anomalous windows were detected during the last buffer time, an orange alarm (level 1) if at least one was detected during the buffer time before that, and no alarm (level 0) otherwise.
def alarm_levels(discr_output, buffer_time):
    # no alarm level can be computed before 2 * buffer_time points of history are available
    alarm_list = [None] * (2 * buffer_time)
    for index in range(2 * buffer_time, len(discr_output)) :
        red_range = discr_output[index - buffer_time : index]
        orange_range = discr_output[index -2 * buffer_time : index - buffer_time]
        if red_range.count(1.) > buffer_time // 100:
            alarm = 2
        elif 1. in orange_range:
            alarm = 1
        else:
            alarm = 0
        alarm_list.append(alarm)
    return alarm_list
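The two-buffer logic can be followed step by step on a toy list. The sketch below is a hypothetical variant (alarm_levels_sketch) that takes the minimum number of flags for a red alarm as an explicit parameter, red_min_count, which is an assumption introduced here so that the example works with a very small buffer; the notebook derives its threshold from buffer_time instead.

```python
def alarm_levels_sketch(flags, buffer, red_min_count=1):
    """Return one alarm level per index once 2 * buffer points of history exist.

    2 = red: at least `red_min_count` flags in the most recent buffer
    1 = orange: any flag in the buffer before that
    0 = no alarm
    """
    levels = [None] * (2 * buffer)          # not enough history yet
    for i in range(2 * buffer, len(flags)):
        recent = flags[i - buffer : i]
        older = flags[i - 2 * buffer : i - buffer]
        if recent.count(1.0) >= red_min_count:
            levels.append(2)
        elif 1.0 in older:
            levels.append(1)
        else:
            levels.append(0)
    return levels


flags = [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(alarm_levels_sketch(flags, buffer=2))
# [None, None, None, None, 0, 2, 2, 1, 1, 0]
```

The single flag at index 4 produces a red alarm for the next two points, then an orange alarm for two more, before the level returns to 0.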
def display_stats(alarm_list, labels) :
    true_positives = 0
    false_positives = 0
    all_positives = 0
    error_number = 0
    negative_number = 0
    for index, output in enumerate(alarm_list) :
        if output > 0 :
            all_positives += 1
            if labels.loc[index].item() > 0. :
                true_positives += 1
        if labels.loc[index].item() > 0. :
            error_number += 1
        if labels.loc[index].item() == 0. :
            negative_number += 1
            if output > 0. :
                false_positives += 1
    try:
        recall = true_positives / error_number  # or true positives rate
    except ZeroDivisionError:
        recall = "no error detected"
    try:
        # true positives over all raised alarms (this ratio is usually called precision)
        accuracy = true_positives / all_positives
    except ZeroDivisionError:
        accuracy = "no error"
    # computed for reference, not displayed
    false_positives_rate = false_positives / negative_number if negative_number else None
    print(f"accuracy : {accuracy} \nrecall : {recall}")
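The two ratios computed by display_stats correspond to what is conventionally called precision (true positives over all raised alarms) and recall (true positives over all true anomalies). The sketch below computes them on small made-up lists with a hypothetical helper, precision_recall, treating any alarm level above 0 as a positive.

```python
def precision_recall(alarms, labels):
    """Precision and recall of alarm > 0 against label > 0 (hypothetical helper)."""
    tp = sum(1 for a, l in zip(alarms, labels) if a > 0 and l > 0)
    fp = sum(1 for a, l in zip(alarms, labels) if a > 0 and l == 0)
    fn = sum(1 for a, l in zip(alarms, labels) if a == 0 and l > 0)
    precision = tp / (tp + fp) if tp + fp else None
    recall = tp / (tp + fn) if tp + fn else None
    return precision, recall


alarms = [0, 2, 2, 1, 1, 0]   # made-up alarm levels
labels = [0, 1, 1, 0, 0, 0]   # made-up reference labels
print(precision_recall(alarms, labels))  # (0.5, 1.0)
```

A recall of 1.0 with a precision around 0.5 is the pattern one would expect from an alarm that is deliberately held for a while after each detection: no anomaly is missed, but part of the alarm time covers normal data.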
def colorsToJson(data: np.ndarray, output: str):
    """
    Takes a 2-dimensional array as input
    Column 1: Timestamps (or strings)
    Column 2: Colours
    Converts it to JSON
    :param data: The data array to convert to a json file
    :param output: The output path
    """
    if type(data) is not list: # converting to a list if it was not one already
        data = data.tolist()
    titles = data[0]
    list_dict = []
    for i in range(1,len(data)): # converting the timestamps to strings if they were not already
        data[i][0] = str(data[i][0])
        d = dict()
        for j, title in enumerate(titles):
            d[title] = data[i][j]
        list_dict.append(d)
    with open(output, "w", encoding="utf-8") as fp: # saving to a json file
        json.dump(list_dict, fp, sort_keys=True, indent=4)
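To show the kind of file this function is meant to produce, here is a minimal sketch of the same row-to-record conversion on made-up data, using only the standard library. The timestamps and alarm levels are invented for the illustration.

```python
import json

rows = [
    ["timestamp", "status"],          # header row, as expected by colorsToJson
    ["2021-01-01 00:00:00", 0],       # made-up timestamps and alarm levels
    ["2021-01-01 00:10:00", 2],
]

header, body = rows[0], rows[1:]
records = [dict(zip(header, row)) for row in body]  # one dictionary per data row
print(json.dumps(records, sort_keys=True, indent=4))
```

Each data row becomes one JSON object whose keys are taken from the header row.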
# Getting the timestamps
timestamps_strlist = list(df.loc[start:stop, 'index'])
timestamps_strlist = timestamps_strlist[2 * buffer_time :]
# Alarms (dropping the leading None values, for which there is not enough history)
alarm_list = alarm_levels(smooth_output, buffer_time)
alarm_list = alarm_list[2 * buffer_time :]
# Zipping
alarm_list_timestamps = np.array(
    [(timestamp_str, alarm) for timestamp_str, alarm in zip(timestamps_strlist, alarm_list)]
)
# Prepending the header row expected by colorsToJson
alarm_list_timestamps = np.concatenate(
    (np.array([['timestamp', 'status']]),
     alarm_list_timestamps),axis=0
)
VI. Testing¶
The alarm level is shown in cyan and broadly matches the areas with anomalous data. The discriminator's raw (slightly smoothed) output is plotted in green, the real signal in blue and the real label in orange.
# Plotting
plt.figure(figsize = (40, 15))
plt.xticks(ticks = list(range(0, data.size, 1000)))
plt.grid()
plt.fill([0.] + alarm_list + [0.], c='cyan')
plt.plot([0.] + smooth_output[2 * buffer_time :] + [0.], c='g')
plt.plot([0.] + data[2 * buffer_time :] + [0.], c='b')
plt.plot([0.] + labels[2 * buffer_time :] + [0.], c='orange')
plt.ylim(-1, 2)
plt.show()
#colorsToJson(alarm_list_timestamps, 'output-gan.json') # saving in a json file
print(display_stats(alarm_list, labels))
accuracy : 0.5138546798029556
recall : 1.0
None