
Commit f3b625a

Revert "Revert "Begin to factor towards a single data collection (#75)" (#76)" (#77)
This reverts commit ef111bd.
1 parent ef111bd · commit f3b625a

File tree: 6 files changed, +333 -139 lines


Diff for: init_db.py

+64 -43
@@ -1,62 +1,83 @@
 import pymongo

-from opensensor.utils import get_open_sensor_db
+from opensensor.db import get_open_sensor_db

 # Script for creating the Time Series

 db = get_open_sensor_db()
-try:
-    db.create_collection(
-        "Temperature",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
-    )
-except pymongo.errors.CollectionInvalid:
-    print("Temperature collection Already exists, skipping ...")
-
-try:
-    db.create_collection(
-        "Humidity",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
-    )
-except pymongo.errors.CollectionInvalid:
-    print("Humidity collection Already exists, skipping ...")
-
-try:
-    db.create_collection(
-        "Pressure",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
-    )
-except pymongo.errors.CollectionInvalid:
-    print("Pressure collection Already exists, skipping ...")
+# try:
+#     db.create_collection(
+#         "Temperature",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("Temperature collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "Humidity",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("Humidity collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "Pressure",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("Pressure collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "Lux",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("Lux collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "CO2",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("CO2 collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "Moisture",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("Moisture collection Already exists, skipping ...")
+#
+# try:
+#     db.create_collection(
+#         "pH",
+#         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+#     )
+# except pymongo.errors.CollectionInvalid:
+#     print("pH collection Already exists, skipping ...")

 try:
     db.create_collection(
-        "Lux",
+        "FreeTier",
         timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+        # expireAfterSeconds=8000000,
     )
 except pymongo.errors.CollectionInvalid:
-    print("Lux collection Already exists, skipping ...")
+    print("FreeTier collection Already exists, skipping ...")

-try:
-    db.create_collection(
-        "CO2",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
-    )
-except pymongo.errors.CollectionInvalid:
-    print("CO2 collection Already exists, skipping ...")

 try:
     db.create_collection(
-        "Moisture",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+        "Migration",
+        # timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
+        # expireAfterSeconds=8000000,
     )
-except pymongo.errors.CollectionInvalid:
-    print("Moisture collection Already exists, skipping ...")
+    db.Migration.create_index([("migration_name", 1)], unique=True)

-try:
-    db.create_collection(
-        "pH",
-        timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "minutes"},
-    )
 except pymongo.errors.CollectionInvalid:
-    print("pH collection Already exists, skipping ...")
+    print("Migration collection Already exists, skipping ...")
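
For context, a minimal sketch of what writing a combined reading into the new FreeTier time-series collection might look like after this change. This snippet is illustrative only and not part of the commit: the metadata shape (device_id, name, user_id) is modeled on what migrate_collections.py below produces, while the "temp"/"temp_unit"/"rh" keys are assumed stand-ins for the real names defined by the new_collections mapping in opensensor.collection_apis.

from datetime import datetime, timezone

from opensensor.db import get_open_sensor_db

db = get_open_sensor_db()

# Hypothetical example document: one record carrying several measurements,
# instead of one record per measurement spread across separate collections.
# "temp", "temp_unit", and "rh" are assumed field names for illustration.
reading = {
    "metadata": {
        "device_id": "example-device",  # hypothetical device id
        "name": "grow-tent",            # hypothetical device name
        "user_id": None,
    },
    "temp": 21.4,
    "temp_unit": "C",
    "rh": 55.2,
    "timestamp": datetime.now(timezone.utc),
}
db.FreeTier.insert_one(reading)

Because FreeTier is created as a time-series collection with timestamp as the timeField and metadata as the metaField, all per-device readings land in a single collection rather than seven parallel ones.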

Diff for: migrate_collections.py

+100
@@ -0,0 +1,100 @@
+from datetime import datetime, timedelta
+from operator import itemgetter
+
+from pymongo import ASCENDING
+
+from opensensor.collection_apis import new_collections, old_collections
+
+# Create a MongoDB client
+from opensensor.db import get_open_sensor_db
+
+# Access the database
+db = get_open_sensor_db()
+
+# List of all collections/models to migrate
+collections_to_migrate = ["Temperature", "Humidity", "Pressure", "Lux", "CO2", "PH", "Moisture"]
+
+migration = db.Migration.find_one({"migration_name": "FreeTier"})
+if not migration:
+    db["Migration"].insert_one({"migration_name": "FreeTier", "migration_complete": False})
+
+# Determine the earliest and latest timestamps in your data
+earliest_timestamp = datetime.now()
+latest_timestamp = datetime.min
+
+for collection_name in collections_to_migrate:
+    collection = db[collection_name]
+    earliest_document = collection.find_one(sort=[("timestamp", ASCENDING)])
+    latest_document = collection.find_one(sort=[("timestamp", -1)])
+    if earliest_document and earliest_document["timestamp"] < earliest_timestamp:
+        earliest_timestamp = earliest_document["timestamp"]
+    if latest_document and latest_document["timestamp"] > latest_timestamp:
+        latest_timestamp = latest_document["timestamp"]
+
+# Migrate data in chunks, e.g., one week at a time
+start_date = earliest_timestamp
+one_week = timedelta(weeks=1)
+
+while start_date <= latest_timestamp:
+    end_date = start_date + one_week
+    all_documents = []
+
+    # Define a "reasonable" time window
+    reasonable_time = timedelta(seconds=3)
+
+    # Migrate data in chunks
+    start_date = earliest_timestamp
+
+    while start_date <= latest_timestamp:
+        end_date = start_date + one_week
+        buffer = {}  # We'll store the records for the current time window here
+
+        for collection_name in collections_to_migrate:
+            collection = db[collection_name]
+            for document in collection.find({"timestamp": {"$gte": start_date, "$lt": end_date}}):
+                # Convert to the FreeTier model
+                unit = document["metadata"].get("unit")
+                new_document = {
+                    "metadata": {
+                        "device_id": document["metadata"]["device_id"],
+                        "name": document["metadata"].get("name"),
+                        "user_id": document.get("user_id"),
+                    },
+                    new_collections[collection_name]: document.get(
+                        old_collections[collection_name]
+                    ),
+                    "timestamp": document["timestamp"],
+                }
+                if unit:
+                    new_document[f"{new_collections[collection_name]}_unit"] = unit
+
+                # Merge with an existing document if it's within a reasonable time,
+                # otherwise add a new document to the buffer
+                for existing_timestamp in buffer.keys():
+                    if abs(existing_timestamp - document["timestamp"]) <= reasonable_time:
+                        buffer[existing_timestamp][new_collections[collection_name]] = document.get(
+                            old_collections[collection_name]
+                        )
+                        if unit:
+                            buffer[existing_timestamp][
+                                f"{new_collections[collection_name]}_unit"
+                            ] = unit
+                        break
+                else:
+                    buffer[document["timestamp"]] = new_document
+
+        # Sort all documents by timestamp
+        all_documents = sorted(buffer.values(), key=itemgetter("timestamp"))
+
+        # Access the destination collection
+        free_tier_collection = db["FreeTier"]
+
+        # Insert all documents into the new collection, in sorted order
+        for document in all_documents:
+            free_tier_collection.insert_one(document)
+
+        # Advance to the next time chunk
+        start_date = end_date
+
+
+db["Migration"].update_one({"migration_name": "FreeTier"}, {"$set": {"migration_complete": True}})
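
After the migration has run, a quick sanity check can confirm that the Migration flag was flipped and give a rough comparison between the source collections and FreeTier. This is a hedged sketch, not part of the commit; it relies only on the collection names and the Migration flag used above.

from opensensor.db import get_open_sensor_db

db = get_open_sensor_db()

# The migration script sets migration_complete once it finishes.
flag = db.Migration.find_one({"migration_name": "FreeTier"})
print("migration_complete:", bool(flag and flag.get("migration_complete")))

# Rough comparison: FreeTier should hold at most the total number of source
# documents, since readings within the 3-second window are merged into one record.
source_names = ["Temperature", "Humidity", "Pressure", "Lux", "CO2", "PH", "Moisture"]
source_total = sum(db[name].count_documents({}) for name in source_names)
print("source documents:", source_total)
print("FreeTier documents:", db.FreeTier.count_documents({}))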
