Skip to content

Commit

Permalink
kinesis example added
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Steffmann committed Jul 21, 2021
1 parent 6db0865 commit ba6e4cc
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 594 deletions.
594 changes: 0 additions & 594 deletions 2_Strategies/Strategy_Kinesis_EMA_HPO.ipynb

This file was deleted.

35 changes: 35 additions & 0 deletions 4_Kinesis/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
FROM tensorflow/tensorflow:2.1.0rc2-py3

RUN apt-get -y update && apt-get install -y --no-install-recommends \
wget \
python3 \
nginx \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

RUN wget https://sourceforge.net/projects/ta-lib/files/ta-lib/0.4.0/ta-lib-0.4.0-src.tar.gz && tar -xzf ta-lib-0.4.0-src.tar.gz && cd ta-lib/ && ./configure --prefix=/usr && make && make install && cd ../ && rm -rf ta-lib && rm ta-lib-0.4.0-src.tar.gz

# Install all of the packages
RUN wget https://bootstrap.pypa.io/get-pip.py && python get-pip.py
RUN pip install numpy
RUN pip install scipy
RUN pip install scikit-learn
RUN pip install pandas
RUN pip install flask
RUN pip install gevent
RUN pip install gunicorn
RUN pip install tensorflow==2.2.0
RUN pip install keras
RUN pip install backtrader
RUN pip install matplotlib==3.2.2
RUN pip install ta-lib
RUN pip install boto3

# Env Variables
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

# Set up the program in the image
COPY model /opt/program
WORKDIR /opt/program
1 change: 1 addition & 0 deletions 0_Setup/algo-kinesis.yaml → 4_Kinesis/algo-kinesis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ Resources:
- arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
- arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess
- arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess
- arn:aws:iam::aws:policy/AmazonECS_FullAccess
- !Ref 'S3Policy'

S3Policy:
Expand Down
45 changes: 45 additions & 0 deletions 4_Kinesis/build_and_push.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env bash

# This script shows how to build the Docker image and push it to ECR to be ready for use
# by SageMaker.

image=$1

chmod +x model/train
chmod +x model/serve

# Get the account number associated with the current IAM credentials
account=$(aws sts get-caller-identity --query Account --output text)

if [ $? -ne 0 ]
then
exit 255
fi


# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-east-1}


fullname="${account}.dkr.ecr.${region}.amazonaws.com/${image}:latest"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${image}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
aws ecr create-repository --repository-name "${image}" > /dev/null
fi

# Get the login command from ECR and execute it directly
$(aws ecr get-login --region ${region} --no-include-email)

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build -t ${image} .
docker tag ${image} ${fullname}

docker push ${fullname}
11 changes: 11 additions & 0 deletions 4_Kinesis/init_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pathlib import Path
import sys

algo_name=sys.argv[1]

Path('local/'+algo_name+'/input/data/training').mkdir(parents=True, exist_ok=True)
Path('local/'+algo_name+'/input/config').mkdir(parents=True, exist_ok=True)
Path('local/'+algo_name+'/model').mkdir(parents=True, exist_ok=True)

model_name=algo_name
print(algo_name)
168 changes: 168 additions & 0 deletions 4_Kinesis/model/algo_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.analyzers as btanalyzers
import backtrader.plot
import os
import pytz
from pytz import timezone
import requests
import json
import time
from algo_sim_feed import AlgoSimData
#from abc import classmethod

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = [16,9]

# More documentation about backtrader: https://www.backtrader.com/

class AlgoStrategy():

def __init__(self,config,strategy):
self.config=config

self.cerebro = bt.Cerebro()
strategy.config=config
strategy.init_broker(self.cerebro.broker)
strategy.add_data(self.cerebro)
self.cerebro.addstrategy(strategy)

self.portfolioStartValue=self.cerebro.broker.getvalue()

