Commit 9efe160e authored by rparrond

new files

parent 071c066f
model.py
#!/usr/bin/env python
'''
Short script defining the autoencoder architecture, which has three parts:
the encoder, the latent space and the decoder. The input dimensions, latent
dimensions and number of layers are parameters that can vary depending on
the data or be modified manually.
This script is meant to be imported, so that the training and testing code can
use the autoencoder class via:
    from model import AutoEncoder
'''
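#Illustrative sizing example (not part of the original code; the numbers are
#assumptions chosen only to show the arithmetic): with input_dim=100, nlayers=4
#and latent=3, delta = int((100 - 3) / 5) = 19, so the encoder layers go
#100 -> 81 -> 62 -> 43 -> 24, the latent layer maps 24 -> 3, the decoder goes
#3 -> 22 -> 41 -> 60 -> 79 and the output layer maps 79 -> 100.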
#Imports
import torch
#Constants
#Functions
#Create the Autoencoder Class
class AutoEncoder(torch.nn.Module):
    def __init__(self, input_dim, nlayers, latent):
        super().__init__()
        delta = int((input_dim - latent) / (nlayers + 1))
        #Encoder
        encoder = []
        nunits = input_dim
        for layer in range(nlayers):
            encoder.append(torch.nn.Linear(nunits, nunits - delta))
            encoder.append(torch.nn.ReLU())
            nunits = nunits - delta
        self.encoder = torch.nn.Sequential(*encoder)
        #Latent View
        self.lv = torch.nn.Sequential(
            torch.nn.Linear(nunits, latent),
            torch.nn.Sigmoid())
        #Decoder
        decoder = []
        nunits = latent
        for layer in range(nlayers):
            decoder.append(torch.nn.Linear(nunits, nunits + delta))
            decoder.append(torch.nn.ReLU())
            nunits = nunits + delta
        self.decoder = torch.nn.Sequential(*decoder)
        #Output
        self.output_layer = torch.nn.Sequential(
            torch.nn.Linear(nunits, input_dim),
            torch.nn.Sigmoid())

    def forward(self, x):
        encoded = self.encoder(x)
        latent_space = self.lv(encoded)
        decoded = self.decoder(latent_space)
        output = self.output_layer(decoded)
        return (latent_space, output)
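
#Minimal smoke-test sketch (illustrative only; the dimensions below are
#assumptions, not values used elsewhere in this repository).
if __name__ == '__main__':
    example_model = AutoEncoder(input_dim=30, nlayers=2, latent=3)
    example_batch = torch.rand(4, 30)  #4 random frames of 30 features each
    example_latent, example_output = example_model(example_batch)
    #Expected shapes: torch.Size([4, 3]) and torch.Size([4, 30])
    print(example_latent.shape, example_output.shape)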
train_test_AE.py
#!/usr/bin/env python
'''
Short script to train the autoencoder and optimize its parameters, then test
the autoencoder after the training step and compute the RMSD as a performance
assessment criterion. The optimizer used is Adam and the loss function is the
mean squared error (MSE). The fixed parameters are the batch size and the
no-shuffle option for the test data, while the learning rate and the number of
epochs are variable. These values have to be the same in the validation step
as in the training step.
For the autoencoder, the input dimensions, latent dimensions and number of
layers are parameters that can vary depending on the data or be modified
manually.
To display the usage instructions, run:
    train_test_AE.py -h
'''
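#Example invocation (the argument values below are illustrative only, not
#recommended settings):
#    python train_test_AE.py -nl 3 -lr 0.001 -e 100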
#Imports
import torch
import numpy as np
import matplotlib as mpl
from matplotlib import pyplot as plt
from matplotlib import cm as cm
from mpl_toolkits import mplot3d
import math as m
import sys
import os
import argparse
import multiprocessing as mp
from model import AutoEncoder
#Constants
CWD = os.getcwd() #should be the directory PK/Autoencoders
#Functions
def test_paths():
    if CWD.split("/")[-1] != "Autoencoders":
        sys.stderr.write("ERROR: This script needs to be run within the "
                         "PK/Autoencoders directory.\n")
        sys.exit()
def two_sided_moving_average(x, n=3):
    out = np.zeros_like(x, dtype=np.float64)
    dim_len = x.shape[0]
    for i in range(dim_len):
        # // is the floor division operator
        # the floor division rounds the result down to the nearest whole number
        if n%2 == 0:
            a, b = i - (n-1)//2, i + (n-1)//2 + 2
        else:
            a, b = i - (n-1)//2, i + (n-1)//2 + 1
        # Cap indices to min and max indices
        a = max(0, a)
        b = min(dim_len, b)
        out[i] = np.mean(x[a:b])
    return out
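#Illustrative check (assumed toy input, not data from this project):
#    two_sided_moving_average(np.array([1., 2., 3., 4., 5.]), n=3)
#returns array([1.5, 2., 3., 4., 4.5]); the windows are shortened at the edges.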
def compute_rmsd(input_frames, output_frames):
    rmsds = []
    for input_frame, output_frame in zip(input_frames, output_frames):
        input_atoms = input_frame.reshape((int(len(input_frame)/3), 3))
        output_atoms = output_frame.reshape((int(len(output_frame)/3), 3))
        total_sum = 0
        for input_coord, output_coord in zip(input_atoms, output_atoms):
            value = (input_coord-output_coord)**2
            value = np.sum(value)
            total_sum += value
        rmsd = m.sqrt(total_sum/(len(input_frame)/3))
        rmsds.append(rmsd)
    return rmsds
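#Equivalent vectorized sketch (illustrative only; assumes the frames are 2D
#arrays of shape (n_frames, 3 * n_atoms), as produced by the loading code
#below):
#    n_atoms = input_frames.shape[1] // 3
#    rmsds = np.sqrt(np.sum((input_frames - output_frames)**2, axis=1) / n_atoms)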
#Main Function
def main():
    """
    Main execution of the Python file as a script
    """
    #Set script arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--partition', metavar='N', default=0.8,
                        type=float,
                        help="Fraction of the data selected as the training"
                             " partition.")
    parser.add_argument('-bs', '--batch-size', metavar='N', default=100,
                        type=int,
                        help="Number of samples of the dataset per batch to"
                             " load. It is the number of training examples"
                             " used in one iteration.")
    parser.add_argument('-l', '--latent', metavar='N', default=3,
                        type=int,
                        help="Number of dimensions of the latent space. It is"
                             " a representation of the compressed data in a"
                             " low-dimensional space.")
    parser.add_argument('-nl', '--number-layers', metavar='N', required=True,
                        type=int,
                        help="Number of layers in both the encoder and decoder"
                             " networks, excluding the latent-space layer and"
                             " the output layer.")
    parser.add_argument('-lr', '--learning-rate', metavar='N', required=True,
                        type=float,
                        help="Learning rate used for the Adam optimizer. It is"
                             " the parameter that determines the step size at"
                             " each iteration while moving toward a minimum of"
                             " the loss function.")
    parser.add_argument('-e', '--epochs', metavar='N', required=True,
                        type=int,
                        help="Number of epochs used for training the model. It"
                             " is the number of times that the learning"
                             " algorithm will work through the entire training"
                             " dataset.")
    #Parse arguments
    args = parser.parse_args()
    partition = args.partition
    batch_size = args.batch_size
    latent = args.latent
    nlayers = args.number_layers
    lr = args.learning_rate
    epochs = args.epochs
    newDir = str(nlayers)+'_layers_'+str(lr)+'_lr_'+str(epochs)+'_epochs'
    #Go to the AE directory
    os.chdir('AE')
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    sys.stdout.write('# Using %s device.\n' % (device))
    #Load the input data
    data = np.load('input_data/AE_normalized_input_data.npy')
    #Select the data for training and testing
    select_train = int(partition*data.shape[0])
    select_test = int((1-partition)*data.shape[0])
    train = data[:select_train, :]
    test = data[-select_test:, :]
    #Create torch tensors from the numpy arrays
    train_data = torch.FloatTensor(train).to(device)
    test_data = torch.FloatTensor(test).to(device)
    #Transform the tensors into DataLoaders
    train_dataloader = torch.utils.data.DataLoader(dataset=train_data,
                                                   batch_size=batch_size,
                                                   drop_last=True,
                                                   shuffle=True)
    test_dataloader = torch.utils.data.DataLoader(dataset=test_data,
                                                  batch_size=batch_size,
                                                  drop_last=True,
                                                  shuffle=False)
    sys.stdout.write('# Input data for training and testing have been loaded'
                     ' as DataLoader objects.\n')
    #Instantiate the autoencoder
    model = AutoEncoder(input_dim=train.shape[1], nlayers=nlayers-1,
                        latent=latent)
    model = model.to(device)
    #Define the optimizer and the loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_function = torch.nn.MSELoss()
    #Define the loop for training and testing the model
    train_losses = []
    latent_space = []
    outputs = []
    test_losses = []
    for epoch in range(1, epochs+1):
        #Training step
        model.train()
        train_loss = 0
        epoch_train_losses = []
        for training_index, training_data in enumerate(train_dataloader, 1):
            training_latent, training_output = model(training_data)
            training_loss = loss_function(training_output, training_data)
            optimizer.zero_grad()
            training_loss.backward()
            optimizer.step()
            epoch_train_losses.append(training_loss.item())
            train_loss += training_loss.item()
            sys.stdout.write('\repoch %3d/%3d batch %3d/%3d\n' % (epoch,
                             epochs, training_index, len(train_dataloader)))
            sys.stdout.write('\rtrain_loss %6.3f\n'
                             % (train_loss / training_index))
            sys.stdout.flush()
        epoch_train_loss = np.mean(epoch_train_losses)
        train_losses.append(epoch_train_loss)
        #Testing step
        model.eval()
        test_loss = 0
        epoch_test_losses = []
        for testing_index, testing_data in enumerate(test_dataloader, 1):
            testing_latent, testing_output = model(testing_data)
            if epoch == epochs:
                #Keep the latent and output data from the final epoch
                latent_space.append(testing_latent.detach().cpu().numpy())
                outputs.append(testing_output.detach().cpu().numpy())
            testing_loss = loss_function(testing_output, testing_data)
            epoch_test_losses.append(testing_loss.item())
            test_loss += testing_loss.item()
            sys.stdout.write('\rbatch %3d/%3d\n' % (testing_index,
                             len(test_dataloader)))
            sys.stdout.write('\rtest_loss %6.3f\n'
                             % (test_loss / testing_index))
            sys.stdout.flush()
        epoch_test_loss = np.mean(epoch_test_losses)
        test_losses.append(epoch_test_loss)
    latent_space = np.vstack(latent_space)
    outputs = np.vstack(outputs)
    sys.stdout.write('# The autoencoder model of %i layers has been trained'
                     ' with %i per cent of the dataset for %i epochs using a'
                     ' learning rate of %f for the optimization of the'
                     ' parameters. After each epoch, it has also been validated'
                     ' with %i per cent of the dataset.\n' % (nlayers,
                     int(partition*100), epochs, lr, int((1-partition)*100)))
    #Create the model directory and go there
    modelDir = "model"
    if not os.path.exists(modelDir):
        os.mkdir(modelDir)
        sys.stdout.write("# New directory called %s has been created.\n"
                         % (modelDir))
    os.chdir(modelDir)
    #Create the new model subdirectory and go there
    if not os.path.exists(newDir):
        os.mkdir(newDir)
        sys.stdout.write("# New directory called %s has been created within"
                         " the %s directory.\n" % (newDir, modelDir))
    os.chdir(newDir)
    #Save the optimized parameters of the model
    torch.save(model.state_dict(), 'model_parameters.pth')
    sys.stdout.write("# The optimized parameters of the autoencoder have been"
                     " saved in a file within the %s directory.\n"
                     % (modelDir+'/'+newDir))
    os.chdir('../..')
    #Create the train_test directory and go there
    traintestDir = "train_test"
    if not os.path.exists(traintestDir):
        os.mkdir(traintestDir)
        sys.stdout.write("# New directory called %s has been created.\n"
                         % (traintestDir))
    os.chdir(traintestDir)
    #Create the new train_test subdirectory and go there
    if not os.path.exists(newDir):
        os.mkdir(newDir)
        sys.stdout.write("# New directory called %s has been created within"
                         " the %s directory.\n" % (newDir, traintestDir))
    os.chdir(newDir)
    #Save the latent data in a NPY file
    np.save('AE_'+newDir+'_latent_space_data.npy', latent_space)
    colors = np.load('../../input_data/AE_input_data_indices.npy',
                     allow_pickle=True)
    select_colors = int((1-partition)*colors.shape[0])
    colors = colors[-select_colors:]
    if len(colors) != len(latent_space):
        select2 = int(len(latent_space))
        colors = colors[0:select2]
    colors = [int(x.split("_")[1]) for x in colors]
    #Save the output data in a NPY file
    np.save('AE_'+newDir+'_output_data.npy', outputs)
    #Plot the latent values and save the figure
    fig1 = plt.figure(figsize=[17, 13])
    fig1.tight_layout()
    gs = mpl.gridspec.GridSpec(1, 1, figure=fig1, left=0.05, bottom=0.05,
                               right=0.95, top=0.95, wspace=0)
    ax_latent = fig1.add_subplot(gs[0,0], projection='3d')
    plot_latent = ax_latent.scatter3D(latent_space[:,0], latent_space[:,1],
                                      latent_space[:,2], c=colors,
                                      cmap='YlGnBu', marker='.')
    ax_latent.set_title('Latent view (3D projection)', fontsize=24)
    ax_latent.set_xlabel('Latent Vector 1', fontsize=18)
    ax_latent.set_ylabel('Latent Vector 2', fontsize=18)
    ax_latent.set_zlabel('Latent Vector 3', fontsize=18)
    ax_latent.grid(True)
    fig1.colorbar(plot_latent, ax=ax_latent)
    fig1.savefig('AE_'+newDir+'_latent_space_plot.svg', bbox_inches='tight',
                 pad_inches=0.2, facecolor='white')
    sys.stdout.write("# NPY files with the latent and output data have been"
                     " saved within the %s directory.\n"
                     % (traintestDir+'/'+newDir))
    #Save the train and test loss values in NPY files
    train_losses = np.array(train_losses)
    train_losses = train_losses.astype(float)
    np.save('AE_'+newDir+'_train_loss_values.npy', train_losses)
    test_losses = np.array(test_losses)
    test_losses = test_losses.astype(float)
    np.save('AE_'+newDir+'_test_loss_values.npy', test_losses)
    train_epochs = np.arange(1, len(train_losses)+1)
    #Extract the xy coordinates of the minimum train and test loss values
    train_ymin = train_losses.min()
    train_xmin = int(train_losses.argmin()+1)
    test_ymin = test_losses.min()
    test_xmin = int(test_losses.argmin()+1)
    #Plot the loss values and save the figure
    fig2 = plt.figure(figsize=[17, 13])
    fig2.tight_layout()
    gs = mpl.gridspec.GridSpec(1, 1, figure=fig2, left=0.05, bottom=0.05,
                               right=0.95, top=0.95, wspace=0)
    ax_loss = fig2.add_subplot(gs[0,0])
    ax_loss.plot(train_epochs, train_losses, color='blue', linestyle='-',
                 label='Train Loss')
    ax_loss.plot(train_epochs, test_losses, color='orange', linestyle='-',
                 label='Test Loss')
    ax_loss.set_title('Loss Values', fontsize=24)
    ax_loss.set_xlabel('Epochs', fontsize=18)
    ax_loss.set_ylabel('MSE Loss', fontsize=18)
    ax_loss.legend()
    ax_loss.annotate(text='minimum train loss value = %f'%(train_ymin),
                     xy=(train_xmin, train_ymin),
                     xytext=(train_xmin, float(train_ymin*1.25)),
                     arrowprops=dict(facecolor='blue', shrink=0.05))
    ax_loss.annotate(text='minimum test loss value = %f'%(test_ymin),
                     xy=(test_xmin, test_ymin),
                     xytext=(test_xmin, float(test_ymin*0.75)),
                     arrowprops=dict(facecolor='orange', shrink=0.05))
    ax_loss.set_ylim([0, 0.008])
    ax_loss.spines['right'].set_visible(False)
    ax_loss.spines['top'].set_visible(False)
    ax_loss.grid(True)
    fig2.savefig('AE_'+newDir+'_train_test_loss_plot.svg', bbox_inches='tight',
                 pad_inches=0.2, facecolor='white')
    sys.stdout.write("# The NPY files with the loss values and the figure with"
                     " them plotted have been saved within the %s"
                     " directory.\n" % (traintestDir+'/'+newDir))
    #Load the denormalized input data
    denorm_data = np.load('../../input_data/AE_denormalized_input_data.npy')
    select_denorm_test = int((1-partition)*denorm_data.shape[0])
    denorm_test = denorm_data[-select_denorm_test:, :]
    #Denormalize the output data back to the original dimensions
    maximums = np.load('../../input_data/maximums.npy')
    minimums = np.load('../../input_data/minimums.npy')
    denorm_outputs = []
    for num, maximum, minimum in zip(range(outputs.shape[1]), maximums,
                                     minimums):
        column = outputs[:, num]
        denorm = column*(maximum-minimum)+minimum
        denorm_outputs.append(denorm)
    denorm_outputs = np.vstack(denorm_outputs)
    denorm_outputs = denorm_outputs.T
    #Compute the RMSD between the original and the reconstructed test frames
    #Default to a single core when SLURM_NTASKS is not set
    n_cores = int(os.environ.get("SLURM_NTASKS", "1"))
    if n_cores > 1:
        #Parallelized: split the frames into one chunk per core and compute
        #the chunks concurrently
        test_chunks = np.array_split(denorm_test, n_cores)
        outputs_chunks = np.array_split(denorm_outputs, n_cores)
        with mp.Pool(processes=n_cores) as pool:
            rmsd_chunks = pool.starmap(compute_rmsd,
                                       zip(test_chunks, outputs_chunks))
        rmsd_values = []
        for chunk in rmsd_chunks:
            rmsd_values.extend(chunk)
    else:
        #Not parallelized
        rmsd_values = compute_rmsd(denorm_test, denorm_outputs)
    #Save the RMSD values in a NPY file
    rmsd_values = np.array(rmsd_values)
    rmsd_values = rmsd_values.astype(float)
    np.save('AE_'+newDir+'_RMSD_values.npy', rmsd_values)
    rmsd_ma = two_sided_moving_average(rmsd_values, n=500)
    frames = np.arange(0, len(rmsd_values))
    #Save the average RMSD in a txt file
    mean_rmsd = np.mean(rmsd_values)
    #Standard error of the mean, reported as the uncertainty of the average
    sem_rmsd = np.std(rmsd_values)/m.sqrt(len(rmsd_values))
    with open('AE_'+newDir+'_average_RMSD.txt', "w") as txtFile:
        txtFile.write("The average RMSD computed between original and decoded"
                      " data after passing them through an autoencoder model"
                      " with %i layers, trained for %i epochs and using a"
                      " learning rate of %f is %f ± %f.\n" % (nlayers, epochs,
                      lr, mean_rmsd, sem_rmsd))
    #Plot the RMSD values and their moving average, and save the figure
    fig3 = plt.figure(figsize=[17, 13])
    fig3.tight_layout()
    gs = mpl.gridspec.GridSpec(1, 1, figure=fig3, left=0.05, bottom=0.05,
                               right=0.95, top=0.95, wspace=0)
    ax_rmsd = fig3.add_subplot(gs[0,0])
    ax_rmsd.plot(frames, rmsd_values, color='grey', linestyle='-')
    ax_rmsd.plot(frames, rmsd_ma, color='k', linestyle='-')
    ax_rmsd.set_title('RMSD Plot', fontsize=24)
    ax_rmsd.set_xlabel('Frames', fontsize=18)
    ax_rmsd.set_ylabel('RMSD', fontsize=18)
    ax_rmsd.axes.xaxis.set_ticklabels([])
    ax_rmsd.set_ylim([0, float(rmsd_values.max()*2)])
    ax_rmsd.grid(True)
    fig3.savefig('AE_'+newDir+'_RMSD_plot.svg', bbox_inches='tight',
                 pad_inches=0.2, facecolor='white')
    sys.stdout.write("# The NPY file with the RMSD values and the figure with"
                     " them plotted have been saved within the %s"
                     " directory.\n" % (traintestDir+'/'+newDir))
#Main execution
if __name__ == '__main__':
    test_paths()
    main()