-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtelemetry.py
229 lines (179 loc) · 7.29 KB
/
telemetry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
from flask import Flask
from flask_cors import CORS
from flask_socketio import SocketIO, emit, join_room, leave_room
import os
from dotenv import load_dotenv
import uuid
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import (
ChatPromptTemplate,
HumanMessagePromptTemplate,
MessagesPlaceholder,
)
from langchain_core.messages import SystemMessage
from langchain_groq import ChatGroq
from groq import Groq
import sqlparse
from supabase import create_client, Client
import supabase
import matplotlib.pyplot as plt
import seaborn as sns
# Populate os.environ (python-dotenv) before any configuration below reads it.
load_dotenv()

# Flask application with CORS enabled and a Socket.IO server layered on top.
app = Flask(__name__)
CORS(app)
# Prefer a secret supplied through the environment; the historical literal is
# kept only as a fallback so existing deployments keep starting unchanged.
# NOTE(review): set SECRET_KEY in the environment for any non-local deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'secret!')
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='gevent')

# Shared LangChain chat model used by the per-user conversation chains.
llm = ChatGroq(
    groq_api_key=os.environ.get("GROQ_API_KEY"),
    model_name="llama3-70b-8192",
    temperature=0.1,
)

# DDL of the telemetry table; interpolated verbatim into the LLM prompts.
housekeeping_data_db_schema = '''
CREATE TABLE telemetry_data (
timestamp TEXT NOT NULL,
name TEXT NOT NULL,
value TEXT, -- Assuming 'value' can store text or numerical data
calibrated_value TEXT,
unit TEXT,
c_func TEXT
);
'''
def query_conv(user_query):
    """Ask the Groq chat model to turn a user question into a SQL query.

    The question is embedded in a task prompt (sent as the system message)
    together with the telemetry table schema, and is also sent on its own as
    the user message. The model's reply is passed through ``sqlparse.format``
    with ``reindent=True`` and returned.

    Args:
        user_query: The natural-language question from the user.

    Returns:
        The model's reply text after sqlparse re-indentation. The prompt asks
        the model to answer 'I don't know' when the schema cannot answer the
        question, so the result is not guaranteed to be SQL.
    """
    groq_client = Groq(
        api_key=os.environ.get("GROQ_API_KEY")
    )
    query_prompt = f"""
### Task
Generate a SQL query to answer [QUESTION]{user_query}[/QUESTION] about satellite housekeeping data.
## Database Schema:
{housekeeping_data_db_schema}
## Information about Data attributes:
1. timestamp: The date and time of the data entry.
2. name: The type of measurement or event recorded (e.g., uptime, eps_battery, software_ident, safe_mode, AMRAD_message).
3. value: The raw value of the measurement.
4. calibrated_value: A potentially processed or calibrated value (though this column seems mostly empty).
5. unit: The units associated with the values (e.g., seconds for time measurements).
6. c_func: Possibly a column for function or additional metadata.
## Satellite Housekeeping Data Context:
This data represents various metrics and events from an Amateur Radio Satellite, including power levels, battery status, temperature, RSSI, and system messages.
## Instructions
- Provide a SQL query that answers the user's question about satellite housekeeping data.
- If you cannot answer the question with the available database schema, return 'I don't know'.
- Focus on retrieving relevant data for satellite metrics, events, and status information.
### Answer
Given the database schema, here is the SQL query that answers [QUESTION]{user_query}[/QUESTION]
Response only in [SQL] format and do not include any other information.
"""
    # The task prompt goes in as the system turn; the raw question as the user turn.
    system_turn = {"role": "system", "content": query_prompt}
    user_turn = {"role": "user", "content": user_query}
    completion = groq_client.chat.completions.create(
        messages=[system_turn, user_turn],
        model="llama3-70b-8192",
    )
    raw_reply = completion.choices[0].message.content
    return sqlparse.format(raw_reply, reindent=True)
# Supabase connection settings, all taken from the process environment.
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')
ACCESS_TOKEN = os.getenv('ACCESS_TOKEN')

# NOTE: this rebinds the module-level name `supabase` (previously the imported
# package) to the client instance that the query helper below relies on.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Use your Database API endpoint here to query the database and return result
def execute_sql_query(sql_query):
    """Fetch telemetry rows for a generated query, or pass through the sentinel.

    Args:
        sql_query: Text produced by the SQL-generation step. When it is the
            "I don't know" sentinel, nothing is fetched.

    Returns:
        The string "I don't know" when ``sql_query`` is that sentinel (ignoring
        case, surrounding whitespace, wrapping quotes/backticks and a trailing
        period); otherwise the response object of the Supabase request.

    NOTE(review): ``sql_query`` itself is not executed. Every non-sentinel
    call selects all columns of ``telemetry_data`` with an exact row count;
    wiring in real query execution is still a TODO ("API here").
    """
    if isinstance(sql_query, str):
        # LLM output / sqlparse formatting may add whitespace, quotes, a
        # typographic apostrophe or a trailing period around the sentinel, so
        # compare a normalised form instead of requiring an exact match.
        normalized = sql_query.replace("\u2019", "'").strip()
        normalized = normalized.strip("'\"`").rstrip(".").strip().lower()
        if normalized == "i don't know":
            return "I don't know"
    # #API here
    response = supabase.table("telemetry_data").select("*", count="exact").execute()
    return response
