1
- from datetime import datetime
1
+ from datetime import datetime , timedelta
2
2
from decimal import Decimal
3
3
4
4
from fastapi import FastAPI , Path
5
- from fastapi_pagination import Page , add_pagination
6
- from fastapi_pagination .ext .pymongo import paginate
5
+ from fastapi_pagination import Page , Params , add_pagination
6
+ from fastapi_pagination .ext .pymongo import paginate as pymongo_paginate
7
7
from pydantic import BaseModel
8
8
9
9
from opensensor .utils import get_open_sensor_db
@@ -107,7 +107,7 @@ async def historical_temperatures(
107
107
device_id : str = Path (title = "The ID of the device about which to retrieve historical data." ),
108
108
):
109
109
db = get_open_sensor_db ()
110
- matching_data = paginate (
110
+ matching_data = pymongo_paginate (
111
111
db .Temperature ,
112
112
{"metadata.device_id" : device_id },
113
113
projection = {
@@ -120,6 +120,64 @@ async def historical_temperatures(
120
120
return matching_data
121
121
122
122
123
+ def get_uniform_sample_pipeline ():
124
+ # Define timestamp range and sampling interval
125
+ start_date = datetime (2023 , 1 , 1 )
126
+ end_date = datetime (2023 , 3 , 22 )
127
+ sampling_interval = timedelta (hours = 1 ) # Adjust the sampling interval as needed
128
+
129
+ # Query a uniform sample of documents within the timestamp range
130
+ pipeline = [
131
+ {"$match" : {"timestamp" : {"$gte" : start_date , "$lte" : end_date }}},
132
+ {
133
+ "$addFields" : {
134
+ "group" : {
135
+ "$floor" : {
136
+ "$divide" : [
137
+ {"$subtract" : ["$timestamp" , start_date ]},
138
+ sampling_interval .total_seconds () * 1000 ,
139
+ ]
140
+ }
141
+ }
142
+ }
143
+ },
144
+ {"$group" : {"_id" : "$group" , "doc" : {"$first" : "$$ROOT" }}},
145
+ {"$replaceRoot" : {"newRoot" : "$doc" }},
146
+ {
147
+ "$project" : {
148
+ "_id" : False ,
149
+ "unit" : "$metadata.unit" ,
150
+ "temp" : "$temp" ,
151
+ "timestamp" : "$timestamp" , # Don't forget to include the timestamp field
152
+ }
153
+ },
154
+ {"$sort" : {"timestamp" : 1 }}, # Sort by timestamp in ascending order
155
+ # {"$count": "total"}
156
+ ]
157
+ return pipeline
158
+
159
+
160
+ @app .get ("/sampled-temp/{device_id}" , response_model = Page [Temperature ])
161
+ async def historical_temperatures_sampled (
162
+ device_id : str = Path (title = "The ID of the device about which to retrieve historical data." ),
163
+ params : Params = Params (),
164
+ ):
165
+ pipeline = get_uniform_sample_pipeline ()
166
+
167
+ offset = (params .page - 1 ) * params .size
168
+ size = params .size
169
+
170
+ # Add $skip and $limit stages for pagination
171
+ pipeline .extend ([{"$skip" : offset }, {"$limit" : size }])
172
+ db = get_open_sensor_db ()
173
+ data = list (db .Temperature .aggregate (pipeline ))
174
+ pipeline .append ({"$count" : "total" })
175
+ data_count = list (db .Temperature .aggregate (pipeline ))
176
+ total_count = data_count [0 ]["total" ] if data else 0
177
+ print (data )
178
+ return Page (items = data , total = total_count , page = params .page , size = size , params = params )
179
+
180
+
123
181
@app .post ("/environment/" , response_model = Environment )
124
182
async def record_environment (environment : Environment ):
125
183
db = get_open_sensor_db ()
0 commit comments