self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='dd')
self.cerebro.addanalyzer(btanalyzers.SharpeRatio_A, _name='sharpe')
self.cerebro.addanalyzer(btanalyzers.SQN, _name='sqn')
self.cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='ta')

def performance(self):
analyzer=self.thestrat.analyzers.ta.get_analysis()
dd_analyzer=self.thestrat.analyzers.dd.get_analysis()

#Get the results we are interested in
total_open = analyzer.total.open
total_closed = analyzer.total.closed
total_won = analyzer.won.total
total_lost = analyzer.lost.total
win_streak = analyzer.streak.won.longest
lose_streak = analyzer.streak.lost.longest
pnl_net = round(analyzer.pnl.net.total,2)
strike_rate = (total_won / total_closed) * 100
#Designate the rows
h1 = ['Total Open', 'Total Closed', 'Total Won', 'Total Lost']
h2 = ['Strike Rate','Win Streak', 'Losing Streak', 'PnL Net']
h3 = ['DrawDown Pct','MoneyDown', '', '']
self.total_closed=total_closed
self.strike_rate=strike_rate
self.max_drawdown=dd_analyzer.max.drawdown
r1 = [total_open, total_closed,total_won,total_lost]
r2 = [('%.2f%%' %(strike_rate)), win_streak, lose_streak, pnl_net]
r3 = [('%.2f%%' %(dd_analyzer.max.drawdown)), dd_analyzer.max.moneydown, '', '']
#Check which set of headers is the longest.
header_length = max(len(h1),len(h2),len(h3))
#Print the rows
print_list = [h1,r1,h2,r2,h3,r3]
row_format ="{:<15}" * (header_length + 1)
print("Trade Analysis Results:")
for row in print_list:
print(row_format.format('',*row))

analyzer=self.thestrat.analyzers.sqn.get_analysis()
sharpe_analyzer=self.thestrat.analyzers.sharpe.get_analysis()
self.sqn = analyzer.sqn
self.sharpe_ratio = sharpe_analyzer['sharperatio']
if self.sharpe_ratio is None:
self.sharpe_ratio=0
self.pnl = self.cerebro.broker.getvalue()-self.portfolioStartValue
print('[SQN:%.2f, Sharpe Ratio:%.2f, Final Portfolio:%.2f, Total PnL:%.2f]' % (self.sqn,self.sharpe_ratio,self.cerebro.broker.getvalue(),self.pnl))

# plot
fig=self.cerebro.plot()
plt.savefig(os.path.join(StrategyTemplate.MODEL_PATH, 'chart.png'))

def submit(self):
try:
if 'submitUrl' in self.config:
name=self.config['user']+'@'+self.config['account']
algo=self.config['algo_name']
submitUrl=self.config['submitUrl']

URL = submitUrl
ts=str(int(time.time()))
PARAMS={'id': algo,
'name': name,
'trades': self.total_closed,
'strike_rate': self.strike_rate,
'max_drawdown': self.max_drawdown,
'pnl': self.pnl,
'sqn': self.sqn,
'sharpe_ratio': self.sharpe_ratio}
print("submit:%s" % (json.dumps(PARAMS)))
r = requests.get(url = URL, params = PARAMS, timeout=3)
print("status=%s,res=%s" % (r.status_code,r.text))
if r.status_code == 200:
print("performance submitted")
else:
print("error submitting performance:%s" % r.text)
except Exception as e:
print("error submitting performance:%s" % e)

def run(self):
thestrats = self.cerebro.run()
self.thestrat = thestrats[0]
self.performance()
self.submit()

class StrategyTemplate(bt.Strategy):

PREFIX='/opt/ml/'
TRAIN_FILE = os.path.join(PREFIX,'input/data/training/data.csv')
CONFIG_FILE = os.path.join(PREFIX,'input/config/hyperparameters.json')
MODEL_PATH = os.path.join(PREFIX,'model')

