Skip to content

Commit 8a695de

Browse files
committed
delete duplicate entries
1 parent 2af1f18 commit 8a695de

File tree

5 files changed

+154
-10
lines changed

5 files changed

+154
-10
lines changed
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
const mongoose = require("mongoose");
22

3-
const key = mongoose.Schema({}, { strict: false });
3+
const key = mongoose.Schema({
4+
key: { type: String, required: true},
5+
value: { type: mongoose.Schema.Types.Mixed, required: true},
6+
}, { versionKey: false });
47

58
module.exports = mongoose.model("entries", key);
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
const mongoose = require("mongoose");
22

3-
const key = mongoose.Schema({}, { strict: false });
3+
const key = mongoose.Schema({
4+
key: { type: String, required: true}}, {versionKey: false }
5+
);
46

57
module.exports = mongoose.model("key", key);
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const mongoose = require("mongoose");
22

3-
const source = mongoose.Schema({}, { strict: false });
3+
const source = mongoose.Schema({}, { strict: false, versionKey: false });
44

55
module.exports = mongoose.model("source", source);
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
const mongoose = require("mongoose");
22

3-
const value = mongoose.Schema({}, { strict: false });
3+
const value = mongoose.Schema({
4+
value: { type: mongoose.Schema.Types.Mixed, required: true}
5+
}, { versionKey: false });
46

57
module.exports = mongoose.model("value", value);

Minio-SQL-connector/utils/minioWriter.js

Lines changed: 143 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,132 @@ function log2(...m) {
4343
}
4444
}
4545

46+
/**
 * Batch variant: insert only the `{ key, value }` pairs that are not stored yet.
 *
 * Object values are serialised with `JSON.stringify` BEFORE the lookup, so the
 * query, the duplicate check and the insert all see the same representation.
 * Pairs repeated inside `entries` are inserted once.
 *
 * @param {Array<{key: *, value: *}>} entries - candidate pairs; may be empty or nullish.
 * @returns {Promise<void>}
 */
async function insertUniqueEntries_1(entries) {
    const normalised = (entries ?? []).map(e => ({
        key: e.key,
        value: typeof e.value === "object" ? JSON.stringify(e.value) : e.value
    }));

    // `$or` must be a non-empty array, so never send the query for an empty batch.
    if (normalised.length === 0) {
        logger.info("no entries to insert")
        return;
    }

    // key -> Set of values already known (stored, or queued earlier in this batch).
    // A nested lookup avoids the collisions a joined "key:value" string can produce.
    const knownByKey = new Map();
    const remember = (key, value) => {
        let values = knownByKey.get(key);
        if (!values) {
            values = new Set();
            knownByKey.set(key, values);
        }
        if (values.has(value)) return false;
        values.add(value);
        return true;
    };

    const existingEntries = await Entries.find({ $or: normalised });
    for (const doc of existingEntries) remember(doc.key, doc.value);

    const newEntries = [];
    for (const e of normalised) {
        if (remember(e.key, e.value)) newEntries.push(e);
    }

    if (newEntries.length > 0) {
        await Entries.insertMany(newEntries);
    } else {
        logger.info("no entries to insert")
    }
}
57+
58+
/**
 * Batch variant: insert the `{ key }` documents whose `key` is not stored yet.
 *
 * Keys repeated inside `keys` are collapsed to their first occurrence, and the
 * stored keys are fetched with a single `$in` query. Keys are compared with
 * JavaScript Set/Map equality.
 *
 * @param {Array<{key: *}>} keys - candidate key documents; may be empty or nullish.
 * @returns {Promise<void>}
 * @throws rethrows any `insertMany` error whose `code` is not 11000.
 */
async function insertUniqueKeys_1(keys) {
    // First document seen for each distinct key.
    const firstByKey = new Map();
    for (const k of keys ?? []) {
        if (!firstByKey.has(k.key)) firstByKey.set(k.key, k);
    }

    // Nothing to look up: skip the query entirely.
    if (firstByKey.size === 0) {
        logger.info("No keys to insert");
        return;
    }

    const existingEntries = await Key.find({ key: { $in: [...firstByKey.keys()] } });
    for (const doc of existingEntries) firstByKey.delete(doc.key);

    const newKeys = [...firstByKey.values()];
    if (newKeys.length === 0) {
        logger.info("No keys to insert");
        return;
    }

    try {
        // ordered: false lets the remaining documents go through when one fails.
        await Key.insertMany(newKeys, { ordered: false });
        logger.info(`Inserted ${newKeys.length} new keys`);
    } catch (error) {
        // 11000 is MongoDB's duplicate-key error: another writer got there first.
        if (error.code === 11000) {
            logger.warn("some keys ignored");
        } else {
            throw error;
        }
    }
}
87+
88+
/**
 * Insert each `{ value }` document unless a document with the same `value`
 * is already stored, processing the batch sequentially.
 *
 * A value that already appeared earlier in the same batch is known to exist
 * (it was either found or just inserted), so it is reported without another
 * database lookup.
 *
 * @param {Array<{value: *}>} values - candidate value documents; may be empty or nullish.
 * @returns {Promise<void>}
 */
