
Commit

Merge pull request #103 from mlsecproject/winnow-speedup
Winnow speedup
alexcpsec committed Jan 8, 2015
2 parents d662493 + 3d911c4 commit e4d3716
Showing 3 changed files with 95 additions and 85 deletions.
123 changes: 64 additions & 59 deletions baler.py
@@ -1,21 +1,21 @@
import ConfigParser
import csv
import datetime as dt
import gzip
import json
import logging
import os
import sys
import re
import requests
import sys
import time
import re
from Queue import Queue
import unicodecsv
import threading
from logger import get_logger
import logging

from Queue import Queue

logger = get_logger('baler')


def tiq_output(reg_file, enr_file):
config = ConfigParser.SafeConfigParser()
cfg_success = config.read('combine.cfg')
@@ -43,17 +43,17 @@ def tiq_output(reg_file, enr_file):
outbound_data = [row for row in reg_data if row[2] == 'outbound']

try:
bale_reg_csvgz(inbound_data, os.path.join(tiq_dir, 'raw', 'public_inbound', today+'.csv.gz'))
bale_reg_csvgz(outbound_data, os.path.join(tiq_dir, 'raw', 'public_outbound', today+'.csv.gz'))
bale_reg_csvgz(inbound_data, os.path.join(tiq_dir, 'raw', 'public_inbound', today + '.csv.gz'))
bale_reg_csvgz(outbound_data, os.path.join(tiq_dir, 'raw', 'public_outbound', today + '.csv.gz'))
except:
pass

inbound_data = [row for row in enr_data if row[2] == 'inbound']
outbound_data = [row for row in enr_data if row[2] == 'outbound']

try:
bale_enr_csvgz(inbound_data, os.path.join(tiq_dir, 'enriched', 'public_inbound', today+'.csv.gz'))
bale_enr_csvgz(outbound_data, os.path.join(tiq_dir, 'enriched', 'public_outbound', today+'.csv.gz'))
bale_enr_csvgz(inbound_data, os.path.join(tiq_dir, 'enriched', 'public_inbound', today + '.csv.gz'))
bale_enr_csvgz(outbound_data, os.path.join(tiq_dir, 'enriched', 'public_outbound', today + '.csv.gz'))
except:
pass

@@ -64,7 +64,7 @@ def bale_reg_csvgz(harvest, output_file):
""" bale the data as a gziped csv file"""
logger.info('Output regular data as GZip CSV to %s' % output_file)
with gzip.open(output_file, 'wb') as csv_file:
bale_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
bale_writer = unicodecsv.writer(csv_file, quoting=unicodecsv.QUOTE_ALL)

# header row
bale_writer.writerow(('entity', 'type', 'direction', 'source', 'notes', 'date'))
@@ -75,7 +75,7 @@ def bale_reg_csv(harvest, output_file):
""" bale the data as a csv file"""
logger.info('Output regular data as CSV to %s' % output_file)
with open(output_file, 'wb') as csv_file:
bale_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
bale_writer = unicodecsv.writer(csv_file, quoting=unicodecsv.QUOTE_ALL)

# header row
bale_writer.writerow(('entity', 'type', 'direction', 'source', 'notes', 'date'))
@@ -86,112 +86,117 @@ def bale_enr_csv(harvest, output_file):
""" output the data as an enriched csv file"""
logger.info('Output enriched data as CSV to %s' % output_file)
with open(output_file, 'wb') as csv_file:
bale_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
bale_writer = unicodecsv.writer(csv_file, quoting=unicodecsv.QUOTE_ALL)

# header row
bale_writer.writerow(('entity', 'type', 'direction', 'source', 'notes', 'date', 'asnumber', 'asname', 'country', 'host', 'rhost'))
bale_writer.writerows(harvest)


def bale_enr_csvgz(harvest, output_file):
""" output the data as an enriched gziped csv file"""
logger.info('Output enriched data as GZip CSV to %s' % output_file)
with gzip.open(output_file, 'wb') as csv_file:
bale_writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
bale_writer = unicodecsv.writer(csv_file, quoting=unicodecsv.QUOTE_ALL)

# header row
bale_writer.writerow(('entity', 'type', 'direction', 'source', 'notes', 'date', 'asnumber', 'asname', 'country', 'host', 'rhost'))
bale_writer.writerows(harvest)

def bale_CRITs_indicator(base_url,data,indicator_que):

def bale_CRITs_indicator(base_url, data, indicator_que):
""" One thread of adding indicators to CRITs"""
while not indicator_que.empty():
indicator=indicator_que.get()
indicator = indicator_que.get()
if indicator[1] == 'IPv4':
# using the IP API
url=base_url+'ips/'
data['add_indicator']="true"
data['ip']=indicator[0]
data['ip_type']='Address - ipv4-addr'
data['reference']=indicator[3]
url = base_url + 'ips/'
data['add_indicator'] = "true"
data['ip'] = indicator[0]
data['ip_type'] = 'Address - ipv4-addr'
data['reference'] = indicator[3]
# getting the source automatically:
source=re.findall(r'\/\/(.*?)\/',data['reference'])
source = re.findall(r'\/\/(.*?)\/', data['reference'])
if source:
data['source']=source[0]
res = requests.post(url,data=data,verify=False)
if not res.status_code in [201,200,400]:
data['source'] = source[0]
res = requests.post(url, data=data, verify=False)
if not res.status_code in [201, 200, 400]:
logger.info("Issues with adding: %s" % data['ip'])
elif indicator[1] == "FQDN":
# using the Domain API
url=base_url+'domains/'
data['add_indicator']="true"
data['domain']=indicator[0]
data['reference']=indicator[3]
url = base_url + 'domains/'
data['add_indicator'] = "true"
data['domain'] = indicator[0]
data['reference'] = indicator[3]
# getting the source automatically:
source=re.findall(r'\/\/(.*?)\/',data['reference'])
source = re.findall(r'\/\/(.*?)\/', data['reference'])
if source:
data['source']=source[0]
res = requests.post(url,data=data,verify=False)
if not res.status_code in [201,200,400]:
data['source'] = source[0]
res = requests.post(url, data=data, verify=False)
if not res.status_code in [201, 200, 400]:
logger.info("Issues with adding: %s" % data['domain'])
else:
logger.info("don't yet know what to do with: %s[%s]" % (indicator[1],indicator[0]))
logger.info("don't yet know what to do with: %s[%s]" % (indicator[1], indicator[0]))


def bale_CRITs(harvest,filename):
def bale_CRITs(harvest, filename):
""" taking the output from combine and pushing it to the CRITs web API"""
# checking the minimum requirements for parameters
# it would be nice to have some metadata on the feeds that can be imported in the intel library:
# -> confidence
# -> type of feed (bot vs spam vs ddos, you get the picture)
data={'confidence':'medium'}
start_time=time.time()
data = {'confidence': 'medium'}
start_time = time.time()
config = ConfigParser.SafeConfigParser()
cfg_success = config.read('combine.cfg')
if not cfg_success:
logger.error('tiq_output: Could not read combine.cfg.\n')
logger.error('HINT: edit combine-example.cfg and save as combine.cfg.\n')
return
if config.has_option('Baler','crits_username'):
data['username']=config.get('Baler', 'crits_username')
if config.has_option('Baler', 'crits_username'):
data['username'] = config.get('Baler', 'crits_username')
else:
raise 'Please check the combine.cnf file for the crits_username field in the [Baler] section'
if config.has_option('Baler','crits_api_key'):
data['api_key']=config.get('Baler', 'crits_api_key')
if config.has_option('Baler', 'crits_api_key'):
data['api_key'] = config.get('Baler', 'crits_api_key')
else:
raise 'Please check the combine.cnf file for the crits_api_key field in the [Baler] section'
if config.has_option('Baler','crits_campaign'):
data['campaign']=config.get('Baler', 'crits_campaign')
if config.has_option('Baler', 'crits_campaign'):
data['campaign'] = config.get('Baler', 'crits_campaign')
else:
logger.info('Lacking a campaign name, we will default to "combine." Errors might ensue if it does not exist in CRITs')
data['campaign']='combine'
if config.has_option('Baler','crits_url'):
base_url=config.get('Baler','crits_url')
data['campaign'] = 'combine'
if config.has_option('Baler', 'crits_url'):
base_url = config.get('Baler', 'crits_url')
else:
raise 'Please check the combine.cnf file for the crits_url field in the [Baler] section'
if config.has_option('Baler','crits_maxThreads'):
maxThreads=int(config.get('Baler', 'crits_maxThreads'))
if config.has_option('Baler', 'crits_maxThreads'):
maxThreads = int(config.get('Baler', 'crits_maxThreads'))
else:
logger.info('No number of maximum Threads has been given, defaulting to 10')
maxThreads=10
maxThreads = 10

data['source']='Combine'
data['method']='trawl'
data['source'] = 'Combine'
data['method'] = 'trawl'

# initializing the Queue to the list of indicators in the harvest
ioc_queue=Queue()
ioc_queue = Queue()
for indicator in harvest:
ioc_queue.put(indicator)
total_iocs=ioc_queue.qsize()
total_iocs = ioc_queue.qsize()

for x in range(maxThreads):
th=threading.Thread(target=bale_CRITs_indicator, args=(base_url,data,ioc_queue))
th = threading.Thread(target=bale_CRITs_indicator, args=(base_url, data, ioc_queue))
th.start()

for x in threading.enumerate():
if x.name=="MainThread":
if x.name == "MainThread":
continue
x.join()

logger.info('Output %d indicators to CRITs using %d threads. Operation tool %d seconds\n' % (total_iocs,maxThreads,time.time()-start_time))
logger.info('Output %d indicators to CRITs using %d threads. Operation tool %d seconds\n' %
(total_iocs, maxThreads, time.time() - start_time))


def bale(input_file, output_file, output_format, is_regular):
config = ConfigParser.SafeConfigParser()
@@ -203,13 +208,13 @@ def bale(input_file, output_file, output_format, is_regular):

logger.info('Reading processed data from %s' % input_file)
with open(input_file, 'rb') as f:
harvest = json.load(f)
harvest = json.load(f, encoding='utf8')

# TODO: also need plugins here (cf. #23)
if is_regular:
format_funcs = {'csv': bale_reg_csv,'crits':bale_CRITs}
format_funcs = {'csv': bale_reg_csv, 'crits': bale_CRITs}
else:
format_funcs = {'csv': bale_enr_csv,'crits':bale_CRITs}
format_funcs = {'csv': bale_enr_csv, 'crits': bale_CRITs}
format_funcs[output_format](harvest, output_file)

if __name__ == "__main__":
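Aside from the PEP 8 cleanup, the functional change in baler.py is the switch from the stdlib csv writer to unicodecsv (plus reading the harvest JSON with an explicit utf8 encoding), so indicator data containing non-ASCII text no longer trips up the CSV output. A minimal sketch of the writer pattern (not part of this commit); the output path and sample row are made up for illustration:

    import gzip

    import unicodecsv

    # One fabricated row in the (entity, type, direction, source, notes, date)
    # layout that bale_reg_csvgz() expects; the u'\xfcber' path is only there
    # to exercise the non-ASCII handling.
    rows = [(u'example.com', u'FQDN', u'inbound',
             u'http://feed.example/\xfcber', u'', u'2015-01-08')]

    with gzip.open('sample.csv.gz', 'wb') as f:
        writer = unicodecsv.writer(f, quoting=unicodecsv.QUOTE_ALL)
        writer.writerow(('entity', 'type', 'direction', 'source', 'notes', 'date'))
        writer.writerows(rows)

unicodecsv mirrors the csv module's interface but encodes unicode values to UTF-8 on write, which is why the swap is a one-line change in each bale_*_csv* function.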
8 changes: 5 additions & 3 deletions requirements.txt
@@ -1,11 +1,13 @@
CsvSchema==1.1.1
argparse==1.2.1
argparse>=1.2.1,<1.4.0
beautifulsoup4==4.3.2
feedparser==5.1.3
gevent==1.0.1
greenlet>=0.4.2,<0.5.0
-e git+https://github.com/rtdean/grequests@19239a34b00b8ac226b21f01b0fb55e869097fb7#egg=grequests-origin
netaddr==0.7.12
pygeoip==0.3.1
requests>=2.3.0,<2.5.0
pygeoip>=0.3.1,<0.4.0
requests>=2.3.0,<2.6.0
sortedcontainers==0.9.4
wsgiref==0.1.2
unicodecsv==0.9.4
49 changes: 26 additions & 23 deletions winnower.py
@@ -9,30 +9,36 @@
import sys

from netaddr import IPAddress, IPRange, IPSet
from sortedcontainers import SortedDict

from logger import get_logger
import logging

logger = get_logger('winnower')

# from http://en.wikipedia.org/wiki/Reserved_IP_addresses:
reserved_ranges = IPSet(['0.0.0.0/8', '100.64.0.0/10', '127.0.0.0/8', '192.88.99.0/24',
'198.18.0.0/15', '198.51.100.0/24', '203.0.113.0/24', '233.252.0.0/24'])
gi_org = SortedDict()


def load_gi_org(filename):
gi_org = {}
with open(filename, 'rb') as f:
org_reader = csv.DictReader(f, fieldnames=['start', 'end', 'org'])
for row in org_reader:
gi_org[IPRange(row['start'], row['end'])] = row['org']
gi_org[row['start']] = (IPRange(row['start'], row['end']), unicode(row['org'], errors='replace'))

return gi_org


def org_by_addr(address, org_data):
def org_by_addr(address):
as_num = None
as_name = None
for iprange in org_data:
if address in iprange:
as_num, sep, as_name = org_data[iprange].partition(' ')
as_num = as_num.replace("AS", "") # Making sure the variable only has the number
break
gi_index = gi_org.bisect(str(int(address)))
gi_net = gi_org[gi_org.iloc[gi_index - 1]]
if address in gi_net[0]:
as_num, sep, as_name = gi_net[1].partition(' ')
as_num = as_num.replace("AS", "") # Making sure the variable only has the number
return as_num, as_name


@@ -46,8 +52,8 @@ def maxhits(dns_records):
return hostname


def enrich_IPv4(address, org_data, geo_data, dnsdb=None):
as_num, as_name = org_by_addr(address, org_data)
def enrich_IPv4(address, geo_data, dnsdb=None):
as_num, as_name = org_by_addr(address)
country = geo_data.country_code_by_addr('%s' % address)
if dnsdb:
hostname = maxhits(dnsdb.query_rdata_ip('%s' % address))
@@ -73,12 +79,9 @@ def filter_date(records, date):


def reserved(address):
# from http://en.wikipedia.org/wiki/Reserved_IP_addresses:
ranges = IPSet(['0.0.0.0/8', '100.64.0.0/10', '127.0.0.0/8', '192.88.99.0/24',
'198.18.0.0/15', '198.51.100.0/24', '203.0.113.0/24', '233.252.0.0/24'])
a_reserved = address.is_reserved()
a_private = address.is_private()
a_inr = address in ranges
a_inr = address in reserved_ranges
if a_reserved or a_private or a_inr:
return True
else:
@@ -138,7 +141,7 @@ def winnow(in_file, out_file, enr_file):

# TODO: make these locations configurable?
logger.info('Loading GeoIP data')
org_data = load_gi_org('data/GeoIPASNum2.csv')
gi_org = load_gi_org('data/GeoIPASNum2.csv')
geo_data = pygeoip.GeoIP('data/GeoIP.dat', pygeoip.MEMORY_CACHE)

wheat = []
@@ -147,23 +150,21 @@
logger.info('Beginning winnowing process')
for each in crop:
(addr, addr_type, direction, source, note, date) = each
# TODO: enrich DNS indicators as well
if addr_type == 'IPv4' and is_ipv4(addr):
logger.info('Enriching %s' % addr)
#logger.info('Enriching %s' % addr)
ipaddr = IPAddress(addr)
if not reserved(ipaddr):
wheat.append(each)
if enrich_ip:
e_data = (addr, addr_type, direction, source, note, date) + enrich_IPv4(ipaddr, org_data, geo_data, dnsdb)
e_data = (addr, addr_type, direction, source, note, date) + enrich_IPv4(ipaddr, geo_data, dnsdb)
enriched.append(e_data)
else:
e_data = (addr, addr_type, direction, source, note, date) + enrich_IPv4(ipaddr, org_data, geo_data)
e_data = (addr, addr_type, direction, source, note, date) + enrich_IPv4(ipaddr, geo_data)
enriched.append(e_data)
else:
logger.error('Found invalid address: %s from: %s' % (addr, source))
elif addr_type == 'FQDN' and is_fqdn(addr):
# TODO: validate these (cf. https://github.com/mlsecproject/combine/issues/15 )
logger.info('Enriching %s' % addr)
#logger.info('Enriching %s' % addr)
wheat.append(each)
if enrich_dns and dnsdb:
e_data = (addr, addr_type, direction, source, note, date, enrich_FQDN(addr, date, dnsdb))
@@ -173,10 +174,12 @@

logger.info('Dumping results')
with open(out_file, 'wb') as f:
json.dump(wheat, f, indent=2)
w_data = json.dumps(wheat, indent=2, ensure_ascii=False).encode('utf8')
f.write(w_data)

with open(enr_file, 'wb') as f:
json.dump(enriched, f, indent=2)
e_data = json.dumps(enriched, indent=2, ensure_ascii=False).encode('utf8')
f.write(e_data)


if __name__ == "__main__":
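winnower.py carries the actual speedup. The ASN ranges from GeoIPASNum2.csv are loaded into a SortedDict (declared at module level) keyed by each range's starting address, so org_by_addr() bisects to a single candidate range instead of looping over every IPRange for every indicator; the reserved-address IPSet is likewise built once at import time instead of on each reserved() call, the per-indicator 'Enriching' log lines are commented out, and results are written via json.dumps(..., ensure_ascii=False) encoded as UTF-8. Below is a rough sketch of the lookup idea, not part of the commit; it keys on integers for clarity where the commit keys on the CSV's string values, the add_range() helper and sample data are made up, and it assumes the .bisect()/.iloc API of the pinned sortedcontainers==0.9.4:

    from netaddr import IPAddress, IPRange
    from sortedcontainers import SortedDict

    gi_org = SortedDict()

    def add_range(start, end, org):
        # Key each range by its numeric starting address so ranges sort correctly.
        gi_org[int(IPAddress(start))] = (IPRange(start, end), org)

    def org_by_addr(address):
        """Return (as_num, as_name) for an IPAddress, or (None, None) if unmatched."""
        as_num = as_name = None
        idx = gi_org.bisect(int(address))         # first range starting after address
        candidate = gi_org[gi_org.iloc[idx - 1]]  # so the range before it may contain it
        if address in candidate[0]:
            as_num, _, as_name = candidate[1].partition(' ')
            as_num = as_num.replace('AS', '')
        return as_num, as_name

    # Fabricated range for illustration only:
    add_range('1.0.0.0', '1.0.0.255', 'AS13335 CLOUDFLARENET')
    print org_by_addr(IPAddress('1.0.0.42'))      # ('13335', 'CLOUDFLARENET')

Each lookup becomes a binary search over the range start addresses rather than a linear scan, which is presumably where most of the winnowing time went.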