def __init__(self):
with open(StrategyTemplate.CONFIG_FILE, 'r') as f:
self.config = json.load(f)
print("[INIT]:config:%s=%s" % (StrategyTemplate.CONFIG_FILE,self.config))

self.lastDay=-1
self.lastMonth=-1
self.dataclose = self.datas[0].close

@staticmethod
def init_broker(broker):
pass

@staticmethod
def add_data(cerebro):
pass

def notify_order(self, order):
dt=self.datas[0].datetime.datetime(0)

if order.status in [order.Completed]:
if order.isbuy():
print(
'[%s] BUY EXECUTED, Price: %.2f, PNL: %.2f, Cash: %.2f' %
(dt,order.executed.price,order.executed.pnl,self.broker.getvalue()))
else: # Sell
print('[%s] SELL EXECUTED, Price: %.2f, PNL: %.2f, Cash: %.2f' %
(dt,order.executed.price,order.executed.pnl,self.broker.getvalue()))

def next(self):
dt=self.datas[0].datetime.datetime(0)
#print("[NEXT]:%s:close=%s" % (dt,self.dataclose[0]))

#SOM
if self.lastMonth!=dt.month:
if self.lastMonth!=-1:
chg=self.broker.getvalue()-self.monthCash
print("[%s] SOM:chg=%.2f,cash=%.2f" % (dt,chg,self.broker.getvalue()))
self.lastMonth=dt.month
self.monthCash=self.broker.getvalue()

#SOD
if self.lastDay!=dt.day:
self.lastDay=dt.day
print("[%s] SOD:cash=%.2f" % (dt,self.broker.getvalue()))
94 changes: 94 additions & 0 deletions 4_Kinesis/model/algo_live_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import datetime
import struct
import time

from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame
import backtrader as bt
import math
import numpy as np
import pandas as pd
import json
import boto3

class AlgoLiveData(DataBase):
def __init__(self,region):
super(AlgoLiveData, self).__init__()
self.region=region
self.lambda_client = boto3.client('lambda',region_name=self.region)
self.connected=False

#start_date = '2017-08-11'
#now = datetime.datetime.now() # current date and time
#end_date = now.strftime("%Y-%m-%d")

#self.fromdate=pd.to_datetime(start_date, format = "%Y-%m-%d")
#self.todate=pd.to_datetime(end_date, format = "%Y-%m-%d")
self.timeframe=bt.TimeFrame.Ticks
print(self.lines.datetime.array)

def start(self):
print("start feed")
print(self.lines.datetime.array)

def stop(self):
print("stop feed")

def islive(self):
'''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
should be deactivated'''
return True

def haslivedata(self):
'''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
should be deactivated'''
return self.connected

def _load(self):
#print("A:%s" % self.lines.datetime.array)
if not self.connected:
while not self.connected:
self.pull()
else:
self.pull()
return True

def pull(self):
#print("B:%s" % self.lines.datetime.array)
if math.isnan(self.lines.datetime[0]):
now = datetime.datetime.now()
self.lines.datetime[0]=date2num(now)
now=datetime.datetime.now()
try:
item={}
res=self.lambda_client.invoke(
FunctionName='algo_market_data',
InvocationType='RequestResponse',
Payload=json.dumps(item)
)
t=res['Payload']
l=json.loads(t.read().decode('utf-8'))
print("load:%s" % l)

#print(self.lines.datetime.array)
#print(self.lines.close.array)

for x in l:
dt=pd.to_datetime(x['date'], format = "%Y-%m-%d")
#print(dt)
close=x['close']

self.lines.datetime[0] = date2num(datetime.datetime.now())
self.lines.open[0] = close
self.lines.high[0] = close
self.lines.low[0] = close
self.lines.close[0] = close
self.lines.volume[0] = 0

self.connected=True
self._laststatus=self.LIVE
#print("connected")
except Exception as e:
print("err:%s" % e)
time.sleep(5)
Loading

0 comments on commit ba6e4cc

Please sign in to comment.