-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathTAE.py
131 lines (104 loc) · 5.62 KB
/
TAE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Implementation of the Deep Temporal Clustering model
Temporal Autoencoder (TAE)
@author Florent Forest (FlorentF9)
"""
from keras.models import Model
from keras.layers import Input, Conv1D, LeakyReLU, MaxPool1D, CuDNNLSTM, Bidirectional, TimeDistributed, Dense, Reshape
from keras.layers import UpSampling2D, Conv2DTranspose
def temporal_autoencoder(input_dim, timesteps, n_filters=50, kernel_size=10, strides=1, pool_size=10, n_units=(50, 1)):
    """
    Temporal Autoencoder (TAE) model with Convolutional and BiLSTM layers.

    # Arguments
        input_dim: input dimension (number of features per timestep)
        timesteps: number of timesteps (can be None for variable-length sequences)
        n_filters: number of filters in convolutional layer
        kernel_size: size of kernel in convolutional layer
        strides: strides in convolutional layer
        pool_size: pooling size in max pooling layer, must divide the time series length
        n_units: numbers of units in the two BiLSTM layers

    # Return
        (ae_model, encoder_model, decoder_model): AE, encoder and decoder models

    # Raises
        ValueError: if `timesteps` is given and is not a multiple of `pool_size`
    """
    # Explicit check instead of `assert` (asserts are stripped under -O), and
    # tolerate timesteps=None for variable-length sequences, as documented.
    if timesteps is not None and timesteps % pool_size != 0:
        raise ValueError('timesteps must be a multiple of pool_size')

    # Input
    x = Input(shape=(timesteps, input_dim), name='input_seq')

    # Encoder: Conv1D feature extraction -> temporal downsampling -> stacked BiLSTMs.
    encoded = Conv1D(n_filters, kernel_size, strides=strides, padding='same', activation='linear')(x)
    encoded = LeakyReLU()(encoded)
    encoded = MaxPool1D(pool_size)(encoded)
    # merge_mode='sum' keeps the latent feature width equal to the layer's unit count
    # (as opposed to 'concat', which would double it).
    encoded = Bidirectional(CuDNNLSTM(n_units[0], return_sequences=True), merge_mode='sum')(encoded)
    encoded = LeakyReLU()(encoded)
    encoded = Bidirectional(CuDNNLSTM(n_units[1], return_sequences=True), merge_mode='sum')(encoded)
    encoded = LeakyReLU(name='latent')(encoded)

    # Decoder: upsample back to the original time resolution, then project to input_dim.
    # The sequence is lifted to 4D so UpSampling2D/Conv2DTranspose can be used
    # (no 1D transposed convolution is available here).
    decoded = Reshape((-1, 1, n_units[1]), name='reshape')(encoded)
    decoded = UpSampling2D((pool_size, 1), name='upsampling')(decoded)
    decoded = Conv2DTranspose(input_dim, (kernel_size, 1), padding='same', name='conv2dtranspose')(decoded)
    output = Reshape((-1, input_dim), name='output_seq')(decoded)

    # AE model
    autoencoder = Model(inputs=x, outputs=output, name='AE')

    # Encoder model (shares layers/weights with the AE)
    encoder = Model(inputs=x, outputs=encoded, name='encoder')

    # Standalone decoder: re-apply the AE's decoder layers to a latent input.
    # Latent length is timesteps / pool_size, or None for variable-length input.
    latent_steps = None if timesteps is None else timesteps // pool_size
    encoded_input = Input(shape=(latent_steps, n_units[1]), name='decoder_input')
    decoded = autoencoder.get_layer('reshape')(encoded_input)
    decoded = autoencoder.get_layer('upsampling')(decoded)
    decoded = autoencoder.get_layer('conv2dtranspose')(decoded)
    decoder_output = autoencoder.get_layer('output_seq')(decoded)

    # Decoder model (shares layers/weights with the AE)
    decoder = Model(inputs=encoded_input, outputs=decoder_output, name='decoder')

    return autoencoder, encoder, decoder
def temporal_autoencoder_v2(input_dim, timesteps, n_filters=50, kernel_size=10, strides=1, pool_size=10, n_units=(50, 1)):
    """
    Temporal Autoencoder (TAE) model with Convolutional and BiLSTM layers
    (variant with concatenated BiLSTM outputs and a dense pre-decoder layer).

    # Arguments
        input_dim: input dimension (number of features per timestep)
        timesteps: number of timesteps (can be None for variable-length sequences)
        n_filters: number of filters in convolutional layer
        kernel_size: size of kernel in convolutional layer
        strides: strides in convolutional layer
        pool_size: pooling size in max pooling layer, must divide the time series length
        n_units: numbers of units in the two BiLSTM layers

    # Return
        (ae_model, encoder_model, decoder_model): AE, encoder and decoder models

    # Raises
        ValueError: if `timesteps` is given and is not a multiple of `pool_size`
    """
    # Explicit check instead of `assert` (asserts are stripped under -O), and
    # tolerate timesteps=None for variable-length sequences, as documented.
    if timesteps is not None and timesteps % pool_size != 0:
        raise ValueError('timesteps must be a multiple of pool_size')

    # Input
    x = Input(shape=(timesteps, input_dim), name='input_seq')

    # Encoder: Conv1D feature extraction -> temporal downsampling -> stacked BiLSTMs.
    encoded = Conv1D(n_filters, kernel_size, strides=strides, padding='same', activation='linear')(x)
    encoded = LeakyReLU()(encoded)
    encoded = MaxPool1D(pool_size)(encoded)
    # merge_mode='concat' doubles the feature width: the latent has 2 * n_units[1] features.
    encoded = Bidirectional(CuDNNLSTM(n_units[0], return_sequences=True), merge_mode='concat')(encoded)
    encoded = LeakyReLU()(encoded)
    encoded = Bidirectional(CuDNNLSTM(n_units[1], return_sequences=True), merge_mode='concat')(encoded)
    encoded = LeakyReLU(name='latent')(encoded)

    # Decoder: per-timestep dense projection (sequence labeling) back to n_filters,
    # then upsample to the original time resolution and project to input_dim.
    # The sequence is lifted to 4D so UpSampling2D/Conv2DTranspose can be used.
    decoded = TimeDistributed(Dense(units=n_filters), name='dense')(encoded)
    decoded = LeakyReLU(name='act')(decoded)
    decoded = Reshape((-1, 1, n_filters), name='reshape')(decoded)
    decoded = UpSampling2D((pool_size, 1), name='upsampling')(decoded)
    decoded = Conv2DTranspose(input_dim, (kernel_size, 1), padding='same', name='conv2dtranspose')(decoded)
    output = Reshape((-1, input_dim), name='output_seq')(decoded)

    # AE model
    autoencoder = Model(inputs=x, outputs=output, name='AE')

    # Encoder model (shares layers/weights with the AE)
    encoder = Model(inputs=x, outputs=encoded, name='encoder')

    # Standalone decoder: re-apply the AE's decoder layers to a latent input.
    # Latent length is timesteps / pool_size (or None for variable-length input);
    # latent width is 2 * n_units[1] because of merge_mode='concat'.
    latent_steps = None if timesteps is None else timesteps // pool_size
    encoded_input = Input(shape=(latent_steps, 2 * n_units[1]), name='decoder_input')
    decoded = autoencoder.get_layer('dense')(encoded_input)
    decoded = autoencoder.get_layer('act')(decoded)
    decoded = autoencoder.get_layer('reshape')(decoded)
    decoded = autoencoder.get_layer('upsampling')(decoded)
    decoded = autoencoder.get_layer('conv2dtranspose')(decoded)
    decoder_output = autoencoder.get_layer('output_seq')(decoded)

    # Decoder model (shares layers/weights with the AE)
    decoder = Model(inputs=encoded_input, outputs=decoder_output, name='decoder')

    return autoencoder, encoder, decoder