from datetime import datetime
import sys, os, time
from timelib import ShiftTZ, timestamp
from GraphiteInterface import GraphiteInterface
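# Parses web-cache access logs (one record per line, as name=value fields),
# aggregates per-service hit/miss request and byte counts over fixed time
# intervals, and feeds the resulting rates to Graphite.  Tail state
# (inode, file position, last timestamp) is kept between runs so the logs
# are read incrementally.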
class CacheLogParser:
    """Incrementally parses cache access logs, yielding timestamped records."""

    def __init__(self, files, last_file_inode, last_file_position, last_t):
        self.Files = files[:]
        self.TMax = None                            # highest timestamp seen by parse()
        self.TStart = last_t                        # only records newer than this are yielded
        self.LastFileInode = last_file_inode        # inode of the file read on the previous run
        self.LastFilePosition = last_file_position  # offset to resume from in that file
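    # parseLine() turns one log line of whitespace-separated name=value fields
    # into a dict; values may be bracketed.  An illustrative line (hypothetical,
    # not taken from a real log):
    #   time=[19/Nov/2015:10:00:00 -0600] request=[GET /data/file HTTP/1.1] cached=HIT bytes_sent=1024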
    def parseLine(self, l):
        out = {}
        while l:
            l = l.strip()
            if not l: break
            words = l.split("=", 1)
            if len(words) != 2:
                break
            n, rest = words
            if rest.startswith('['):
                words = rest[1:].split("]", 1)
            else:
                words = rest.split(None, 1)
            val = words[0]
            rest = words[1] if len(words) > 1 else ""
            if n == 'time':
                # timestamp with an optional numeric UTC offset, e.g. -0600
                words = val.split()
                dt = datetime.strptime(words[0], "%d/%b/%Y:%H:%M:%S")
                if len(words) > 1:
                    zone = words[1]
                    minus = 1.0
                    if zone[0] == '-':
                        minus = -1.0
                        zone = zone[1:]
                    elif zone[0] == '+':
                        zone = zone[1:]
                    zone = zone[:4]
                    h = float(zone[:2])
                    m = float(zone[2:]) / 60.0
                    shift = (h + m) * minus
                    dt = dt.replace(tzinfo=ShiftTZ(shift))
                out["clock"] = timestamp(dt)
            elif n == 'request':
                # e.g. request=[GET /path HTTP/1.1]
                req, url, proto = val.split(None, 2)
                out["req"] = req
                out["url"] = url
                out["proto"] = proto
            out[n] = val
            l = rest
        return out

    def validateData(self, data):
        for k in ['req', 'url', 'clock', 'cached', 'bytes_sent']:
            if k not in data: return False
        return True

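    # parse() is a generator: it scans files modified since the last run,
    # seeks past data already consumed from the previously-read file, and
    # yields (timestamp, fields) for each record newer than self.TStart.
    # self.TMax holds the highest timestamp once the generator is exhausted.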
    def parse(self):
        # yields (t, data) pairs
        t_max = self.TStart

        files = []
        for f in self.Files:
            try:
                s = os.stat(f)
            except Exception:
                print(sys.exc_info())
                continue
            mtime = s.st_mtime
            if self.TStart is None or mtime >= self.TStart:
                files.append((mtime, f))
            else:
                print("File %s was not updated since last scan time (%s). File updated at %s" % (
                    f, time.ctime(self.TStart), time.ctime(mtime)))

        files.sort()    # oldest first

        for mt, fn in files:
            try:
                f = open(fn, 'r')
            except Exception:
                continue

            s = os.fstat(f.fileno())
            size = s.st_size
            inode = s.st_ino

            # Same file as last run: resume where the previous scan stopped.
            if inode == self.LastFileInode:
                f.seek(self.LastFilePosition)

            self.LastFileInode = inode
            self.LastFilePosition = size

            for l in f.readlines():
                try:
                    data = self.parseLine(l)
                except Exception:
                    continue
                if self.validateData(data):
                    t = data["clock"]
                    if self.TStart is None or t > self.TStart:
                        yield (t, data)
                        t_max = t if t_max is None else max(t_max, t)
            f.close()
        self.TMax = t_max

class ServiceData:
    def __init__(self, suffix, interval):
        self.Suffix = suffix
        self.Interval = interval    # aggregation interval, seconds
        self.RequestsMiss = 0
        self.RequestsHit = 0
        self.BytesMiss = 0
        self.BytesHit = 0

    def frequencies(self):
        # per-second rates over the aggregation interval
        return (
            float(self.RequestsHit) / self.Interval,
            float(self.RequestsMiss) / self.Interval,
            float(self.BytesHit) / self.Interval,
            float(self.BytesMiss) / self.Interval
        )

    def __str__(self):
        return "[Service %s: requests: %d/%d bytes: %d/%d]" % (self.Suffix,
            self.RequestsHit, self.RequestsMiss, self.BytesHit, self.BytesMiss)

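# Buckets the parsed record stream into fixed time intervals and accumulates
# per-service hit/miss counters; URLMap maps URL prefixes to Graphite suffixes.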
class CacheLogSummarizer:

    def __init__(self, parser, url_map):
        self.URLMap = url_map    # list of (url_head, graphite_suffix)
        self.Parser = parser

    def interval(self, dt, interval):
        # align timestamp dt to the enclosing [t0, t1) interval
        dt = int(dt)
        t0 = (dt // interval) * interval
        t1 = t0 + interval
        return t0, t1

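    # summarize() is a generator: it groups records into fixed-size intervals
    # and yields (t0, t1, {suffix -> ServiceData}).  Records are assumed to
    # arrive in roughly increasing time order, as the bucketing flushes an
    # interval as soon as a timestamp at or past its end is seen.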
    def summarize(self, interval):
        # interval - seconds
        lst = self.Parser.parse()
        t0, t1 = None, None
        int_data = {}       # {suffix -> ServiceData}
        for t, data in lst:
            if t0 is None or t >= t1:
                # flush the finished interval and start a new one
                if int_data: yield (t0, t1, int_data)
                t0, t1 = self.interval(t, interval)
                int_data = {}
            # find the service this URL belongs to
            suffix = None
            for url_head, sfx in self.URLMap:
                if data["url"].startswith(url_head):
                    suffix = sfx
                    break
            else:
                continue    # URL not mapped to any service

            svc_data = int_data.get(suffix)
            if svc_data is None:
                svc_data = ServiceData(suffix, interval)
                int_data[suffix] = svc_data
            cache_sts = data["cached"]
            try:
                n = int(data["bytes_sent"])
            except ValueError:
                n = 0

            if cache_sts == 'HIT':
                svc_data.RequestsHit += 1
                svc_data.BytesHit += n
            else:
                svc_data.RequestsMiss += 1
                svc_data.BytesMiss += n

        if int_data:
            yield (t0, t1, int_data)

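# Command-line driver.  Expects a YAML config passed with -c.  An illustrative
# layout, inferred from the keys read below (paths and values hypothetical):
#
#   files: "/var/log/nginx/cache*.log"
#   state_file: "/var/run/cache_log_parser.state"
#   aggregation_interval: 60
#   mapping:
#     - url: "/data/"
#       suffix: "cache.data"
#   Graphite: {...}   # passed through to GraphiteInterface as-is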
if __name__ == '__main__':

    import yaml, getopt, glob

    config_file = None

    opts, args = getopt.getopt(sys.argv[1:], "c:")

    for opt, val in opts:
        if opt == '-c': config_file = val

    if config_file is None:
        print("Usage: %s -c <config.yaml>" % sys.argv[0])
        sys.exit(1)

    config = yaml.safe_load(open(config_file, 'r').read())
    mapping = config.get("mapping")

    map_lst = []
    for d in mapping:
        map_lst.append((d["url"], d["suffix"]))

    state_file = config.get("state_file")
    interval = config.get("aggregation_interval", 60)
    files = config.get("files")

    GI = GraphiteInterface(config.get("Graphite"))

    # Restore the tail state saved by the previous run: inode and position
    # of the last file read, and the timestamp of the last interval seen.
    last_t = None
    try:
        with open(state_file, 'r') as sf:
            words = sf.read().split()
        last_file_inode = int(words[0])
        last_file_position = int(words[1])
        last_t = int(words[2])
    except Exception:
        last_file_inode = None
        last_file_position = None
        last_t = None

    parser = CacheLogParser(glob.glob(files), last_file_inode, last_file_position, last_t)
    s = CacheLogSummarizer(parser, map_lst)

    for t0, t1, data in s.summarize(interval):
        for svc, svc_data in sorted(data.items()):
            rh, rm, bh, bm = svc_data.frequencies()
            GI.feedData(t1, svc_data.Suffix + ".requests.hit", rh)
            GI.feedData(t1, svc_data.Suffix + ".requests.miss", rm)
            GI.feedData(t1, svc_data.Suffix + ".bytes.hit", bh)
            GI.feedData(t1, svc_data.Suffix + ".bytes.miss", bm)
        last_t = t0    # remember the start of the most recent interval
        GI.flushData()

    if last_t:
        with open(state_file, 'w') as sf:
            sf.write("%s %s %d" % (parser.LastFileInode, parser.LastFilePosition, last_t))