forked from thedanfields/python-mastery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.py
113 lines (82 loc) · 3.2 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import collections, copy, csv
import collections.abc
from abc import ABC, abstractmethod
class DataCollection(collections.abc.Sequence):
    """Column-oriented storage of CSV records: one list per column.

    Exposes a row-oriented, read-mostly Sequence interface:
    integer indexing returns a dict mapping header names to the
    (already type-converted) values of that row; slicing returns a
    new DataCollection holding the sliced columns.
    """

    def __init__(self, headers, types):
        # headers: list of column names.
        # types: one converter callable per column (e.g. str, int).
        self.col_count = len(headers)
        self.headers = headers
        self.types = types
        # One independent list per column; a logical "row" is spread
        # across all of them at the same index.
        self.data = [[] for _ in range(len(headers))]

    def __len__(self):
        # All columns are kept the same length, so column 0 is
        # representative; guard against the zero-column case.
        return len(self.data[0]) if self.col_count > 0 else 0

    def __getitem__(self, value):
        if isinstance(value, slice):
            # Slices return a new collection with copied metadata and
            # sliced (shallow-copied) column data.
            data = DataCollection(copy.deepcopy(self.headers), copy.deepcopy(self.types))
            data.data = [col[value] for col in self.data]
            return data
        # Single index: reassemble the row as {header: value}.
        return dict(zip(self.headers, (col[value] for col in self.data)))

    def append(self, row):
        """Append one record, converting each field with its column's type.

        row is any sequence with at least col_count elements.
        (Parameter renamed from `list`, which shadowed the builtin.)
        """
        for index in range(self.col_count):
            self.data[index].append(self.types[index](row[index]))
def read_csv_as_dicts_old(filename, types):
    """Read a CSV file into a list of dicts with type-converted values.

    Legacy implementation: on a header/types length mismatch it reports
    the problem on stdout and returns an empty list.
    """
    with open(filename) as f:
        reader = csv.reader(f)
        header_row = next(reader)
        if len(header_row) != len(types):
            print("Error: Length of types does not match data count")
            return []
        records = []
        for data_row in reader:
            record = {}
            for name, convert, value in zip(header_row, types, data_row):
                record[name] = convert(value)
            records.append(record)
        return records
def read_csv_as_columns(filename, types):
    """Read a CSV file into a column-oriented DataCollection.

    On a header/types length mismatch, reports the problem on stdout
    and returns an empty DataCollection.
    """
    with open(filename) as f:
        reader = csv.reader(f)
        header_row = next(reader)
        if len(header_row) != len(types):
            print("Error: Length of types does not match data count")
            return DataCollection([], [])
        collection = DataCollection(header_row, types)
        for data_row in reader:
            collection.append(data_row)
        return collection
def read_csv_as_instances_old(filename, cls):
    """Read a CSV file into a list of cls instances.

    Legacy implementation; each data row is passed to cls.from_row.
    The header row is consumed and discarded.
    """
    with open(filename) as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        return [cls.from_row(row) for row in reader]
def read_csv_as_instances(filename, cls):
    """Read a CSV file into a list of cls instances (delegates to InstanceCSVParser)."""
    parser = InstanceCSVParser(cls)
    return parser.parse(filename)
def read_csv_as_dicts(filename, types):
    """Read a CSV file into a list of typed dicts (delegates to DictCSVParser)."""
    parser = DictCSVParser(types)
    return parser.parse(filename)
class CSVParser(ABC):
    """Template-method base class for CSV readers.

    parse() owns file handling and iteration; subclasses supply
    make_record() to turn one (headers, row) pair into a record.
    """

    def parse(self, filename):
        """Parse filename, returning a list of records, one per data row."""
        with open(filename) as f:
            reader = csv.reader(f)
            headers = next(reader)
            return [self.make_record(headers, row) for row in reader]

    @abstractmethod
    def make_record(self, headers, row):
        """Build a single record from the header list and one data row."""
class DictCSVParser(CSVParser):
    """CSV parser producing one dict per row, with type-converted values."""

    def __init__(self, types):
        # One converter callable per column, matched positionally to headers.
        self.types = types

    def make_record(self, headers, row):
        # Build {header: converted_value} field by field.
        record = {}
        for name, convert, value in zip(headers, self.types, row):
            record[name] = convert(value)
        return record
class InstanceCSVParser(CSVParser):
    """CSV parser producing one cls instance per row via cls.from_row."""

    def __init__(self, cls):
        # Target class; must provide a from_row(row) alternate constructor.
        self.cls = cls

    def make_record(self, headers, row):
        # headers are unused: from_row consumes the row positionally.
        return self.cls.from_row(row)
if __name__ == '__main__':
    # Demo / memory experiment: load the CTA bus data column-wise and
    # report allocation figures via tracemalloc.
    from sys import intern
    import tracemalloc
    tracemalloc.start()
    # intern is used as the "type" for the first two columns so that the
    # many duplicate route/date strings share a single object each,
    # cutting memory use (see measured figures below).
    rows = read_csv_as_columns('Data/ctabus.csv', [intern, intern, str, int])
    print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())
    # Current 96724503, Peak 96754552 (csv as columns)
    # Current 35330133, Peak 35360292 (csv as columns, with intern route/dates)