Commit 0e93816

Merge branch 'add_cvm_plugin' into 'master'

cvm plugin draft

See merge request deep-learning/tensornet!18

2 parents: 74bfb58 + 44d4f66

15 files changed: +174, -47 lines

core/kernels/sparse_table_ops.cc (+13, -7)

@@ -267,7 +267,7 @@ class SparseTablePullKernel : public AsyncOpKernel {
             float* w_matrix = var_tensor->matrix<float>().data();

             size_t emb_size = sizeof(float) * dim;
-            CHECK_EQ(emb_size, emb_buf.cutn(w_matrix + sign_index * dim, emb_size));
+            CHECK_EQ(emb_size, emb_buf.cutn(w_matrix + sign_index * dim , emb_size));
         }
     }

@@ -281,22 +281,26 @@ REGISTER_KERNEL_BUILDER(Name("SparseTablePull").Device(DEVICE_CPU),

 struct SparsePushVarInfo {
 public:
-    SparsePushVarInfo(const Tensor* t_value, const Tensor* t_grad)
+    SparsePushVarInfo(const Tensor* t_value, const Tensor* t_grad, const Tensor* t_labels)
         : value(t_value)
-        , grad(t_grad) {
+        , grad(t_grad)
+        , labels(t_labels) {

         const int64* feasign_vec = value->flat<int64>().data();
+        const int64* fea_label_vec = t_labels->flat<int64>().data();

         std::map<uint64, int> sign_id_mapping;
         for (int i = 0; i < value->NumElements(); ++i) {
             uint64 sign = (uint64)feasign_vec[i];
+            int label = static_cast<int>(fea_label_vec[i]);
             auto ret = sign_id_mapping.insert({sign, sign_id_mapping.size()});

             if (ret.second) {
-                virtual_sign_infos.emplace_back(sign, 1);
+                virtual_sign_infos.emplace_back(sign, 1, label);
             } else {
                 auto iter = ret.first;
                 virtual_sign_infos[iter->second].batch_show += 1;
+                virtual_sign_infos[iter->second].batch_click += label;
             }
         }
     }

@@ -308,6 +312,7 @@ struct SparsePushVarInfo {
 public:
     const Tensor* value;
     const Tensor* grad;
+    const Tensor* labels;

     std::vector<SparsePushSignInfo> virtual_sign_infos;
 };

@@ -321,16 +326,17 @@ class SparseTablePushKernel : public AsyncOpKernel {
     }

     void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
-        OP_REQUIRES_ASYNC(c, c->num_inputs() == N_ * 2,
+        OP_REQUIRES_ASYNC(c, c->num_inputs() == N_ * 3,
                           errors::InvalidArgument("SparseTable push num_inputs:",
                                                   c->num_inputs(),
-                                                  " not equal:", N_ * 2),
+                                                  " not equal:", N_ * 3),
                           done);
         std::vector<SparsePushVarInfo> var_infos;

         for (int i = 0; i < N_; i++) {
             const Tensor* value = &c->input(i);
             const Tensor* grad = &c->input(N_ + i);
+            const Tensor* labels = &c->input(2 * N_ + i);

             OP_REQUIRES_ASYNC(
                 c, TensorShapeUtils::IsMatrix(grad->shape()),

@@ -339,7 +345,7 @@ class SparseTablePushKernel : public AsyncOpKernel {
                                     grad->shape().DebugString()),
                 done);

-            var_infos.emplace_back(value, grad);
+            var_infos.emplace_back(value, grad, labels);
         }

         CHECK_GT(var_infos.size(), 0);
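
Together with the new feature_labels input registered in core/ops/sparse_table_ops.cc below, the push kernel now groups duplicate feature signs and accumulates a per-sign click count next to the existing show count. A rough Python sketch of that aggregation, for illustration only (the helper name, the dict-based grouping, and the assumption of binary click labels are not kernel code):

    from collections import OrderedDict

    def aggregate_sign_info(feasigns, labels):
        # Mirrors SparsePushVarInfo: each occurrence of a sign adds 1 to
        # batch_show, and its 0/1 label is added to batch_click.
        infos = OrderedDict()  # sign -> [batch_show, batch_click]
        for sign, label in zip(feasigns, labels):
            if sign not in infos:
                infos[sign] = [1, int(label)]
            else:
                infos[sign][0] += 1
                infos[sign][1] += int(label)
        return infos

    # Example: sign 7 occurs twice, once with a positive label.
    print(aggregate_sign_info([7, 3, 7], [1, 0, 0]))
    # OrderedDict([(7, [2, 1]), (3, [1, 0])])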

core/main/py_wrapper.cc (+3, -1)

@@ -114,10 +114,12 @@ PYBIND11_MODULE(_pywrap_tn, m) {

         return py::reinterpret_steal<py::object>(obj);
     })
-    .def("create_sparse_table", [](py::object obj, std::string name, int dimension) {
+    .def("create_sparse_table", [](py::object obj, std::string name, int dimension, bool use_cvm) {
         OptimizerBase* opt =
             static_cast<OptimizerBase*>(PyCapsule_GetPointer(obj.ptr(), nullptr));

+        opt->SetUseCvm(use_cvm);
+
         PsCluster* cluster = PsCluster::Instance();

         SparseTable* table = CreateSparseTable(opt, name, dimension, cluster->RankNum(), cluster->Rank());
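
The create_sparse_table binding therefore takes a fourth argument. A hedged sketch of what a call could look like from Python (the tn.core module path and the exact object passed as the first argument are assumptions; only the lambda signature above is confirmed by this diff):

    import tensornet as tn

    # optimizer capsule, table name, embedding dimension, use_cvm flag
    opt = tn.core.AdaGrad(learning_rate=0.01, initial_g2sum=0.1, initial_scale=0.1)
    table_handle = tn.core.create_sparse_table(opt, "sparse_emb", 8, True)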

core/ops/sparse_table_ops.cc (+1)

@@ -48,6 +48,7 @@ REGISTER_OP("SparseTablePush")
 )doc")
     .Input("values: N * int64")
     .Input("grads: N * float")
+    .Input("feature_labels: N * int64")
     .Attr("table_handle: int")
     .Attr("N: int")
     .SetShapeFn(shape_inference::NoOutputs);

core/ps/optimizer/ada_grad_kernel.cc (+38, -1)

@@ -94,13 +94,22 @@ SparseAdaGradValue::SparseAdaGradValue(int dim, const AdaGrad* opt) {
         }
     }

+    use_cvm_ = opt->ShouldUseCvm();
     g2sum_ = opt->initial_g2sum;
     old_compat_ = false;
     no_show_days_ = 0;
+    click_ = 0;
+    show_ = 0;
+    if(opt->ShouldUseCvm()){
+        w[dim] = 0;
+        w[dim+1] = 0;
+    }
+
 }

 void SparseAdaGradValue::Apply(const AdaGrad* opt, SparseGradInfo& grad_info, int dim) {
     show_ += grad_info.batch_show;
+    click_ += grad_info.batch_click;
     no_show_days_ = 0;

     float* w = Weight();

@@ -116,6 +125,13 @@ void SparseAdaGradValue::Apply(const AdaGrad* opt, SparseGradInfo& grad_info, in
     for (int i = 0; i < dim; ++i) {
         w[i] -= opt->learning_rate * grad_info.grad[i] / (opt->epsilon + sqrt(g2sum_));
     }
+    if(opt->ShouldUseCvm()){
+        float log_show = log(show_ + 1);
+        float log_click = log(click_ + 1);
+        w[dim] = show_;
+        w[dim+1] = (log_click - log_show);
+    }
+
 }

 void SparseAdaGradValue::SerializeTxt_(std::ostream& os, int dim) {

@@ -126,7 +142,10 @@ void SparseAdaGradValue::SerializeTxt_(std::ostream& os, int dim) {

     os << g2sum_ << "\t";
     os << show_ << "\t";
-    os << no_show_days_;
+    os << no_show_days_ << "\t";
+    if(use_cvm_){
+        os << click_;
+    }
 }

 void SparseAdaGradValue::DeSerializeTxt_(std::istream& is, int dim) {

@@ -139,6 +158,13 @@ void SparseAdaGradValue::DeSerializeTxt_(std::istream& is, int dim) {
     is >> show_;
     if(!old_compat_) {
         is >> no_show_days_;
+        if(use_cvm_){
+            is >> click_;
+            float log_show = log(show_ + 1);
+            float log_click = log(click_ + 1);
+            Weight()[dim] = show_;
+            Weight()[dim+1] = (log_click - log_show);
+        }
     }
 }

@@ -147,6 +173,9 @@ void SparseAdaGradValue::SerializeBin_(std::ostream& os, int dim) {
     os.write(reinterpret_cast<const char*>(&g2sum_), sizeof(g2sum_));
     os.write(reinterpret_cast<const char*>(&show_), sizeof(show_));
     os.write(reinterpret_cast<const char*>(&no_show_days_), sizeof(no_show_days_));
+    if(use_cvm_){
+        os.write(reinterpret_cast<const char*>(&click_), sizeof(click_));
+    }
 }

 void SparseAdaGradValue::DeSerializeBin_(std::istream& is, int dim) {

@@ -155,11 +184,19 @@ void SparseAdaGradValue::DeSerializeBin_(std::istream& is, int dim) {
     is.read(reinterpret_cast<char*>(&show_), sizeof(show_));
     if(!old_compat_) {
         is.read(reinterpret_cast<char*>(&no_show_days_), sizeof(no_show_days_));
+        if(use_cvm_){
+            is.read(reinterpret_cast<char*>(&click_), sizeof(click_));
+            float log_show = log(show_ + 1);
+            float log_click = log(click_ + 1);
+            Weight()[dim] = show_;
+            Weight()[dim+1] = (log_click - log_show);
+        }
     }
 }

 void SparseAdaGradValue::ShowDecay(const AdaGrad* opt, int delta_days) {
     show_ *= opt->show_decay_rate;
+    click_ *= opt->show_decay_rate;
     no_show_days_ += delta_days;
 }
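
The two extra weight slots maintained here are the CVM statistics: the raw show count and the log click-through rate log(click + 1) - log(show + 1). A minimal sketch of that computation (plain Python for illustration; in the kernel these values are written into the two floats that follow the learned weights):

    import math

    def cvm_features(show, click):
        # Returns (w[dim], w[dim + 1]) as maintained by SparseAdaGradValue::Apply
        log_show = math.log(show + 1)
        log_click = math.log(click + 1)
        return show, log_click - log_show

    print(cvm_features(show=100, click=3))   # (100, log(4) - log(101) ≈ -3.23)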

core/ps/optimizer/ada_grad_kernel.h (+2)

@@ -88,7 +88,9 @@ class alignas(4) SparseAdaGradValue
     int dim_;
     float g2sum_;
     float show_ = 0.0;
+    float click_ = 0.0;
     int no_show_days_ = 0;
+    bool use_cvm_ = false;
     float data_[0];
 };

core/ps/optimizer/data_struct.h (+2)

@@ -20,6 +20,7 @@ namespace tensornet {
 struct SparseGradInfo {
     float* grad;
     int batch_show;
+    int batch_click;
 };

 extern int const SERIALIZE_FMT_ID;

@@ -56,6 +57,7 @@ class alignas(4) SparseOptValue {

 protected:
     float show_ = 0.0;
+    float click_ = 0.0;
     int delta_show_ = 0;
     bool old_compat_ = false;
 };

core/ps/optimizer/optimizer.h (+12, -2)

@@ -43,9 +43,18 @@ class OptimizerBase {
         return std::make_tuple(false, emptyString);
     }

+    virtual void SetUseCvm(bool use_cvm) {
+        use_cvm_ = use_cvm;
+    }
+
+    virtual bool ShouldUseCvm() const {
+        return use_cvm_;
+    }
+
 public:
     float learning_rate = 0.01;
     float show_decay_rate = 0.98;
+    float use_cvm_ = false;
 };

 class Adam : public OptimizerBase {

@@ -90,8 +99,9 @@ class AdaGrad : public OptimizerBase {
             ++column_count;
         }

-        // columns should be sign, dim_, dims_ * weight, g2sum, show, no_show_days
-        // if columnCount is 12, means no no_show_days column
+        // with the cvm plugin, columns are sign, dim_, dim_ * weight, g2sum, show, no_show_days, click: dim + 6
+        // without cvm there is no click column: dim + 5
+        // old versions also lack the no_show_days column, so column_count is dim + 4
         if(column_count == dim + 4){
             need_old_compat = true;
         }
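
The expected column count of a text-format checkpoint line therefore depends on both the checkpoint version and the cvm flag. A small helper restating the rule from the comment above (hypothetical, not part of the codebase):

    def expected_columns(dim, use_cvm, old_format):
        # sign, dim_, dim weights, g2sum, show          -> dim + 4 (old format)
        # + no_show_days                                -> dim + 5
        # + click, only when the cvm plugin is enabled  -> dim + 6
        if old_format:
            return dim + 4
        return dim + 6 if use_cvm else dim + 5

    assert expected_columns(8, use_cvm=True, old_format=False) == 14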

core/ps/optimizer/optimizer_kernel.h (+1, -1)

@@ -276,7 +276,7 @@ class SparseKernelBlock {
     SparseKernelBlock(const OptimizerBase* opt, int dimension)
         : values_(15485863, sparse_key_hasher)
         , dim_(dimension)
-        , alloc_(ValueType::DynSizeof(dim_), 1 << 16) {
+        , alloc_(ValueType::DynSizeof(dimension + opt->ShouldUseCvm() * 2), 1 << 16) {
         values_.max_load_factor(0.75);
         opt_ = dynamic_cast<const OptType*>(opt);
         mutex_ = std::make_unique<std::mutex>();
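
With cvm enabled, each sparse value needs room for two extra floats (show and log CTR), so the block sizes its slab allocator from the widened dimension rather than dim_. A one-line illustration of that sizing rule (hypothetical helper, not library code):

    def value_dim(dimension, use_cvm):
        # learned weights plus the two trailing cvm slots when enabled
        return dimension + (2 if use_cvm else 0)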

core/ps/table/sparse_table.cc (+4, -5)

@@ -47,7 +47,6 @@ void SparseTable::SetHandle(uint32_t handle) {
 void SparseTable::Pull(const SparsePullRequest* req, butil::IOBuf& out_emb_buf, SparsePullResponse* resp) {
     resp->set_table_handle(req->table_handle());

-    CHECK_EQ(dim_, req->dim());
     resp->set_dim(req->dim());

     for (int i = 0; i < req->signs_size(); ++i) {

@@ -57,23 +56,23 @@ void SparseTable::Pull(const SparsePullRequest* req, butil::IOBuf& out_emb_buf,
         float* w = op_kernel_->GetWeight(sign);
         CHECK(nullptr != w);

-        out_emb_buf.append(w, sizeof(float) * dim_);
+        out_emb_buf.append(w, sizeof(float) * (req->dim()));
     }
 }

 void SparseTable::Push(const SparsePushRequest* req, butil::IOBuf& grad_buf, SparsePushResponse* resp) {
-    CHECK_EQ(dim_, req->dim());

-    float grad[dim_];
+    float grad[req->dim()];
     SparsePushSignInfo sign_info;

     while (sizeof(sign_info) == grad_buf.cutn(&sign_info, sizeof(sign_info))) {
-        size_t grad_size = sizeof(float) * dim_;
+        size_t grad_size = sizeof(float) * req->dim();
         CHECK_EQ(grad_size, grad_buf.cutn(grad, grad_size));

         SparseGradInfo grad_info;
         grad_info.grad = grad;
         grad_info.batch_show = sign_info.batch_show;
+        grad_info.batch_click = sign_info.batch_click;

         op_kernel_->Apply(sign_info.sign, grad_info);
     }
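
The push request body is a flat buffer of (SparsePushSignInfo, dim gradient floats) records, and the sign info now also carries batch_click (see core/ps_interface/ps_raw_interface.h below). A rough Python decoder for such a buffer, assuming little-endian packing and the natural 8+4+4 layout with no extra padding (illustrative only):

    import struct

    SIGN_INFO = struct.Struct("<Qii")   # sign, batch_show, batch_click

    def decode_push_buffer(buf, dim):
        grad_fmt = struct.Struct("<{}f".format(dim))
        offset = 0
        while offset + SIGN_INFO.size + grad_fmt.size <= len(buf):
            sign, batch_show, batch_click = SIGN_INFO.unpack_from(buf, offset)
            offset += SIGN_INFO.size
            grads = grad_fmt.unpack_from(buf, offset)
            offset += grad_fmt.size
            yield sign, batch_show, batch_click, grads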

core/ps_interface/ps_raw_interface.h (+4, -2)

@@ -22,16 +22,18 @@ namespace tensornet {
 struct SparsePushSignInfo {
 public:
     SparsePushSignInfo()
-        : SparsePushSignInfo(0, 0)
+        : SparsePushSignInfo(0, 0, 0)
     { }

-    SparsePushSignInfo(uint64_t s, int bs)
+    SparsePushSignInfo(uint64_t s, int bs, int cs)
         : sign(s)
         , batch_show(bs)
+        , batch_click(cs)
     { }

     uint64_t sign;
     int batch_show;
+    int batch_click;
 };

 } // namespace tensornet

examples/common/feature_column.py (+4, -1)

@@ -24,11 +24,14 @@ def create_emb_model(features, columns_group, suffix = "_input"):
     for slot in features:
         inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype="int64", sparse=True)

+    inputs["label"] = tf.keras.layers.Input(name="label", shape=(None,), dtype="int64", sparse=False)
+
     sparse_opt = tn.core.AdaGrad(learning_rate=0.01, initial_g2sum=0.1, initial_scale=0.1)

     for column_group_name in columns_group.keys():
         embs = tn.layers.EmbeddingFeatures(columns_group[column_group_name], sparse_opt,
-                                           name=column_group_name + suffix)(inputs)
+                                           name=column_group_name + suffix, target_columns=["label"])(inputs)
+                                           #name=column_group_name + suffix)(inputs)
         model_output.append(embs)

     emb_model = tn.model.Model(inputs=inputs, outputs=model_output, name="emb_model")

examples/main.py (+1, -1)

@@ -22,7 +22,7 @@ def parse_line_batch(example_proto):
         fea_desc[slot] = tf.io.VarLenFeature(tf.int64)

     feature_dict = tf.io.parse_example(example_proto, fea_desc)
-    label = feature_dict.pop('label')
+    label = feature_dict.pop['label']
     return feature_dict, label

 def create_model():
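
As committed, feature_dict.pop['label'] subscripts the bound pop method instead of calling it and would raise a TypeError at runtime; the apparent intent is to keep the label inside the feature dict so the new "label" Input receives it, while still returning it as the training target. A hedged corrected sketch of the parse function under that assumption (the label feature spec and the slot loop are taken from context, not from this diff):

    def parse_line_batch(example_proto):
        fea_desc = {"label": tf.io.FixedLenFeature([], tf.int64)}
        for slot in C.ALL_SLOTS:
            fea_desc[slot] = tf.io.VarLenFeature(tf.int64)

        feature_dict = tf.io.parse_example(example_proto, fea_desc)
        label = feature_dict["label"]   # keep "label" in the dict for the label Input
        return feature_dict, label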

examples/models/wide_deep.py (+9, -7)

@@ -25,13 +25,14 @@ def create_sub_model(linear_embs, deep_embs, deep_hidden_units):
     for i, unit in enumerate(C.DEEP_HIDDEN_UNITS):
         deep = tf.keras.layers.Dense(unit, activation='relu', name='dnn_{}'.format(i))(deep)

-    if linear_inputs and not deep_inputs:
-        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(linear)
-    elif deep_inputs and not linear_inputs:
-        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep)
-    else:
-        both = tf.keras.layers.concatenate([deep, linear], name='both')
-        output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)
+    # if linear_inputs and not deep_inputs:
+    #     output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(linear)
+    # elif deep_inputs and not linear_inputs:
+    #     output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(deep)
+    # else:
+    both = tf.keras.layers.concatenate([deep, linear], name='both')
+    both = tn.layers.TNBatchNormalization(synchronized=True, sync_freq=4, max_count=1000000)(both)
+    output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)

     return tn.model.Model(inputs=[linear_inputs, deep_inputs], outputs=output, name="sub_model")

@@ -45,6 +46,7 @@ def WideDeep(linear_features, dnn_features, dnn_hidden_units=(128, 128)):
     inputs = {}
     for slot in features:
         inputs[slot] = tf.keras.layers.Input(name=slot, shape=(None,), dtype="int64", sparse=True)
+    inputs['label'] = tf.keras.layers.Input(name="label", shape=(None,), dtype="int64", sparse=False)
     emb_model = create_emb_model(features, columns_group)
     linear_embs, deep_embs = emb_model(inputs)
     sub_model = create_sub_model(linear_embs, deep_embs, dnn_hidden_units)
