GAN¶

I. Working principle¶

A GAN (Generative Adversarial Network) is an unsupervised model made up of two neural networks that compete with each other: the generator and the discriminator. The generator’s job is to produce believable fake data from random noise in order to fool the discriminator. The discriminator’s job is to tell the real data it is sent apart from the fake data.

Once training is complete, the discriminator should be able to distinguish real data from fake data, and therefore to recognise real data in any set it is fed. Its state is then saved and it is taken out of the system to be used on its own for processing data. In our case, the real data used for training contains no anomalies: the generator learns to imitate it and the discriminator learns to recognise it. As a result, when the discriminator is shown erroneous data, or data with visible anomalies, it should classify it as not real, which lets us raise an alarm.

The training takes place over multiple iterations (epochs). Within each epoch, for every batch, the discriminator is first fed real data labelled as real, then fake data that the generator, at its current stage of training, produces from random noise, so that it learns to tell the two apart. This completes the discriminator’s update for this batch. The generator is then trained on the discriminator’s output, its goal being to maximise the discriminator’s error.

The whole system is very sensitive to the choice of the hyperparameters, which have to be fine-tuned in order to achieve the best possible discrimination and minimise the discriminator’s error rate.

1. Module imports¶

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json

import torch as th
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter


%load_ext tensorboard
In [2]:
save_path_discr = 'saved_file.pt'
In [3]:
dico = {"Description" : "Point Name",
        "MOTOR CURRENT" : "PARD@3C52@3C52-M@JT7099.CAL",
        "Température palier étage 1" : "PARD@3C52@3C52-M@TE7011A.PNT",
        "Température palier étage2" : "PARD@3C52@3C52-M@TE7021A.PNT",
        "Température palier étage 3" : "PARD@3C52@3C52-M@TE7031A.PNT",
        "Température palier étage 4" : "PARD@3C52@3C52-M@TE7041A.PNT",
        "Déplacement axiale 1/2" : "PARD@3C52@3C52-M@VT7001.PNT:RAW",
        "Déplacement axiale 3/4" : "PARD@3C52@3C52-M@VT7002.PNT:RAW",
        "1e stade vibration X" : "PARD@3C52@3C52-M@VT7011A.PNT:RAW",
        "1er stade vibration Y" : "PARD@3C52@3C52-M@VT7011B.PNT:RAW",
        "2e stade vibration X" : "PARD@3C52@3C52-M@VT7021A.PNT:RAW",
        "2e stade vibration Y" : "PARD@3C52@3C52-M@VT7021B.PNT:RAW",
        "3e stade vibration X" : "PARD@3C52@3C52-M@VT7031A.PNT:RAW",
        "3e stade vibration Y" : "PARD@3C52@3C52-M@VT7031B.PNT:RAW",
        "4e stade vibration X" : "PARD@3C52@3C52-M@VT7041A.PNT:RAW",
        "4e stade vibration Y" : "PARD@3C52@3C52-M@VT7041B.PNT:RAW",
        "Température huile sortie réfrigerant" : "PARD@3C52@3C52@TE7086.PNT",
        "labels" : "labels"}

2. Hyperparameters¶

The choice of hyperparameters is probably the most crucial part of the work, given how sensitive GANs are to them. For example, without a fine balance between the two learning rates, the discriminator becomes far too good compared with the generator, and an abnormal signal is still classified as real by the discriminator. We also had to determine the optimal size of the 'sliding windows', the slices of consecutive timepoints that are fed to the discriminator. This size depends on the characteristic period of the oscillations in the real signal and on the desired detection time. The other hyperparameters were chosen empirically.

In [4]:
ngpu = 1 

features = [
    dico["Température palier étage 1"],
]   

features_nb = len(features)

##    Generator and discriminator hyperparameters
gen_input_dim = 200  # the dimension of the noise which is fed to the generator
gen_output_dim = features_nb
discr_input_dim = gen_output_dim
discr_output_dim = 1  # The discriminator returns the probability that the signal is a real one

#The sizes of the hidden layers in the discriminator
hidden_size = 20
hidden_size_discr = 30

#    Data processing constants
window_size = 15
window_step = 15

#    Training hyperparameters
num_epochs = 2
batch_size = 5
num_workers = 2
gen_learning_rate = 0.02 
discr_learning_rate = 0.00002
beta1 = 0.5 # hyperparameter for the Adam optimizers

II. Data Processing¶

1. Training Dataset¶

Importing anomaly-free data samples in order to train the generator and discriminator.

In [5]:
#    Importing the CSV datasets

data_relative_path = ''

normal_1_path = data_relative_path + 'data_gan/normal_0_14000_detrended.csv'
normal_2_path = data_relative_path + 'data_gan/normal_66900_100032_detrended.csv'

normal_1 = pd.read_csv(normal_1_path)
normal_2 = pd.read_csv(normal_2_path)

#    Resetting the indices
normal_1.reset_index(inplace = True)
normal_2.reset_index(inplace = True)

Generating a dataset of anomaly-free samples to feed the discriminator. The dataset is a list of set-length windows.

In [6]:
def datawindows_real(df : pd.DataFrame, window_size : int, step_size : int, features: list) -> list :
    """
    Generates fixed-length windows (tensors) from a dataframe whose rows all share the same label.
    :param df: The input dataframe
    :param window_size: The window width
    :param step_size: The offset between the starts of two consecutive windows
    :param features: The list of feature columns to keep
    :returns: A list of tensors, each of shape (window_size, len(features))
    """
    
    # Dropping the 'level_0' column left by an earlier reset_index, if present
    df.drop(columns=['level_0'], inplace=True, errors='ignore')
    
    l = df[features[0]].size
    windows_nb = (l-window_size)//step_size
    window_list = []
    
    for window_id in range(windows_nb) :
        
        window_beginning = window_id*step_size
        window_ending = window_beginning + window_size - 1
        
        window_df = df.loc[window_beginning:window_ending, features]
        window_array = window_df.to_numpy().astype(np.float64)  # np.float was removed in NumPy 1.24
        
        window_list.append(th.tensor(window_array))
        
    return window_list
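
As a rough illustration of the windowing above, here is a pure-Python sketch that works on a plain list instead of a dataframe. The helper name `make_windows` and the toy series are assumptions made for this example; only the window-count formula and the handling of the inclusive end index mirror `datawindows_real`.

```python
def make_windows(values, window_size, step_size):
    """Cut a sequence into fixed-length windows, using the counting rule of datawindows_real."""
    windows_nb = (len(values) - window_size) // step_size
    windows = []
    for window_id in range(windows_nb):
        start = window_id * step_size
        # df.loc[start:end] includes `end`, which is why the notebook uses
        # end = start + window_size - 1. A Python slice excludes its upper
        # bound, hence start + window_size here.
        windows.append(values[start:start + window_size])
    return windows


series = list(range(100))  # toy signal with 100 timepoints
windows = make_windows(series, window_size=15, step_size=15)

print(len(windows))                      # number of windows produced
print(windows[-1][0], windows[-1][-1])   # first and last timepoint of the last window
```

With `step_size` equal to `window_size` the windows do not overlap. A smaller step (the post-processing section uses 1) yields overlapping windows, one per timepoint. Note that this counting rule leaves out the tail of the series, even when one more full window would still fit.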
In [7]:
### Creation of the window_list that will train the discriminator
train_data_1 = datawindows_real(normal_1, window_size, window_step, features)
train_data_2 = datawindows_real(normal_2, window_size, window_step, features)
window_list = train_data_1 + train_data_2
In [8]:
class GAN_TrainDataset(Dataset):
    
    def __init__(self, window_list: list):
        self.dataset = window_list
        self.labels = [th.tensor([1.] * features_nb) for idx in range(len(window_list))]
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]      
In [9]:
#    Loading the data
dataset = GAN_TrainDataset(window_list)
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

2. Testing Dataset¶

Loading normal and abnormal data samples in order to test the discriminator and determine when to save it during the training.

In [10]:
# Importing the CSV datasets

data_relative_path = ''

abnormal_1_path = data_relative_path + 'data_gan/n_anormal_100032_end_detrended.csv'
abnormal_2_path = data_relative_path + 'data_gan/n_anormal_14000_43089_detrended.csv'
abnormal_3_path = data_relative_path + 'data_gan/n_anormal_43089_66900_detrended.csv'

abnormal_1 = pd.read_csv(abnormal_1_path)
abnormal_2 = pd.read_csv(abnormal_2_path)
abnormal_3 = pd.read_csv(abnormal_3_path)


# Resetting the indexes

abnormal_1.reset_index(inplace=True)
abnormal_2.reset_index(inplace=True)
abnormal_3.reset_index(inplace=True)

datawindows_labels generates set-length windows and the corresponding labels for the discriminator.

In [11]:
def datawindows_labels(df : pd.DataFrame, window_size : int, step_size : int, features: list) -> list :
    """
    Generates fixed-length windows together with the corresponding labels.
    :param df: The input dataframe (must contain a 'labels' column)
    :param window_size: The window width
    :param step_size: The offset between the starts of two consecutive windows
    :param features: The list of feature columns to keep
    :returns: A list of (window tensor, label tensor) tuples
    """
    # Dropping the 'level_0' column left by an earlier reset_index, if present
    df.drop(columns=['level_0'], inplace=True, errors='ignore')
    
    l = df[features[0]].size
    windows_nb = (l-window_size)//step_size
    window_list = []
    
    for window_id in range(windows_nb) :
        
        window_beginning = window_id*step_size
        window_ending = window_beginning + window_size - 1
        
        window_df = df.loc[window_beginning:window_ending, features]
        window_array = window_df.to_numpy().astype(np.float64)  # np.float was removed in NumPy 1.24
        
        # labels
        window_labels = df.loc[window_beginning:window_ending, 'labels']
        window_labels_array = np.abs(window_labels.to_numpy().astype(np.float64))
        
        # global window label: the discriminator returns the probability that the signal comes from
        # the real sample (i.e. "is normal"), so a window containing any non-zero label (an anomaly)
        # gets the label 0. and a fully normal window gets the label 1.
        label = 0. if window_labels_array.any() else 1.
        
        window_list.append((th.tensor(window_array), th.tensor([label])))
        
    return window_list
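
The labelling rule used above can be checked on toy data. The sketch below is a pure-Python stand-in (the helper name `window_label` and the sample label lists are made up for illustration): a window is labelled 1. only if every point in it is normal.

```python
def window_label(point_labels):
    """Return 1. for a fully normal window, 0. as soon as one point is anomalous."""
    has_anomaly = any(abs(point_label) != 0 for point_label in point_labels)
    return 0. if has_anomaly else 1.


normal_window = [0, 0, 0, 0]
anomalous_window = [0, 0, -1, 0]   # a single anomalous point is enough

print(window_label(normal_window))
print(window_label(anomalous_window))
```

The convention is inverted compared with the raw `labels` column because the discriminator's output is read as the probability of being normal.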
In [12]:
class GAN_TestDataset(Dataset):
    
    def __init__(self, labeled_window_list: list):
        self.dataset = [data for data, label in labeled_window_list]
        self.labels = [label for data, label in labeled_window_list]
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]
In [13]:
# Generating windows for testing the discriminator
w1 = datawindows_labels(abnormal_1, window_size, window_step, features)
w2 = datawindows_labels(abnormal_2, window_size, window_step, features)
window_list_test = w1 + w2
In [14]:
# Loading the data
test_dataset = GAN_TestDataset(window_list_test)
test_batch_size = batch_size
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=True, num_workers=num_workers)

OTHER SETTINGS¶

In [15]:
# Setting the working device
device = th.device('cuda:0' if (th.cuda.is_available() and ngpu > 0) else 'cpu')
In [16]:
# Custom weights initialization called on netG and netD
# (it only re-initialises Conv and BatchNorm layers)
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        th.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        th.nn.init.normal_(m.weight.data, 1.0, 0.02)
        th.nn.init.constant_(m.bias.data, 0)
In [17]:
# Initializing the tensorboard
writer = SummaryWriter()

III. Initialization of the model¶

1. Generator class¶

The generator takes a batch of random-noise windows as input and returns a batch of fake signal windows of the same length, with one value per feature at each timepoint.

In [18]:
#    Generator class
class Generator( th.nn.Module ):
    
    def __init__(self, ngpu: int = 0):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        self.main = th.nn.LSTM(gen_input_dim, gen_output_dim)

    def forward(self, val):
        return self.main(val)
In [19]:
#    Creating the generator
netG = Generator(ngpu).to(device)

#    Handling multi-GPU
if (device.type == 'cuda') and (ngpu > 1):
    netG = th.nn.DataParallel(netG, list(range(ngpu)))

#    Applying weights_init (normal initialisation, stdev=0.02); it only touches Conv and BatchNorm layers, so this model keeps PyTorch's default weights
netG.apply(weights_init)
Out[19]:
Generator(
  (main): LSTM(200, 1)
)

2. Discriminator class¶

The discriminator takes a batch of windows as input, each window containing the values of the selected features over the window's timepoints, and returns one value per window: the estimated probability that the window is real. It is made up of an LSTM layer followed by two fully connected layers.

In [20]:
#    Discriminator class
class Discriminator( th.nn.Module ):
    
    def __init__(self, ngpu: int = 0):
        super(Discriminator, self).__init__()

        self.ngpu = ngpu
        
        self.lstm = th.nn.LSTM(discr_input_dim, hidden_size)
        self.linear = th.nn.Linear(hidden_size*window_size, hidden_size_discr)
        self.relu = th.nn.ReLU()
        self.linear2 = th.nn.Linear(hidden_size_discr, discr_output_dim)
        self.sigmoid = th.nn.Sigmoid()

    
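    def forward(self, val):
        # val has shape (batch, window_size, features). th.nn.LSTM is built here with its default
        # batch_first=False, so it reads the first dimension as the sequence axis.
        val, _ = self.lstm(val)
        val = val.view(val.shape[0], -1)  # flattening to (batch, window_size*hidden_size)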
        val = self.linear(val)
        val = self.relu(val)
        val = self.linear2(val)
        val = self.sigmoid(val)
        return val
In [21]:
#    Creating the discriminator
netD = Discriminator(ngpu).to(device)

#    Handling multi-GPU (if desired)
if (device.type == 'cuda') and (ngpu > 1):
    netD = th.nn.DataParallel(netD, list(range(ngpu)))

#    Applying weights_init (normal initialisation, stdev=0.02); it only touches Conv and BatchNorm layers, so this model keeps PyTorch's default weights
netD.apply(weights_init)
Out[21]:
Discriminator(
  (lstm): LSTM(1, 20)
  (linear): Linear(in_features=300, out_features=30, bias=True)
  (relu): ReLU()
  (linear2): Linear(in_features=30, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

3. Optimizers¶

In [22]:
#    Initialising the BCELoss (Binary Cross Entropy Loss) loss function
criterion = th.nn.BCELoss()
In [23]:
#    Setting up the Adam optimizers for G and D
optimizerG = th.optim.Adam(netG.parameters(), lr=gen_learning_rate, betas=(beta1, 0.999))
optimizerD = th.optim.Adam(netD.parameters(), lr=discr_learning_rate, betas=(beta1, 0.999))

IV. Training Loop¶

This is the training loop of the discriminator and the generator, in which both are updated in turn on every batch, based on the discriminator's error, in order to improve each of them.

In [24]:
#    Setting the conventions for the labels
real_label = 1.
fake_label = 0.

#    Creating a batch of latent vectors to visualise the progression of the generator
fixed_noise = th.randn(batch_size, window_size, gen_input_dim, device=device)

G_losses = []
D_losses = []
iters = 0
best_accuracy = 0

for epoch in range(num_epochs):
        
    for train_batch_id, (train_data_batch, train_label_batch) in enumerate(train_dataloader):
        # FORMATTING THE DATA BATCH
        train_data_batch = train_data_batch.float().to(device)

        # Training the discriminator
        netD.zero_grad() 
        
        ### 1. TRAINING THE DISCRIMINATOR WITH AN ALL-REAL BATCH - Updating the discriminator
        # 1.1. Formatting the labels
        label = th.full((train_data_batch.shape[0],), real_label, dtype=th.float, device=device).squeeze()         
        
        # 1.2. Forwards pass with an all-real batch (calculating the discriminator's output and then the error)
        output = netD(train_data_batch).squeeze()
        errD_real = criterion(output, label)
        
        # 1.3. Calculating the gradients for the discriminator in backwards pass
        errD_real.backward()
        D_x = output.mean().item()  # taking the mean of the labels generated for each data sequence in the batch

        ### 2. TRAINING THE DISCRIMINATOR WITH AN ALL-FAKE BATCH FROM THE GENERATOR 
        # 2.1. Generating a batch of random latent vectors
        noise = th.randn(train_data_batch.shape[0], window_size, gen_input_dim, device=device)
        
        # 2.2. Generating an all-fake batch using the generator
        fake, _ = netG(noise)
        label = th.full((train_data_batch.shape[0],), fake_label, dtype=th.float, device=device).squeeze()
        
        # 2.3. Classifying the all-fake batch with D
        output = netD(fake.detach()).squeeze()
        
        # 2.4. Calculating the discriminator's loss on the all-fake batch
        errD_fake = criterion(output, label)
        
        # 2.5. Calculating the gradients for this batch
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        # 3. Summing the losses from the all-real and all-fake batches (for reporting; the gradients were already accumulated by the two backward calls)
        errD = errD_real + errD_fake

        # 4. Updating the discriminator
        optimizerD.step()
        
        ### 3. TRAINING THE GENERATOR - Updating the generator: maximizing log(D(G(z)))
    
        # 1. Formatting the batch
        netG.zero_grad()
        label = th.full((train_data_batch.shape[0],), real_label, dtype=th.float, device=device).squeeze() # fake samples are labelled as real for the generator's cost

        # 2. Performing another forward pass of the all-fake batch through the discriminator (since it was just updated).
        # The batch is not detached this time, so that the gradients can flow back to the generator.
        output = netD(fake).squeeze()

        # 3. Calculating the generator's loss based on this output
        errG = criterion(output, label)

        # 4. Calculating the gradients for the generator
        errG.backward()
        D_G_z2 = output.mean().item()

        # 5. Updating the generator
        optimizerG.step()
     
        ### 4. OUTPUT TRAINING STATS
        if train_batch_id % 450 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch+1, num_epochs, train_batch_id+1, len(train_dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # 4.2. Saving Losses for plotting
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # 4.3. Checking how the generator is doing by saving G's output on fixed_noise
        if (iters % 500 == 0) or ((epoch == num_epochs-1) and (train_batch_id == len(train_dataloader)-1)):
            with th.no_grad():
                fake = netG(fixed_noise)[0].detach().cpu()

        iters += 1
    
    ### X. TESTING AFTER EACH EPOCH
    errors = 0
    false_positives = 0
    false_negatives = 0
    
    # Loading a batch of test data & matching test labels
    for batch_id, (test_data_batch, test_label_batch) in enumerate(test_dataloader):

        # 1.1. Running through the discriminator
        test_data_batch = test_data_batch.float().to(device)
        test_discr_output_labels = netD(test_data_batch)
        
        # 1.2. Generating a list of the test's outputs (floats), of size the batch size
        test_discr_output_list = [test_output_label.item() for test_output_label in test_discr_output_labels]

        # 2. Running through the windows in the batch
        batch_len = len(test_label_batch)
        for window_id in range(batch_len):  # for (window_id, test_output) in enumerate(test_discr_output_list):
            
            # 3.1. Getting the output label from the discriminator
            test_output = test_discr_output_list[window_id]
            test_output_rounded = float( round(test_output) )  # rounding the test output (<= 0.5 is 0  = ERROR| > 0.5 is 1 = ALL FINE)
            
            # 3.2. Getting the reference label
            ref_label = test_label_batch[window_id] .item()            
            
            if test_output_rounded == 1. and ref_label == 0. :
                false_negatives += 1
                errors += 1
            
            if test_output_rounded == 0. and ref_label == 1. :
                false_positives += 1
                errors += 1
    
    ### PLOTTING
    writer.add_scalar('discriminator loss', errD.item(), epoch)
    writer.add_scalar('generator loss', errG.item(), epoch)
    
    
    # Saving the discriminator each time it performs with better accuracy
    test_windows_nb = len(test_dataset)  # exact count: the last batch may hold fewer than test_batch_size windows
    mean_error_rate = errors/test_windows_nb
    false_positive_rate = false_positives/test_windows_nb
    false_negative_rate = false_negatives/test_windows_nb
    
    accuracy = 1 - mean_error_rate
    
    if accuracy > best_accuracy:
        th.save(netD, save_path_discr)
        best_accuracy = accuracy
        
    writer.add_scalar('mean error', mean_error_rate, epoch)
    writer.add_scalar('false positives', false_positive_rate, epoch)
    writer.add_scalar('false negatives', false_negative_rate, epoch)
[1/2][1/628]	Loss_D: 1.3862	Loss_G: 0.7005	D(x): 0.4964	D(G(z)): 0.4963 / 0.4963
[1/2][451/628]	Loss_D: 1.3867	Loss_G: 0.6921	D(x): 0.5003	D(G(z)): 0.5005 / 0.5005
[2/2][1/628]	Loss_D: 1.3860	Loss_G: 0.6925	D(x): 0.5005	D(G(z)): 0.5003 / 0.5003
[2/2][451/628]	Loss_D: 1.3834	Loss_G: 0.6955	D(x): 0.5003	D(G(z)): 0.4988 / 0.4988
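
The logged losses above stay close to two recognisable constants. As a sketch of the arithmetic only (the helper `bce` is written for this example and is not PyTorch's `BCELoss`, which also averages over the batch), the binary cross-entropy of a discriminator that outputs 0.5 for every window is:

```python
import math


def bce(prediction, target):
    """Binary cross-entropy of a single prediction in (0, 1) against a 0/1 target."""
    return -(target * math.log(prediction) + (1 - target) * math.log(1 - prediction))


d_output = 0.5  # a discriminator that cannot tell real from fake

loss_d = bce(d_output, 1.0) + bce(d_output, 0.0)  # real batch + fake batch
loss_g = bce(d_output, 1.0)                       # the generator wants fakes labelled as real

print(round(loss_d, 4), round(loss_g, 4))
```

These values are 2·ln 2 and ln 2. A `Loss_D` near 1.386 and a `Loss_G` near 0.693, together with `D(x)` and `D(G(z))` both near 0.5, therefore suggest that after these two epochs the discriminator is still answering at chance level.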
In [25]:
writer.flush()
writer.close()

Loading the saved discriminator for post-processing and testing¶

In [27]:
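# Initialising as a Discriminator class instance
loaded_discr = Discriminator(ngpu).to(device)

# Loading the trained discriminator. The file holds the whole pickled module (saved with th.save(model, path)),
# not a state dictionary, so the instance created above is simply replaced.
loaded_discr = th.load('final_gan.pt')  # Here we load an already trained discriminator that is more efficient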
loaded_discr.eval()
Out[27]:
Discriminator(
  (lstm): LSTM(1, 20)
  (linear): Linear(in_features=300, out_features=30, bias=True)
  (relu): ReLU()
  (linear2): Linear(in_features=30, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

V. Post-processing¶

In [28]:
df = abnormal_3

start = 0
stop = 30000
buffer_time = 504  # buffer time set to 3 days
Creating a new dataset for testing and plotting¶
In [29]:
data = df.loc[start:stop, dico["Température palier étage 1"]]
data = {'data': data} 
data = pd.DataFrame(data)
data.reset_index(inplace=True)
data.drop(['index'], axis=1, inplace=True)

labels = df.loc[start:stop, 'labels'] / (max(df['labels']) + 1)
labels = {'labels': labels} 
labels = pd.DataFrame(labels)
labels.reset_index(inplace=True)
labels.drop(['index'], axis=1, inplace=True)


df1_1 = df.loc[start:stop, features + ['labels']]
df1_1.reset_index(inplace=True)

labeled_window_list = datawindows_labels(df1_1, window_size, 1, features)  # window_step = 1 in order to have one
# window corresponding to one timepoint
In [30]:
class TestDataset2(Dataset):
    
    def __init__(self, labeled_window_list: list):
        self.dataset = [data for data, label in labeled_window_list]
        self.labels = [label for data, label in labeled_window_list]
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx: int) -> tuple :
        return self.dataset[idx], self.labels[idx]
In [31]:
#    Loading the data
test_dataset = TestDataset2(labeled_window_list)
test_batch_size = batch_size
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=num_workers)
Collecting the discriminator's output on the testing dataset¶
In [32]:
discriminator = loaded_discr

def round_func(number: float, round_threshold: float):
    return float(number > round_threshold)
    
output_list = []
with th.no_grad():  # inference only, no gradients needed
    for test_data_batch, test_label_batch in test_dataloader:
        test_data_batch = test_data_batch.float().to(device)
        output_batch = discriminator(test_data_batch)
        output_list += output_batch
    
# Processing the discriminator's output
discr_output = list()
for output in output_list:
    discr_output += [1 - round_func(output.item(), 0.75)]  # when the probability returned by the discriminator is
    # not greater than 0.75, the window is considered anomalous (value 1.)
Post-processing algorithms¶

Without post-processing, the discriminator only returns isolated, point-like detections, with a high density around the spikes. The post-processing is intended to keep an alarm raised for some time once an anomaly has been encountered.

The first step is to smooth the output in areas where the discriminator flags many anomalies.

In [33]:
def smooth_data(discr_output, smooth_memory:int = 10, occurrences_threshold: int = 1):
    # A point is flagged (set to 1.) if at least occurrences_threshold of the
    # smooth_memory previous raw outputs are flagged
    
    smooth_output = discr_output.copy()
    
    for index in range(smooth_memory, len(discr_output)) :

        memory_range = discr_output[index - smooth_memory : index]
        count = memory_range.count(1.)

        if count >= occurrences_threshold:
            smooth_output[index] = 1.
    
    return smooth_output

smooth_output = smooth_data(discr_output, smooth_memory=10, occurrences_threshold=1)
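
To see what this smoothing does, here is a self-contained toy run. The function `smooth` below is a compact restatement of the same rule for illustration, and the input list is made up: a single raw detection is stretched over the following `smooth_memory` points.

```python
def smooth(discr_output, smooth_memory=10, occurrences_threshold=1):
    """Flag a point if enough of the previous `smooth_memory` raw outputs are flagged."""
    smooth_output = discr_output.copy()
    for index in range(smooth_memory, len(discr_output)):
        memory_range = discr_output[index - smooth_memory:index]
        if memory_range.count(1.) >= occurrences_threshold:
            smooth_output[index] = 1.
    return smooth_output


raw = [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.]  # one isolated detection at index 3
smoothed = smooth(raw, smooth_memory=3)

print(smoothed)
```

The look-back always reads the raw output rather than the already smoothed one, so a detection cannot propagate indefinitely: here the flag covers indices 3 to 6 and then drops back to 0.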

The second step implements the 'alarm system': a red alarm (level 2) if more than buffer_time // 100 anomalous points were detected during the last buffer time, an orange alarm (level 1) if an anomaly was detected during the buffer time before that, and no alarm (level 0) otherwise.

In [34]:
def alarm_levels(discr_output, buffer_time):
    
    alarm_list = [None] * (2 * buffer_time)
    
    for index in range(2 * buffer_time, len(discr_output)) :
        
        red_range = discr_output[index - buffer_time : index]
        orange_range = discr_output[index -2 * buffer_time : index - buffer_time]

        
        if red_range.count(1.) > buffer_time // 100:
            alarm = 2
        elif 1. in orange_range:
            alarm = 1
        else:
            alarm = 0

        alarm_list.append(alarm)
        
    return alarm_list
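
A toy run makes the alarm levels easier to follow. The sketch below restates the same rule under a hypothetical name (`alarm_levels_sketch`) with a made-up input and a tiny `buffer_time`, so that `buffer_time // 100` is 0 and a single detection is enough for a red alarm.

```python
def alarm_levels_sketch(flags, buffer_time):
    """Return None while history is too short, then 2 (red), 1 (orange) or 0 (no alarm)."""
    alarms = [None] * (2 * buffer_time)
    for index in range(2 * buffer_time, len(flags)):
        red_range = flags[index - buffer_time:index]
        orange_range = flags[index - 2 * buffer_time:index - buffer_time]
        if red_range.count(1.) > buffer_time // 100:
            alarms.append(2)
        elif 1. in orange_range:
            alarms.append(1)
        else:
            alarms.append(0)
    return alarms


flags = [0.] * 6 + [1.] + [0.] * 8   # 15 points, one detection at index 6
alarms = alarm_levels_sketch(flags, buffer_time=3)

print(alarms)
```

Both ranges end before the current index, so the alarm at a given point depends only on earlier points: the detection at index 6 turns the alarm red for indices 7 to 9, orange for indices 10 to 12, and off afterwards.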
In [35]:
def display_stats(alarm_list, labels) :
    true_positives = 0
    false_positives = 0
    all_positives = 0
    error_number = 0
    negative_number = 0

    for index, output in enumerate(alarm_list) :
        if output > 0 :
            all_positives += 1
            if labels.loc[index].item() > 0. :
                true_positives += 1
        if labels.loc[index].item() > 0. :
            error_number += 1
        if labels.loc[index].item() == 0. :
            negative_number += 1
            if output > 0. :
                false_positives += 1


    try:
        recall = true_positives / error_number  # or true positive rate
    except ZeroDivisionError:
        recall = "no error detected"
    try:
        accuracy = true_positives / all_positives  # share of raised alarms that match a labelled anomaly (i.e. the precision)
    except ZeroDivisionError:
        accuracy = "no error"
    
    false_positives_rate = false_positives / negative_number
    print(f"accuracy : {accuracy} \nrecall : {recall}")
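
The value printed as "accuracy" above is the ratio of true positives to all raised alarms, which is usually called precision. The following sketch, with a hypothetical helper `precision_recall` and made-up alarm and label lists, shows how the two printed quantities relate.

```python
def precision_recall(alarms, labels):
    """Precision = TP / raised alarms, recall = TP / labelled anomalies (None if undefined)."""
    true_positives = sum(1 for alarm, label in zip(alarms, labels) if alarm > 0 and label > 0)
    raised = sum(1 for alarm in alarms if alarm > 0)
    anomalies = sum(1 for label in labels if label > 0)
    precision = true_positives / raised if raised else None
    recall = true_positives / anomalies if anomalies else None
    return precision, recall


alarms = [0, 2, 2, 1, 1, 0]   # toy alarm levels
labels = [0, 0, 1, 1, 0, 0]   # toy ground truth

precision, recall = precision_recall(alarms, labels)
print(precision, recall)
```

In this toy case every labelled anomaly is covered by an alarm (recall 1.0), but half of the alarmed points are not labelled as anomalous (precision 0.5). Keeping an alarm raised for a buffer time after each detection pushes the scores in that direction.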
In [36]:
def colorsToJson(data: np.ndarray, output: str):
    """
    Takes a 2-dimensional array as input, whose first row holds the column titles
    Column 1: Timestamps (or strings)
    Column 2: Colours
    Converts it to JSON
    
    :param data: The data array to convert to a json file
    :param output: The output path
    """
    if type(data) is not list: # converting to a list if it is not one already
        data = data.tolist()
        
    titles = data[0]
    list_dict = []
    
    for i in range(1,len(data)): # converting the timestamps to strings if they are not already
        data[i][0] = str(data[i][0])
        d = dict()
        for j, title in enumerate(titles):
            d[title] = data[i][j]
        list_dict.append(d)
    
    with open(output, "w", encoding="utf-8") as fp: # saving to a json file
        json.dump(list_dict, fp, sort_keys=True, indent=4)
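
For reference, the JSON layout this produces can be reproduced with the standard library alone. In the sketch below the timestamps, the file name and the temporary directory are made up for illustration; only the header row and the one-dictionary-per-row structure follow `colorsToJson`.

```python
import json
import os
import tempfile

# Toy input: a header row followed by (timestamp, status) rows
rows = [["timestamp", "status"],
        ["2020-01-01 00:00:00", 0],
        ["2020-01-01 00:10:00", 2]]

titles = rows[0]
# One dictionary per data row, keyed by the column titles, with the timestamp cast to str
records = [dict(zip(titles, [str(row[0])] + row[1:])) for row in rows[1:]]

path = os.path.join(tempfile.mkdtemp(), "output-gan-example.json")
with open(path, "w", encoding="utf-8") as fp:
    json.dump(records, fp, sort_keys=True, indent=4)

with open(path, encoding="utf-8") as fp:
    reloaded = json.load(fp)

print(reloaded)
```

Each entry of the resulting list pairs a timestamp with an alarm status, which is a convenient shape for a dashboard or any other consumer that reads the alarms row by row.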
In [37]:
# Getting the timestamps
timestamps_strlist = list(df.loc[start:stop, 'index'])
timestamps_strlist = timestamps_strlist[2 * buffer_time :]

# Alarms (dropping the leading None values, for which there is not enough history)
alarm_list = alarm_levels(smooth_output, buffer_time)
alarm_list = alarm_list[2 * buffer_time :]

# Zipping
alarm_list_timestamps = np.array(
    [(timestamp_str, alarm) for timestamp_str, alarm in zip(timestamps_strlist, alarm_list)]
)
    
# Prepending the header row expected by colorsToJson
alarm_list_timestamps = np.concatenate(
    (np.array([['timestamp', 'status']]),
     alarm_list_timestamps),axis=0
)

VI. Testing¶

The alarm level is shown in cyan; it broadly corresponds to the areas with anomalous data. The raw (slightly smoothed) discriminator output is plotted in green, the real signal in blue and the real label in orange.

In [38]:
# Plotting
plt.figure(figsize = (40, 15))
plt.xticks(ticks = list(range(0, data.size, 1000)))
plt.grid()

plt.fill([0.] + alarm_list + [0.], c='cyan')
plt.plot([0.] + smooth_output[2 * buffer_time :] + [0.], c='g')
plt.plot([0.] + data[2 * buffer_time :] + [0.], c='b')
plt.plot([0.] + labels[2 * buffer_time :] + [0.], c='orange')
   
plt.ylim(-1, 2)
plt.show()

#colorsToJson(alarm_list_timestamps, 'output-gan.json')  # saving in a json file
In [39]:
display_stats(alarm_list, labels)
accuracy : 0.5138546798029556 
recall : 1.0