Skip to content

Commit 7bb752a

Browse files
committed
add detecting fraudulent transactions using stream app with kafka tutorial
1 parent 272e14b commit 7bb752a

File tree

9 files changed

+145
-0
lines changed

9 files changed

+145
-0
lines changed

Diff for: README.md

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy
8686
- [How to Play and Record Audio in Python](https://www.thepythoncode.com/article/play-and-record-audio-sound-in-python). ([code](general/recording-and-playing-audio))
8787
- [How to Get Geographic Locations in Python](https://www.thepythoncode.com/article/get-geolocation-in-python). ([code](general/geolocation))
8888
- [How to Assembly, Disassembly and Emulate Machine Code using Python](https://www.thepythoncode.com/article/arm-x86-64-assembly-disassembly-and-emulation-in-python). ([code](general/assembly-code))
89+
- [Detecting Fraudulent Transactions in a Streaming Application using Kafka in Python](https://www.thepythoncode.com/article/detect-fraudulent-transactions-with-apache-kafka-in-python). ([code](general/detect-fraudulent-transactions))
8990

9091

9192
- ### [Web Scraping](https://www.thepythoncode.com/topic/web-scraping)

Diff for: general/detect-fraudulent-transactions/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# [Detecting Fraudulent Transactions in a Streaming Application using Kafka in Python](https://www.thepythoncode.com/article/detect-fraudulent-transactions-with-apache-kafka-in-python)
2+
Check the original repo [here](https://github.com/bassemmarji/Kafka_Fraud_Detector).
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from flask import Flask, Response, stream_with_context, render_template, json, url_for
2+
3+
from kafka import KafkaConsumer
4+
from settings import *
5+
6+
# create the flask object app
7+
app = Flask(__name__)
8+
9+
def stream_template(template_name, **context):
10+
print('template name =',template_name)
11+
app.update_template_context(context)
12+
t = app.jinja_env.get_template(template_name)
13+
rv = t.stream(context)
14+
rv.enable_buffering(5)
15+
return rv
16+
17+
def is_suspicious(transaction: dict) -> bool:
18+
"""Determine whether a transaction is suspicious."""
19+
return transaction["amount"] >= 900
20+
21+
# this router will render the template named index.html and will pass the following parameters to it:
22+
# title and Kafka stream
23+
@app.route('/')
24+
def index():
25+
def g():
26+
consumer = KafkaConsumer(
27+
TRANSACTIONS_TOPIC
28+
, bootstrap_servers=KAFKA_BROKER_URL
29+
, value_deserializer=lambda value: json.loads(value)
30+
,
31+
)
32+
for message in consumer:
33+
transaction: dict = message.value
34+
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
35+
print(topic, transaction) # DEBUG
36+
yield topic, transaction
37+
38+
return Response(stream_template('index.html', title='Fraud Detector / Kafka',data=g()))
39+
40+
if __name__ == "__main__":
41+
app.run(host="localhost" , debug=True)

Diff for: general/detect-fraudulent-transactions/detector.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import os
2+
import json
3+
from kafka import KafkaConsumer, KafkaProducer
4+
from settings import *
5+
6+
def is_suspicious(transaction: dict) -> bool:
7+
"""Simple condition to determine whether a transaction is suspicious."""
8+
return transaction["amount"] >= 900
9+
10+
if __name__ == "__main__":
11+
consumer = KafkaConsumer(
12+
TRANSACTIONS_TOPIC
13+
,bootstrap_servers=KAFKA_BROKER_URL
14+
,value_deserializer = lambda value: json.loads(value)
15+
,
16+
)
17+
18+
for message in consumer:
19+
transaction: dict = message.value
20+
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
21+
print(topic,transaction) #DEBUG

Diff for: general/detect-fraudulent-transactions/producer.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
import json
3+
from time import sleep
4+
from kafka import KafkaProducer
5+
# import initialization parameters
6+
from settings import *
7+
from transactions import create_random_transaction
8+
9+
10+
if __name__ == "__main__":
11+
producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL
12+
#Encode all values as JSON
13+
,value_serializer = lambda value: json.dumps(value).encode()
14+
,)
15+
while True:
16+
transaction: dict = create_random_transaction()
17+
producer.send(TRANSACTIONS_TOPIC, value= transaction)
18+
print(transaction) #DEBUG
19+
sleep(SLEEP_TIME)
+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Kafka-python==2.0.2
2+
Flask==1.1.2

Diff for: general/detect-fraudulent-transactions/settings.py

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# URL for our broker used for connecting to the Kafka cluster
2+
KAFKA_BROKER_URL = "localhost:9092"
3+
# name of the topic hosting the transactions to be processed and requiring processing
4+
TRANSACTIONS_TOPIC = "queuing.transactions"
5+
# these 2 variables will control the amount of transactions automatically generated
6+
TRANSACTIONS_PER_SECOND = float("2.0")
7+
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
8+
# name of the topic hosting the legitimate transactions
9+
LEGIT_TOPIC = "queuing.legit"
10+
# name of the topic hosting the suspicious transactions
11+
FRAUD_TOPIC = "queuing.fraud"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<!doctype html>
2+
<title> Send Javascript with template demo </title>
3+
<html>
4+
5+
<head>
6+
</head>
7+
8+
<body>
9+
<div class="container">
10+
<h1>{{title}}</h1>
11+
</div>
12+
<div id="data"></div>
13+
{% for topic, transaction in data: %}
14+
<script>
15+
var topic = "{{ topic }}";
16+
var transaction = "{{ transaction }}";
17+
if (topic.search("fraud") > 0) {
18+
topic = topic.fontcolor("red")
19+
} else {
20+
topic = topic.fontcolor("green")
21+
}
22+
document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;
23+
</script>
24+
{% endfor %}
25+
</body>
26+
27+
</html>
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from random import choices, randint
2+
from string import ascii_letters, digits
3+
4+
account_chars: str = digits + ascii_letters
5+
6+
def _random_account_id() -> str:
7+
"""Return a random account number made of 12 characters"""
8+
return "".join(choices(account_chars,k=12))
9+
10+
def _random_amount() -> float:
11+
"""Return a random amount between 1.00 and 1000.00"""
12+
return randint(100,1000000)/100
13+
14+
def create_random_transaction() -> dict:
15+
"""Create a fake randomised transaction."""
16+
return {
17+
"source":_random_account_id()
18+
,"target":_random_account_id()
19+
,"amount":_random_amount()
20+
,"currency":"EUR"
21+
}

0 commit comments

Comments
 (0)