async function insertUniqueValues(values) {
    // Values already handled in this call (Set equality: primitives by value).
    const handled = new Set();
    for (const value of values ?? []) {
        const repeated = handled.has(value.value);
        handled.add(value.value);
        if (repeated || (await Value.find({ value: value.value }))[0])
            logger.info(value.value, " already exists")
        else
            await Value.insertMany([value])
    }
}
95+
/**
 * Insert each `{ key, value }` entry unless an entry with the same key and
 * value is already stored, processing the batch sequentially.
 *
 * A (key, value) pair that already appeared earlier in the same batch is
 * known to exist (it was either found or just inserted), so it is reported
 * without another database lookup.
 *
 * @param {Array<{key: *, value: *}>} entries - candidate entries; may be empty or nullish.
 * @returns {Promise<void>}
 */
async function insertUniqueEntries(entries) {
    // key -> Set of values already handled in this call.
    const handledByKey = new Map();
    for (const entry of entries ?? []) {
        let handledValues = handledByKey.get(entry.key);
        if (!handledValues) {
            handledValues = new Set();
            handledByKey.set(entry.key, handledValues);
        }
        const repeated = handledValues.has(entry.value);
        handledValues.add(entry.value);
        if (repeated || (await Entries.find({ value: entry.value, key: entry.key }))[0])
            logger.info(entry, " already exists")
        else
            await Entries.insertMany([entry])
    }
}
102+
/**
 * Insert each `{ key }` document unless a document with the same `key` is
 * already stored, processing the batch sequentially.
 *
 * A key that already appeared earlier in the same batch is known to exist
 * (it was either found or just inserted), so it is reported without another
 * database lookup.
 *
 * @param {Array<{key: *}>} keys - candidate key documents; may be empty or nullish.
 * @returns {Promise<void>}
 */
async function insertUniqueKeys(keys) {
    // Keys already handled in this call.
    const handled = new Set();
    for (const key of keys ?? []) {
        const repeated = handled.has(key.key);
        handled.add(key.key);
        if (repeated || (await Key.find({ key: key.key }))[0])
            logger.info(key.key, " already exists")
        else
            await Key.insertMany([key])
    }
}
109+
110+
111+
/**
 * Batch variant: insert the `{ value }` documents whose `value` is not stored yet.
 *
 * Values repeated inside `values` are collapsed to their first occurrence, and
 * the stored values are fetched with a single `$in` query. Values are compared
 * with JavaScript Set/Map equality.
 *
 * @param {Array<{value: *}>} values - candidate value documents; may be empty or nullish.
 * @returns {Promise<void>}
 * @throws rethrows any `insertMany` error whose `code` is not 11000.
 */
async function insertUniqueValues_1(values) {
    // First document seen for each distinct value.
    const firstByValue = new Map();
    for (const v of values ?? []) {
        if (!firstByValue.has(v.value)) firstByValue.set(v.value, v);
    }

    // Nothing to look up: skip the query entirely.
    if (firstByValue.size === 0) {
        logger.info("No values to insert");
        return;
    }

    // Find the values that already exist in the database.
    const existingEntries = await Value.find({ value: { $in: [...firstByValue.keys()] } });

    // Drop every candidate that is already stored; what remains is new.
    for (const doc of existingEntries) firstByValue.delete(doc.value);
    const newValues = [...firstByValue.values()];
    logger.debug(values.length, " | ", newValues.length)

    if (newValues.length === 0) {
        logger.info("No values to insert");
        return;
    }

    try {
        // ordered: false lets the remaining documents go through when one fails.
        await Value.insertMany(newValues, { ordered: false });
        logger.info(`Inserted ${newValues.length} new values`);
    } catch (error) {
        if (error.code === 11000) {
            logger.warn(error, newValues, "Some values were ignored due to duplicates");
        } else {
            throw error;
        }
    }
}
137+
138+
139+
140+
/**
 * Earlier batch variant: insert the `{ value }` documents whose `value` is
 * not stored yet.
 *
 * Values repeated inside `values` are collapsed to their first occurrence, and
 * the stored values are fetched with a single `$in` query. Values are compared
 * with JavaScript Set/Map equality.
 *
 * @param {Array<{value: *}>} values - candidate value documents; may be empty or nullish.
 * @returns {Promise<void>}
 * @throws rethrows any `insertMany` error whose `code` is not 11000.
 */
