-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcalculations_.py
66 lines (51 loc) · 2.05 KB
/
calculations_.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import asyncio, os
import numpy as np
import pandas as pd
from pathlib import Path
from async_class import AsyncClass, AsyncObject, task, link
import aiofiles
from fastapi import UploadFile
# Working directory captured once, at import time.
path = os.getcwd()
# Print NumPy floats with 4 decimals and without scientific notation.
np.set_printoptions(precision=4, suppress=True)
async def verify_files_and_read(uploadedFile: UploadFile):
    """Validate an upload's file extension and report which one matched.

    Despite the name, nothing is read here: only ``uploadedFile.filename``
    is inspected.

    Args:
        uploadedFile: The uploaded file; only its ``filename`` attribute
            is used.

    Returns:
        A ``(filename, extension)`` tuple, where ``extension`` is the
        matching entry of ``('.json', '.csv', '.xlsx')`` (always lowercase).

    Raises:
        ValueError: If the filename is missing/empty or does not end with
            one of the accepted extensions.
    """
    accepted_files = ('.json', '.csv', '.xlsx')
    filename = uploadedFile.filename
    # A missing filename is treated as "no accepted extension" so the caller
    # gets the documented ValueError rather than an AttributeError.
    # Matching is case-insensitive so that e.g. "DATA.CSV" is accepted.
    lowered = filename.lower() if filename else ""
    extension = next(
        (ext for ext in accepted_files if lowered.endswith(ext)),
        None,
    )
    if extension is None:
        raise ValueError("You can only use .json, Excel, and csv extensions")
    return filename, extension
def get_file(file):
    """Return the full text of ``file``, located under the current working directory.

    The path is built by plain string concatenation: the current working
    directory, a ``/``, then ``file`` formatted as text.
    """
    target = "/".join((os.getcwd(), f"{file}"))
    with open(target, "r") as handle:
        contents = handle.read()
    return contents
class FileManager(AsyncObject):
    """Asynchronous helper bound to one file path and its extension."""

    async def __ainit__(self, file=None, extension=None):
        """Store the target file and its extension on the instance.

        Args:
            file: Path of the file later opened by ``read_file``.
            extension: Extension string associated with ``file``.
        """
        # Writes AsyncObject's name-mangled ``__closed`` attribute directly.
        # NOTE(review): presumably marks the object as "not closed" for the
        # base class -- confirm against the async_class implementation.
        self._AsyncObject__closed = False
        self.file, self.extension = file, extension

    async def read_file(self):
        """Open ``self.file`` in text mode and return its entire contents."""
        async with aiofiles.open(self.file, "r") as handle:
            contents = await handle.read()
        return contents

    async def extract_some_data_and_check(self, data_frame):
        """Return ``(columns, rows)`` taken from ``data_frame.shape``.

        ``data_frame`` must expose a ``shape`` with at least two entries;
        ``shape[1]`` is reported as the column count and ``shape[0]`` as
        the row count.
        """
        return data_frame.shape[1], data_frame.shape[0]

    async def main_function(self):
        """Print a greeting; stand-in for the main script logic."""
        message = "Hello world, this is the main function where I'll execute the main scripts"
        print(message)

    async def sort_column(self, GivenArray):
        """Not implemented yet; currently does nothing and returns ``None``."""
        return None
if __name__ == "__main__":
    async def main():
        """Demo run: validate a sample upload, build a FileManager, call its entry point."""
        import io  # local import: only this demo needs an in-memory file object

        # Provide a sample file name for testing purposes.
        # You should replace 'sample_file.json' with the actual file path you want to use.
        # verify_files_and_read reads ``.filename``, so it needs an upload
        # object rather than a bare string.
        upload = UploadFile(file=io.BytesIO(), filename='sample_file.json')
        file_name, extension = await verify_files_and_read(upload)

        # Awaiting the constructor call is how async_class creates the
        # instance and runs ``__ainit__`` with these arguments.
        instance = await FileManager(file_name, extension)

        # The original awaited a nonexistent attribute (``instance.est``);
        # ``main_function`` is the class's entry-point coroutine.
        await instance.main_function()

    asyncio.run(main())