def generate_nl_response(user_query, sql_query, query_results):
    """Ask the Groq chat model for a plain-language answer to the user.

    A single system message is sent containing the user's question, the
    generated SQL, the query results, and the telemetry schema with column
    descriptions.

    Args:
        user_query: The original question from the user.
        sql_query: The SQL text generated for that question.
        query_results: Whatever the query step returned; it is interpolated
            into the prompt via its string form.

    Returns:
        The text content of the model's first completion choice.
    """
    groq_client = Groq(
        api_key=os.environ.get("GROQ_API_KEY"),
    )
    prompt = f"""
### Task
Answer to the user's query about satellite housekeeping data based on the table data provided.
### User Query
{user_query}
### SQL Query
{sql_query}
### Query Results
{query_results}
## Database Schema:
{housekeeping_data_db_schema}
## Information about Data attributes:
1. timestamp: The date and time of the data entry.
2. name: The type of measurement or event recorded (e.g., uptime, eps_battery, software_ident, safe_mode, AMRAD_message).
3. value: The raw value of the measurement.
4. calibrated_value: A potentially processed or calibrated value (though this column seems mostly empty).
5. unit: The units associated with the values (e.g., seconds for time measurements).
6. c_func: Possibly a column for function or additional metadata.
## Instructions
- Provide a short, clear and concise, to the point answer to the user's query about satellite housekeeping data.
- Interpret the query results in the context of satellite operations and metrics.
- If specific data is requested, present it clearly and explain its significance.
- Avoid mentioning SQL or database operations in your response.
- If you cannot answer the question with the available data, say "I don't have enough information to answer that question."
"""
    # Everything the model needs is packed into one system turn.
    conversation_turns = [{"role": "system", "content": prompt}]
    completion = groq_client.chat.completions.create(
        messages=conversation_turns,
        model="llama3-70b-8192",
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
# System message placed at the head of every per-user conversation prompt.
system_prompt = """You are an expert in satellite housekeeping data analysis. Your role is to assist users by answering questions about Amateur Radio Satellite data, including power levels, battery status, temperature, RSSI, system messages, and other relevant metrics. Use your knowledge to provide insightful answers and explanations about satellite operations and status. If you cannot answer a question, say "I don't have enough information to answer that question."
"""

# In-process registry of conversation chains, keyed by the user id carried in
# incoming messages; entries are created lazily by the message handler.
user_conversations = dict()
@socketio.on('connect')
def handle_connect():
    """Assign a fresh id to a newly connected client.

    Generates a random UUID string, joins the connection to a room of that
    name, sends the id back in a ``set_user_id`` event, and logs it.
    """
    assigned_id = str(uuid.uuid4())
    join_room(assigned_id)
    payload = {'user_id': assigned_id}
    emit('set_user_id', payload)
    print(f'Client connected with ID: {assigned_id}')
@socketio.on('join')
def on_join(data):
    """Join the calling connection to the room named by ``data['user_id']``.

    Args:
        data: Event payload; must contain a ``'user_id'`` key.
    """
    room_name = data['user_id']
    join_room(room_name)
    print(f'User {room_name} joined their room')
@socketio.on('disconnect')
def handle_disconnect():
    """Log that a client has disconnected; no per-user state is touched."""
    notice = 'Client disconnected'
    print(notice)
@socketio.on('user_message')
def handle_message(message):
    """Answer one user question and send the reply to that user's room.

    Pipeline: generate SQL for the question, run the query step, turn the
    results into a natural-language answer, record the exchange in the user's
    conversation memory, then emit the answer as a ``bot_response`` event.

    Args:
        message: Event payload; expected to be a dict with ``'user_id'``
            (the room/conversation key) and ``'data'`` (the question text).
            Payloads missing either key are logged and ignored.
    """
    # A malformed payload would otherwise raise KeyError/TypeError inside the
    # handler; log it and bail out instead.
    if not isinstance(message, dict) or 'user_id' not in message or 'data' not in message:
        print(f'Ignoring malformed user_message payload: {message!r}')
        return

    user_id = message['user_id']
    user_query = message['data']
    print(f'User {user_id}: {user_query}')

    # Lazily create one conversation chain (with its own memory) per user id.
    if user_id not in user_conversations:
        memory = ConversationBufferMemory(return_messages=True)
        prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=system_prompt),
            MessagesPlaceholder(variable_name="history"),
            HumanMessagePromptTemplate.from_template("{input}")
        ])
        user_conversations[user_id] = ConversationChain(
            llm=llm,
            memory=memory,
            prompt=prompt,
            verbose=False
        )
    conversation = user_conversations[user_id]

    sql_query = query_conv(user_query)
    print('Generated SQL query:', sql_query)
    query_results = execute_sql_query(sql_query)
    print('Query results:', query_results)
    nl_response = generate_nl_response(user_query, sql_query, query_results)
    print('Natural language response:', nl_response)

    # Store the real question/answer pair in memory directly. Calling
    # conversation.predict() here would trigger an additional LLM request
    # whose output is discarded, delaying the reply and recording a
    # fabricated AI turn in the history.
    conversation.memory.save_context({"input": user_query}, {"output": nl_response})
    emit('bot_response', {'data': nl_response}, room=user_id)
if __name__ == '__main__':
    # Debug mode stays on by default (previous behaviour) but can be switched
    # off without a code change by setting FLASK_DEBUG to 0/false/no/off.
    debug_flag = os.environ.get('FLASK_DEBUG', '1').strip().lower()
    debug_enabled = debug_flag not in {'0', 'false', 'no', 'off'}
    socketio.run(app, debug=debug_enabled)