async function insertUniqueValues_0(values) {
    // First document seen for each distinct value.
    const firstByValue = new Map();
    for (const v of values ?? []) {
        if (!firstByValue.has(v.value)) firstByValue.set(v.value, v);
    }

    // Nothing to look up: skip the query entirely.
    if (firstByValue.size === 0) {
        logger.info("No values to insert");
        return;
    }

    const existingEntries = await Value.find({ value: { $in: [...firstByValue.keys()] } });
    for (const doc of existingEntries) firstByValue.delete(doc.value);

    const newValues = [...firstByValue.values()];
    if (newValues.length === 0) {
        logger.info("No values to insert");
        return;
    }

    try {
        // ordered: false lets the remaining documents go through when one fails.
        await Value.insertMany(newValues, { ordered: false });
        logger.info(`Inserted ${newValues.length} new values`);
    } catch (error) {
        // 11000 is MongoDB's duplicate-key error: another writer got there first.
        if (error.code === 11000) {
            logger.warn("some values ignored");
        } else {
            throw error;
        }
    }
}
170+
171+
46172
if (config.writeLogsOnFile)
47173
setInterval(logSizeChecker, 3600000)
48174

@@ -181,6 +307,14 @@ module.exports = {
181307

182308
},
183309

310+
/*
311+
async entriesToDB(entries){
312+
let found = Entries.find()
313+
await Entries.insertMany(entries)
314+
await Key.insertMany(entries.map(e => ({key : e.key})))
315+
await Value.insertMany(entries.map(e => ({value : typeof e.value == "object" ? JSON.stringify(e.value) : e.value})))
316+
},*/
317+
184318
async insertInDBs(newObject, record, align) {
185319
log("Insert in DBs ", record?.s3?.object?.key || record.name)
186320
let csv = false
@@ -370,20 +504,23 @@ module.exports = {
370504
log("entries ", entries != undefined)
371505
//const { keys, values } = entries
372506
//let values = getValues(obj, type)
373-
logger.debug("entries\n",JSON.stringify(entries).substring(0,30))
507+
logger.debug("entries\n", JSON.stringify(entries).substring(0, 30))
374508
//await sleep(100)
375-
await Entries.insertMany(entries)
376-
await Key.insertMany(entries.map(e => ({key : e.key})))
377-
////await sleep(100)
378-
await Value.insertMany(entries.map(e => ({value : typeof e.value == "object" ? JSON.stringify(e.value) : e.value})))
509+
await insertUniqueEntries(entries.map(e => ({ key: e.key, value: typeof e.value == "object" ? JSON.stringify(e.value) : e.value })))
510+
await insertUniqueKeys(entries.map(e => ({ key: e.key })))
511+
await insertUniqueValues(entries.map(e => ({ value: typeof e.value == "object" ? JSON.stringify(e.value) : e.value })))
379512
}
380513
catch (error) {
381514
if (!error?.errorResponse?.message?.includes("Document can't have"))
382515
log(error)
383516
//log("Probably there are some special characters not allowed")
384517
try {
385518
//const { keys, values } = entries
386-
await Entries.insertMany(JSON.parse(JSON.stringify(entries).replace(/\$/g, '')))
519+
let cleanedEntries = JSON.parse(JSON.stringify(entries).replace(/\$/g, ''))
520+
await insertUniqueEntries(cleanedEntries.map(e => ({ key: e.key, value: typeof e.value == "object" ? JSON.stringify(e.value) : e.value })))
521+
await insertUniqueKeys(cleanedEntries.map(e => ({ key: e.key })))
522+
await insertUniqueValues(cleanedEntries.map(e => ({ value: typeof e.value == "object" ? JSON.stringify(e.value) : e.value })))
523+
//await Entries.insertMany(JSON.parse(JSON.stringify(entries).replace(/\$/g, '')))
387524
//await Value.insertMany(JSON.parse(JSON.stringify(values).replace(/\$/g, '')))
388525
//log("Indeed")
389526
}

0 commit comments

Comments
 (0)