1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| # ---------------------------------------------------- # Description: Class for DynamicLSTM Network # Created by: Bendong Tan # Created time: Friday, Jan 25, 2019 # Last Modified: Monday, Jan 30, 2019 # Wuhan University # ---------------------------------------------------- import os os.environ['KERAS_BACKEND']='tensorflow' import time import numpy as np from keras.models import Sequential from keras.layers import Dense,Masking,Flatten,Dropout,LSTM from keras.preprocessing.sequence import pad_sequences from keras.utils import to_categorical from keras.callbacks import ModelCheckpoint from keras import backend as K from keras import regularizers
class DynamicLSTM: ''' 初始化 ''' def __init__(self, X_train, y_train, X_test, y_test): n_trainsample, n_timesteps, n_features = np.shape(X_train) # 训练数据 self.X_train = X_train # 测试文件 self.X_test = X_test # 训练数据 self.y_train = y_train # 测试文件 self.y_test = y_test # 最后输出 self.n_outputs = 1 # 每个时刻的输入特征数量 self.n_features = n_features # 时序持续长度为 self.n_timesteps = n_timesteps # 层数 self.layer_num = 1 # 隐含层神经元数目 self.hidden_size=100 # 每代训练模型步数 self.batch_size=n_trainsample # 学习率 self.learningRate = 1e-3 # 训练代数 self.epochs=200 # 保存模型数据目录 self.storePath = None ''' 搭建LSTM网络 ''' def build(self): # 开始搭建浒关模型 model = Sequential() # 针对自适应评估模式设计补零操作以保持模型输入长度 model.add(Masking(mask_value=0, input_shape=(self.n_timesteps, self.n_features))) # LSTM层 model.add(LSTM(self.hidden_size,return_sequences=True)) # 设置dropout防止过拟合 model.add(Dropout(0.05)) model.add(LSTM(self.hidden_size, return_sequences=True)) model.add(Dropout(0.05)) model.add(LSTM(self.hidden_size)) # sigmoid层 model.add(Dense(self.n_outputs, activation='sigmoid')) # 编译模型 model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
return model ''' 训练模型 ''' def fit(self,model): # 记录最好模型 filepath = r".\model\model_best.h5" checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max') callbacks_list = [checkpoint]
# 训练模型 model.fit(self.X_train, self.y_train, epochs=self.epochs, batch_size=self.batch_size, validation_data=(self.X_test, self.y_test), callbacks=callbacks_list, verbose=1)
return model ''' 标准评估模型 ''' def evaluation(self,model,X_test, y_test): _, accuracy = model.evaluate(X_test, y_test,batch_size=len(X_test), verbose=1) # 标准评估准确率
return accuracy ''' 自适应评估模型 ''' def Adaptive_TSA(self,model,X_test, y_test,delta): miss = 0 # 记录分类错误分数 right = np.zeros((len(y_test), 20)) # 记录每个样本在每个时刻的评估情况,做了评估记为1 y_pred = np.zeros((len(y_test), 1)) # 最终评估分类结果
# 自适应评估过程 for i in range(len(y_test)): for t in range(20): # 最大评估时刻数 # 如果在时间窗口内采取按照时刻逐步增加采样点的方式进行评估 if t<self.n_timesteps: Input=pad_sequences(np.reshape(X_test[i,0:t+1,:], (-1,t+1,self.n_features)), maxlen=self.n_timesteps, padding='post') predictions = model.predict(Input) if predictions >= delta and predictions <= 1: right[i, 0:t + 1] = 1 y_pred[i] = 1 break if predictions >=0 and predictions < 1-delta: right[i, 0:t + 1] = 1 y_pred[i] = 0 break
# 如果评估时刻超出时间窗口,则采取滑动时间窗口的方式评估 if t >= self.n_timesteps: Input = np.reshape(X_test[i, t-self.n_timesteps+1:t + 1, :], (-1, self.n_timesteps, self.n_features)) predictions = model.predict(Input) if predictions >= delta and predictions <= 1: right[i, 0:t + 1] = 1 break if predictions >= 0 and predictions < 1 - delta: right[i, 0:t + 1] = 1 y_pred[i] = 0 break # 超出最大时刻的均视为失稳处理 if t + 1 == 20: if predictions>=0.5: y_pred[i] = 1 if predictions<0.5: y_pred[i] = 0 right[i, 0:t + 1] = np.ones((1, t+1)) break # 计算自适应评估准确率 for i in range(len(y_test)): if y_pred[i]!=y_test[i]: miss = miss + 1
# 记录平均评估时间 ART = sum(sum(right)) / len(y_test)
# 记录自适应评估准确率 Accuracy=(len(y_test)-miss)/len(y_test)*100
return ART , Accuracy
|